This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 199344458 [FLINK-34874][source-connector/mongodb] Support initial.snapshotting.pipeline related configs in Table API
199344458 is described below

commit 19934445800cc8931ade34b5741cb69f8bada3a0
Author: Runkang He <[email protected]>
AuthorDate: Tue Apr 22 23:11:57 2025 +0800

    [FLINK-34874][source-connector/mongodb] Support initial.snapshotting.pipeline related configs in Table API
    
    This closes #3707
---
 .../docs/connectors/flink-sources/mongodb-cdc.md   | 40 +++++++++
 .../docs/connectors/flink-sources/mongodb-cdc.md   | 46 +++++++++-
 .../source/config/MongoDBSourceOptions.java        | 25 +++++-
 .../mongodb/table/MongoDBTableSource.java          | 16 ++++
 .../mongodb/table/MongoDBTableSourceFactory.java   | 25 ++++++
 .../mongodb/table/MongoDBTableFactoryTest.java     | 98 +++++++++++++++++++++-
 6 files changed, 243 insertions(+), 7 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index f52b3be98..0e46ec2a8 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -247,6 +247,35 @@ MongoDB change event records do not carry the pre-update state of the message, so we can only
         <td>Long</td>
        <td>Start timestamp in milliseconds; only applies to the <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td>initial.snapshotting.queue.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">16000</td>
+      <td>Integer</td>
+      <td>The queue size to use when taking the initial snapshot. Only takes effect when the scan.startup.mode option is set to initial.<br>
+          Note: the deprecated option name is copy.existing.queue.size. It remains available for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.max.threads</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">Processors Count</td>
+      <td>Integer</td>
+      <td>The number of threads to use when performing the data copy. Only takes effect when the scan.startup.mode option is set to initial.<br>
+          Note: the deprecated option name is copy.existing.max.threads. It remains available for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.pipeline</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>An array of JSON objects describing MongoDB pipeline operations. During the snapshot phase, these operations are pushed down to MongoDB so that only the required data is read, which improves read efficiency.
+          For example, the pipeline [{"$match": {"closed": "false"}}] copies only documents whose closed field is "false".<br>
+          This option only takes effect when the scan.startup.mode option is set to initial, and it can only be used in Debezium mode; it cannot be used in incremental snapshot mode, because the semantics would be inconsistent.<br>
+          Note: the deprecated option name is copy.existing.pipeline. It remains available for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
     <tr>
       <td>batch.size</td>
       <td>optional</td>
@@ -416,6 +445,17 @@ CREATE TABLE mongodb_source (...) WITH (
 **Notes:**
 - 'timestamp' specifies the timestamp startup mode, which requires incremental snapshot reading to be enabled.
 
+### Snapshot Data Filters
+
+The config option `initial.snapshotting.pipeline` describes the filters to apply when copying existing data.<br>
+During the snapshot phase, the filters are pushed down to MongoDB so that only the required data is read, which improves read efficiency.
+
+In the following example, the `$match` aggregation operator ensures that only documents whose closed field is set to "false" are copied.
+
+```
+'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
+```
+
 ### Change Streams
 
 We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB, driven by Debezium's `EmbeddedEngine`.
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index c4fb00af7..944c038b0 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -241,11 +241,40 @@ Connector Options
        <td>Timestamp in millis of the start point, only used for <code>'timestamp'</code> startup mode.</td>
     </tr>
     <tr>
-      <td>copy.existing.queue.size</td>
+      <td>initial.snapshotting.queue.size</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">10240</td>
+      <td style="word-wrap: break-word;">16000</td>
       <td>Integer</td>
-      <td>The max size of the queue to use when copying data.</td>
+      <td>The max size of the queue to use when copying data. Only available when scan.startup.mode is set to 'initial'.<br>
+          Note: The deprecated option name is copy.existing.queue.size. To be compatible with old versions of jobs,
+          this parameter is still available, but it is recommended to upgrade to the new option name.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.max.threads</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">Processors Count</td>
+      <td>Integer</td>
+      <td>The number of threads to use when performing the data copy. Only available when scan.startup.mode is set to 'initial'.<br>
+          Note: The deprecated option name is copy.existing.max.threads. To be compatible with old versions of jobs,
+          this parameter is still available, but it is recommended to upgrade to the new option name.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.pipeline</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>An array of JSON objects describing the pipeline operations to run when copying existing data.<br>
+           This can improve the use of indexes by the copying manager and make copying more efficient,
+           e.g. <code>[{"$match": {"closed": "false"}}]</code> ensures that only documents in which the closed field is set to false are copied.<br>
+           The initial.snapshotting.pipeline config is only available when scan.startup.mode is set to 'initial'.
+           It is only used in Debezium mode and cannot be used in incremental snapshot mode, because the semantics would be inconsistent.<br>
+           Note: The deprecated option name is copy.existing.pipeline. To be compatible with old versions of jobs,
+           this parameter is still available, but it is recommended to upgrade to the new option name.
+      </td>
     </tr>
     <tr>
       <td>batch.size</td>
@@ -439,6 +468,17 @@ CREATE TABLE mongodb_source (...) WITH (
 )
 ```
 
+### Snapshot Data Filters
+
+The config option `initial.snapshotting.pipeline` describes the filters to apply when copying existing data.<br>
+This pushes the filtering down to MongoDB so that only the required data is copied, and it can improve the use of indexes by the copying manager.
+
+In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to "false" are copied.
+
+```
+'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
+```
+
 ### Change Streams
 
 We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB, driven by Debezium's `EmbeddedEngine`.
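
For context, a minimal sketch of how the new options slot into a Flink SQL table definition. This DDL is illustrative only and is not part of the commit; the schema, hosts, and credentials are hypothetical.

```
CREATE TABLE mongodb_source (
  _id STRING,
  closed STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'orders',
  -- the initial.snapshotting.* options require Debezium mode
  'scan.incremental.snapshot.enabled' = 'false',
  'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]',
  'initial.snapshotting.max.threads' = '4',
  'initial.snapshotting.queue.size' = '16000'
);
```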
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
index 6d2fb1654..3e4cf6096 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
@@ -83,9 +83,30 @@ public class MongoDBSourceOptions {
     public static final ConfigOption<Integer> INITIAL_SNAPSHOTTING_QUEUE_SIZE =
             ConfigOptions.key("initial.snapshotting.queue.size")
                     .intType()
-                    .defaultValue(10240)
+                    .noDefaultValue()
+                    .withDeprecatedKeys("copy.existing.queue.size")
+                    .withDescription(
+                            "The max size of the queue to use when copying 
data. When not set,"
+                                    + "it uses default value 16000 of mongo 
kafka connect sdk.");
+
+    public static final ConfigOption<Integer> INITIAL_SNAPSHOTTING_MAX_THREADS =
+            ConfigOptions.key("initial.snapshotting.max.threads")
+                    .intType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("copy.existing.max.threads")
+                    .withDescription(
+                            "The number of threads to use when performing the 
data copy."
+                                    + " Defaults to the number of 
processors.");
+
+    public static final ConfigOption<String> INITIAL_SNAPSHOTTING_PIPELINE =
+            ConfigOptions.key("initial.snapshotting.pipeline")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("copy.existing.pipeline")
                     .withDescription(
-                            "The max size of the queue to use when copying 
data. Defaults to 10240.");
+                            "An array of JSON objects describing the pipeline 
operations "
+                                    + "to run when copying existing data. "
+                                    + "This can improve the use of indexes by 
the copying manager and make copying more efficient.");
 
     public static final ConfigOption<Integer> BATCH_SIZE =
             ConfigOptions.key("batch.size")
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 2ea676309..03d006264 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -72,6 +72,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final String collection;
     private final StartupOptions startupOptions;
     private final Integer initialSnapshottingQueueSize;
+    private final Integer initialSnapshottingMaxThreads;
+    private final String initialSnapshottingPipeline;
     private final Integer batchSize;
     private final Integer pollMaxBatchSize;
     private final Integer pollAwaitTimeMillis;
@@ -109,6 +111,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             @Nullable String connectionOptions,
             StartupOptions startupOptions,
             @Nullable Integer initialSnapshottingQueueSize,
+            @Nullable Integer initialSnapshottingMaxThreads,
+            @Nullable String initialSnapshottingPipeline,
             @Nullable Integer batchSize,
             @Nullable Integer pollMaxBatchSize,
             @Nullable Integer pollAwaitTimeMillis,
@@ -134,6 +138,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.connectionOptions = connectionOptions;
         this.startupOptions = checkNotNull(startupOptions);
         this.initialSnapshottingQueueSize = initialSnapshottingQueueSize;
+        this.initialSnapshottingMaxThreads = initialSnapshottingMaxThreads;
+        this.initialSnapshottingPipeline = initialSnapshottingPipeline;
         this.batchSize = batchSize;
         this.pollMaxBatchSize = pollMaxBatchSize;
         this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -243,6 +249,10 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
             Optional.ofNullable(initialSnapshottingQueueSize)
                     .ifPresent(builder::initialSnapshottingQueueSize);
+            Optional.ofNullable(initialSnapshottingMaxThreads)
+                    .ifPresent(builder::initialSnapshottingMaxThreads);
+            Optional.ofNullable(initialSnapshottingPipeline)
+                    .ifPresent(builder::initialSnapshottingPipeline);
             Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
             Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
             Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@@ -298,6 +308,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                         connectionOptions,
                         startupOptions,
                         initialSnapshottingQueueSize,
+                        initialSnapshottingMaxThreads,
+                        initialSnapshottingPipeline,
                         batchSize,
                         pollMaxBatchSize,
                         pollAwaitTimeMillis,
@@ -337,6 +349,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 && Objects.equals(connectionOptions, that.connectionOptions)
                 && Objects.equals(startupOptions, that.startupOptions)
                 && Objects.equals(initialSnapshottingQueueSize, that.initialSnapshottingQueueSize)
+                && Objects.equals(initialSnapshottingMaxThreads, that.initialSnapshottingMaxThreads)
+                && Objects.equals(initialSnapshottingPipeline, that.initialSnapshottingPipeline)
                 && Objects.equals(batchSize, that.batchSize)
                 && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
                 && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
@@ -369,6 +383,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 connectionOptions,
                 startupOptions,
                 initialSnapshottingQueueSize,
+                initialSnapshottingMaxThreads,
+                initialSnapshottingPipeline,
                 batchSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 47ade2675..71770cf1e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -48,6 +48,8 @@ import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourc
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS;
+import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_MAX_THREADS;
+import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_PIPELINE;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_QUEUE_SIZE;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
@@ -96,6 +98,10 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         StartupOptions startupOptions = getStartupOptions(config);
         Integer initialSnapshottingQueueSize =
                 config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null);
+        Integer initialSnapshottingMaxThreads =
+                config.getOptional(INITIAL_SNAPSHOTTING_MAX_THREADS).orElse(null);
+        String initialSnapshottingPipeline =
+                config.getOptional(INITIAL_SNAPSHOTTING_PIPELINE).orElse(null);
 
         String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
         ZoneId localTimeZone =
@@ -104,6 +110,21 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                         : ZoneId.of(zoneId);
 
         boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+
+        // The initial.snapshotting.pipeline related configs are only used in Debezium mode and
+        // cannot be used in incremental snapshot mode, because the semantics would be inconsistent:
+        // in the snapshot phase of incremental snapshot mode, the oplog is backfilled after each
+        // snapshot to compensate for changes, but the pipeline operations in
+        // initial.snapshotting.pipeline are not applied to the backfill oplog.
+        checkArgument(
+                !(enableParallelRead
+                        && (initialSnapshottingPipeline != null
+                                || initialSnapshottingMaxThreads != null
+                                || initialSnapshottingQueueSize != null)),
+                "The initial.snapshotting.*/copy.existing.* config only applies to Debezium mode, "
+                        + "not incremental snapshot mode");
+
         boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
@@ -136,6 +157,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 connectionOptions,
                 startupOptions,
                 initialSnapshottingQueueSize,
+                initialSnapshottingMaxThreads,
+                initialSnapshottingPipeline,
                 batchSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
@@ -219,6 +242,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(SCAN_STARTUP_MODE);
         options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
         options.add(INITIAL_SNAPSHOTTING_QUEUE_SIZE);
+        options.add(INITIAL_SNAPSHOTTING_MAX_THREADS);
+        options.add(INITIAL_SNAPSHOTTING_PIPELINE);
         options.add(BATCH_SIZE);
         options.add(POLL_MAX_BATCH_SIZE);
         options.add(POLL_AWAIT_TIME_MILLIS);
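
To make the new validation concrete, a hypothetical option combination (not part of the commit) that the checkArgument above rejects at planning time:

```
CREATE TABLE invalid_mongodb_source (
  _id STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'orders',
  -- fails: incremental snapshot mode cannot be combined with any
  -- initial.snapshotting.*/copy.existing.* option
  'scan.incremental.snapshot.enabled' = 'true',
  'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
);
```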
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 5960fed9f..5716eeed4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -63,6 +63,7 @@ import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourc
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT;
 import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
 import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link MongoDBTableSource} created by {@link MongoDBTableSourceFactory}. */
 class MongoDBTableFactoryTest {
@@ -142,6 +143,8 @@ class MongoDBTableFactoryTest {
                         null,
                         StartupOptions.initial(),
                         null,
+                        null,
+                        null,
                         BATCH_SIZE_DEFAULT,
                         POLL_MAX_BATCH_SIZE_DEFAULT,
                         POLL_AWAIT_TIME_MILLIS_DEFAULT,
@@ -167,7 +170,6 @@ class MongoDBTableFactoryTest {
         options.put("connection.options", 
"replicaSet=test&connectTimeoutMS=300000");
         options.put("scan.startup.mode", "timestamp");
         options.put("scan.startup.timestamp-millis", "1667232000000");
-        options.put("initial.snapshotting.queue.size", "100");
         options.put("batch.size", "101");
         options.put("poll.max.batch.size", "102");
         options.put("poll.await.time.ms", "103");
@@ -196,7 +198,9 @@ class MongoDBTableFactoryTest {
                         MY_TABLE,
                         "replicaSet=test&connectTimeoutMS=300000",
                         StartupOptions.timestamp(1667232000000L),
-                        100,
+                        null,
+                        null,
+                        null,
                         101,
                         102,
                         103,
@@ -239,6 +243,8 @@ class MongoDBTableFactoryTest {
                         null,
                         StartupOptions.initial(),
                         null,
+                        null,
+                        null,
                         BATCH_SIZE_DEFAULT,
                         POLL_MAX_BATCH_SIZE_DEFAULT,
                         POLL_AWAIT_TIME_MILLIS_DEFAULT,
@@ -280,6 +286,94 @@ class MongoDBTableFactoryTest {
                 .hasStackTraceContaining("Unsupported options:\n\nunknown");
     }
 
+    @Test
+    public void testCopyExistingPipelineConflictWithIncrementalSnapshotMode() {
+        // test with 'initial.snapshotting.pipeline' configuration
+        assertThatThrownBy(
+                        () -> {
+                            Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+                            properties.put(
+                                    "initial.snapshotting.pipeline",
+                                    "[{\"$match\": {\"closed\": \"false\"}}]");
+                            createTableSource(SCHEMA, properties);
+                        })
+                .rootCause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The initial.snapshotting.*/copy.existing.* config 
only applies to "
+                                + "Debezium mode, not incremental snapshot 
mode");
+
+        // test with 'initial.snapshotting.max.threads' configuration
+        assertThatThrownBy(
+                        () -> {
+                            Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+                            properties.put("initial.snapshotting.max.threads", "20");
+                            createTableSource(SCHEMA, properties);
+                        })
+                .rootCause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The initial.snapshotting.*/copy.existing.* config 
only applies to "
+                                + "Debezium mode, not incremental snapshot 
mode");
+
+        // test with 'initial.snapshotting.queue.size' configuration
+        assertThatThrownBy(
+                        () -> {
+                            Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+                            properties.put("initial.snapshotting.queue.size", "20480");
+                            createTableSource(SCHEMA, properties);
+                        })
+                .rootCause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The initial.snapshotting.*/copy.existing.* config 
only applies to "
+                                + "Debezium mode, not incremental snapshot 
mode");
+    }
+
+    @Test
+    public void testCopyExistingPipelineInDebeziumMode() {
+        Map<String, String> properties = getAllOptions();
+        properties.put("scan.incremental.snapshot.enabled", "false");
+        properties.put("initial.snapshotting.pipeline", "[{\"$match\": 
{\"closed\": \"false\"}}]");
+        properties.put("initial.snapshotting.max.threads", "20");
+        properties.put("initial.snapshotting.queue.size", "20480");
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
+
+        MongoDBTableSource expectedSource =
+                new MongoDBTableSource(
+                        SCHEMA,
+                        SCHEME.defaultValue(),
+                        MY_HOSTS,
+                        USER,
+                        PASSWORD,
+                        MY_DATABASE,
+                        MY_TABLE,
+                        null,
+                        StartupOptions.initial(),
+                        20480,
+                        20,
+                        "[{\"$match\": {\"closed\": \"false\"}}]",
+                        BATCH_SIZE_DEFAULT,
+                        POLL_MAX_BATCH_SIZE_DEFAULT,
+                        POLL_AWAIT_TIME_MILLIS_DEFAULT,
+                        HEARTBEAT_INTERVAL_MILLIS_DEFAULT,
+                        LOCAL_TIME_ZONE,
+                        false,
+                        CHUNK_META_GROUP_SIZE_DEFAULT,
+                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
+                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES_DEFAULT,
+                        SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
+                        FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
+                        SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
+                        SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
+                        SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
+        Assertions.assertThat(actualSource).isEqualTo(expectedSource);
+    }
+
     private Map<String, String> getAllOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "mongodb-cdc");
