This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 199344458 [FLINK-34874][source-connector/mongodb] Support initial.snapshotting.pipeline related configs in Table API
199344458 is described below
commit 19934445800cc8931ade34b5741cb69f8bada3a0
Author: Runkang He <[email protected]>
AuthorDate: Tue Apr 22 23:11:57 2025 +0800
[FLINK-34874][source-connector/mongodb] Support initial.snapshotting.pipeline related configs in Table API
This closes #3707
---
.../docs/connectors/flink-sources/mongodb-cdc.md | 40 +++++++++
.../docs/connectors/flink-sources/mongodb-cdc.md | 46 +++++++++-
.../source/config/MongoDBSourceOptions.java | 25 +++++-
.../mongodb/table/MongoDBTableSource.java | 16 ++++
.../mongodb/table/MongoDBTableSourceFactory.java | 25 ++++++
.../mongodb/table/MongoDBTableFactoryTest.java | 98 +++++++++++++++++++++-
6 files changed, 243 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index f52b3be98..0e46ec2a8 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -247,6 +247,35 @@ MongoDB's change event records do not contain the pre-update state of the message. Therefore, we can only
<td>Long</td>
<td>Timestamp in millis of the start point, only used for <code>'timestamp'</code> startup mode.</td>
</tr>
+    <tr>
+      <td>initial.snapshotting.queue.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">16000</td>
+      <td>Integer</td>
+      <td>The queue size to use when performing the initial snapshot. Only takes effect when the scan.startup.mode option is set to initial.<br>
+          Note: the deprecated option name is copy.existing.queue.size; it is still accepted for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.max.threads</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">Processors Count</td>
+      <td>Integer</td>
+      <td>The number of threads to use when performing the data copy. Only takes effect when the scan.startup.mode option is set to initial.<br>
+          Note: the deprecated option name is copy.existing.max.threads; it is still accepted for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
+    <tr>
+      <td>initial.snapshotting.pipeline</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>An array of JSON objects describing MongoDB pipeline operations. During the snapshot phase these operations are pushed down to MongoDB so that only the required data is read, which improves read efficiency. For example, the pipeline [{"$match": {"closed": "false"}}] copies only documents whose closed field is "false".<br>
+          This option only takes effect when the scan.startup.mode option is set to initial, and it can only be used in Debezium mode; it cannot be used in incremental snapshot mode because the semantics would be inconsistent.<br>
+          Note: the deprecated option name is copy.existing.pipeline; it is still accepted for compatibility with jobs from older versions, but upgrading to the new option name is recommended.
+      </td>
+    </tr>
<tr>
<td>batch.size</td>
<td>optional</td>
@@ -416,6 +445,17 @@ CREATE TABLE mongodb_source (...) WITH (
**Notes:**
- The 'timestamp' option specifies the timestamp startup mode, which requires incremental snapshot reading to be enabled.
+### Snapshot Data Filters
+
+The config option `initial.snapshotting.pipeline` specifies the filters applied when copying existing data.<br>
+During the snapshot phase, the filter is pushed down to MongoDB so that only the required data is read, which improves read efficiency.
+
+In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to "false" are copied.
+
+```
+'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
+```
+
### Change Streams
We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB and drive it with Debezium's `EmbeddedEngine`.
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index c4fb00af7..944c038b0 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -241,11 +241,40 @@ Connector Options
<td>Timestamp in millis of the start point, only used for <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
- <td>copy.existing.queue.size</td>
+ <td>initial.snapshotting.queue.size</td>
<td>optional</td>
- <td style="word-wrap: break-word;">10240</td>
+ <td style="word-wrap: break-word;">16000</td>
<td>Integer</td>
- <td>The max size of the queue to use when copying data.</td>
+      <td>The max size of the queue to use when copying data. Only available when scan.startup.mode is set to 'initial'.<br>
+          Note: The deprecated option name is copy.existing.queue.size. For compatibility with jobs from older versions, the old name is still accepted, but it is recommended to upgrade to the new option name.
+ </td>
+ </tr>
+ <tr>
+ <td>initial.snapshotting.max.threads</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">Processors Count</td>
+ <td>Integer</td>
+      <td>The number of threads to use when performing the data copy. Only available when scan.startup.mode is set to 'initial'.<br>
+          Note: The deprecated option name is copy.existing.max.threads. For compatibility with jobs from older versions, the old name is still accepted, but it is recommended to upgrade to the new option name.
+ </td>
+ </tr>
+ <tr>
+ <td>initial.snapshotting.pipeline</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>An array of JSON objects describing the pipeline operations to run when copying existing data.<br>
+          This can improve the use of indexes by the copying manager and make copying more efficient.
+          E.g. <code>[{"$match": {"closed": "false"}}]</code> ensures that only documents in which the closed field is set to "false" are copied.<br>
+          The initial.snapshotting.pipeline config is only available when scan.startup.mode is set to 'initial', and is only used in Debezium mode; it cannot be used in incremental snapshot mode because the semantics are inconsistent.<br>
+          Note: The deprecated option name is copy.existing.pipeline. For compatibility with jobs from older versions, the old name is still accepted, but it is recommended to upgrade to the new option name.
+      </td>
</tr>
<tr>
<td>batch.size</td>
@@ -439,6 +468,17 @@ CREATE TABLE mongodb_source (...) WITH (
)
```
+### Snapshot Data Filters
+
+The config option `initial.snapshotting.pipeline` describes the filters applied when copying existing data.<br>
+This filters out all but the required data and can improve the use of indexes by the copying manager.
+
+In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to "false" are copied.
+
+```
+'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
+```
+
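+To show the filter in context, here is a minimal, illustrative DDL sketch; the host, database, collection, and schema below are hypothetical, and Debezium mode is assumed (incremental snapshot reading disabled), as the option requires.
+
+```
+CREATE TABLE orders_source (
+  _id STRING,
+  closed STRING,
+  PRIMARY KEY (_id) NOT ENFORCED
+) WITH (
+  'connector' = 'mongodb-cdc',
+  'hosts' = 'localhost:27017',   -- hypothetical host
+  'database' = 'mydb',           -- hypothetical database
+  'collection' = 'orders',       -- hypothetical collection
+  'scan.incremental.snapshot.enabled' = 'false',
+  'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
+);
+```
+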
### Change Streams
We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB, driven by Debezium's `EmbeddedEngine`.
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
index 6d2fb1654..3e4cf6096 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
@@ -83,9 +83,30 @@ public class MongoDBSourceOptions {
public static final ConfigOption<Integer> INITIAL_SNAPSHOTTING_QUEUE_SIZE =
ConfigOptions.key("initial.snapshotting.queue.size")
.intType()
- .defaultValue(10240)
+ .noDefaultValue()
+ .withDeprecatedKeys("copy.existing.queue.size")
+                .withDescription(
+                        "The max size of the queue to use when copying data. When not set, "
+                                + "it uses the default value 16000 of the MongoDB Kafka connector.");
+
+    public static final ConfigOption<Integer> INITIAL_SNAPSHOTTING_MAX_THREADS =
+            ConfigOptions.key("initial.snapshotting.max.threads")
+                    .intType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("copy.existing.max.threads")
+                    .withDescription(
+                            "The number of threads to use when performing the data copy."
+                                    + " Defaults to the number of processors.");
+
+ public static final ConfigOption<String> INITIAL_SNAPSHOTTING_PIPELINE =
+ ConfigOptions.key("initial.snapshotting.pipeline")
+ .stringType()
+ .noDefaultValue()
+ .withDeprecatedKeys("copy.existing.pipeline")
                .withDescription(
-                        "The max size of the queue to use when copying data. Defaults to 10240.");
+                        "An array of JSON objects describing the pipeline operations "
+                                + "to run when copying existing data. "
+                                + "This can improve the use of indexes by the copying manager and make copying more efficient.");
public static final ConfigOption<Integer> BATCH_SIZE =
ConfigOptions.key("batch.size")
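Because each option above registers its legacy key via withDeprecatedKeys, a DDL written against the old copy.existing.* names still resolves to the same option. A brief, illustrative sketch (the option values are arbitrary):

```
CREATE TABLE mongodb_source (...) WITH (
  'connector' = 'mongodb-cdc',
  'scan.incremental.snapshot.enabled' = 'false',
  'copy.existing.queue.size' = '20480',      -- deprecated key, still accepted
  'initial.snapshotting.max.threads' = '8'   -- preferred new key
);
```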
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 2ea676309..03d006264 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -72,6 +72,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String collection;
private final StartupOptions startupOptions;
private final Integer initialSnapshottingQueueSize;
+ private final Integer initialSnapshottingMaxThreads;
+ private final String initialSnapshottingPipeline;
private final Integer batchSize;
private final Integer pollMaxBatchSize;
private final Integer pollAwaitTimeMillis;
@@ -109,6 +111,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable String connectionOptions,
StartupOptions startupOptions,
@Nullable Integer initialSnapshottingQueueSize,
+ @Nullable Integer initialSnapshottingMaxThreads,
+ @Nullable String initialSnapshottingPipeline,
@Nullable Integer batchSize,
@Nullable Integer pollMaxBatchSize,
@Nullable Integer pollAwaitTimeMillis,
@@ -134,6 +138,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.connectionOptions = connectionOptions;
this.startupOptions = checkNotNull(startupOptions);
this.initialSnapshottingQueueSize = initialSnapshottingQueueSize;
+ this.initialSnapshottingMaxThreads = initialSnapshottingMaxThreads;
+ this.initialSnapshottingPipeline = initialSnapshottingPipeline;
this.batchSize = batchSize;
this.pollMaxBatchSize = pollMaxBatchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -243,6 +249,10 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(initialSnapshottingQueueSize)
.ifPresent(builder::initialSnapshottingQueueSize);
+ Optional.ofNullable(initialSnapshottingMaxThreads)
+ .ifPresent(builder::initialSnapshottingMaxThreads);
+ Optional.ofNullable(initialSnapshottingPipeline)
+ .ifPresent(builder::initialSnapshottingPipeline);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@@ -298,6 +308,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
connectionOptions,
startupOptions,
initialSnapshottingQueueSize,
+ initialSnapshottingMaxThreads,
+ initialSnapshottingPipeline,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@@ -337,6 +349,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(connectionOptions, that.connectionOptions)
&& Objects.equals(startupOptions, that.startupOptions)
                && Objects.equals(initialSnapshottingQueueSize, that.initialSnapshottingQueueSize)
+                && Objects.equals(initialSnapshottingMaxThreads, that.initialSnapshottingMaxThreads)
+                && Objects.equals(initialSnapshottingPipeline, that.initialSnapshottingPipeline)
&& Objects.equals(batchSize, that.batchSize)
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
                && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
@@ -369,6 +383,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
connectionOptions,
startupOptions,
initialSnapshottingQueueSize,
+ initialSnapshottingMaxThreads,
+ initialSnapshottingPipeline,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 47ade2675..71770cf1e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -48,6 +48,8 @@ import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourc
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS;
+import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_MAX_THREADS;
+import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_PIPELINE;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_QUEUE_SIZE;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
@@ -96,6 +98,10 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
StartupOptions startupOptions = getStartupOptions(config);
Integer initialSnapshottingQueueSize =
config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null);
+        Integer initialSnapshottingMaxThreads =
+                config.getOptional(INITIAL_SNAPSHOTTING_MAX_THREADS).orElse(null);
+        String initialSnapshottingPipeline =
+                config.getOptional(INITIAL_SNAPSHOTTING_PIPELINE).orElse(null);
        String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
ZoneId localTimeZone =
@@ -104,6 +110,21 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
: ZoneId.of(zoneId);
        boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+
+        // The initial.snapshotting.pipeline related config is only used in Debezium mode and
+        // cannot be used in incremental snapshot mode because the semantics are inconsistent.
+        // The reason is that in the snapshot phase of incremental snapshot mode, the oplog
+        // will be backfilled after each snapshot to compensate for changes, but the pipeline
+        // operations in initial.snapshotting.pipeline are not applied to the backfill oplog,
+        // which makes the semantics of this config inconsistent.
+        checkArgument(
+                !(enableParallelRead
+                        && (initialSnapshottingPipeline != null
+                                || initialSnapshottingMaxThreads != null
+                                || initialSnapshottingQueueSize != null)),
+                "The initial.snapshotting.*/copy.existing.* config only applies to Debezium mode, "
+                        + "not incremental snapshot mode");
+
        boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
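To make the validation added above concrete, here is a hedged sketch of two DDL fragments (table names and values are illustrative): the first combination trips the checkArgument, the second passes because the options are only honored in Debezium mode.

```
-- Rejected: incremental snapshot mode plus an initial.snapshotting.* option.
CREATE TABLE bad_source (...) WITH (
  'connector' = 'mongodb-cdc',
  'scan.incremental.snapshot.enabled' = 'true',
  'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
);

-- Accepted: Debezium mode, where the snapshot pipeline is applied.
CREATE TABLE good_source (...) WITH (
  'connector' = 'mongodb-cdc',
  'scan.incremental.snapshot.enabled' = 'false',
  'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
);
```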
@@ -136,6 +157,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
connectionOptions,
startupOptions,
initialSnapshottingQueueSize,
+ initialSnapshottingMaxThreads,
+ initialSnapshottingPipeline,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@@ -219,6 +242,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(INITIAL_SNAPSHOTTING_QUEUE_SIZE);
+ options.add(INITIAL_SNAPSHOTTING_MAX_THREADS);
+ options.add(INITIAL_SNAPSHOTTING_PIPELINE);
options.add(BATCH_SIZE);
options.add(POLL_MAX_BATCH_SIZE);
options.add(POLL_AWAIT_TIME_MILLIS);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 5960fed9f..5716eeed4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -63,6 +63,7 @@ import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourc
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT;
import static org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link MongoDBTableSource} created by {@link MongoDBTableSourceFactory}. */
class MongoDBTableFactoryTest {
@@ -142,6 +143,8 @@ class MongoDBTableFactoryTest {
null,
StartupOptions.initial(),
null,
+ null,
+ null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,
POLL_AWAIT_TIME_MILLIS_DEFAULT,
@@ -167,7 +170,6 @@ class MongoDBTableFactoryTest {
options.put("connection.options",
"replicaSet=test&connectTimeoutMS=300000");
options.put("scan.startup.mode", "timestamp");
options.put("scan.startup.timestamp-millis", "1667232000000");
- options.put("initial.snapshotting.queue.size", "100");
options.put("batch.size", "101");
options.put("poll.max.batch.size", "102");
options.put("poll.await.time.ms", "103");
@@ -196,7 +198,9 @@ class MongoDBTableFactoryTest {
MY_TABLE,
"replicaSet=test&connectTimeoutMS=300000",
StartupOptions.timestamp(1667232000000L),
- 100,
+ null,
+ null,
+ null,
101,
102,
103,
@@ -239,6 +243,8 @@ class MongoDBTableFactoryTest {
null,
StartupOptions.initial(),
null,
+ null,
+ null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,
POLL_AWAIT_TIME_MILLIS_DEFAULT,
@@ -280,6 +286,94 @@ class MongoDBTableFactoryTest {
.hasStackTraceContaining("Unsupported options:\n\nunknown");
}
+ @Test
+ public void testCopyExistingPipelineConflictWithIncrementalSnapshotMode() {
+ // test with 'initial.snapshotting.pipeline' configuration
+ assertThatThrownBy(
+ () -> {
+ Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+ properties.put(
+ "initial.snapshotting.pipeline",
+ "[{\"$match\": {\"closed\": \"false\"}}]");
+ createTableSource(SCHEMA, properties);
+ })
+ .rootCause()
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+                        "The initial.snapshotting.*/copy.existing.* config only applies to "
+                                + "Debezium mode, not incremental snapshot mode");
+
+ // test with 'initial.snapshotting.max.threads' configuration
+ assertThatThrownBy(
+ () -> {
+ Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+                            properties.put("initial.snapshotting.max.threads", "20");
+ createTableSource(SCHEMA, properties);
+ })
+ .rootCause()
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+                        "The initial.snapshotting.*/copy.existing.* config only applies to "
+                                + "Debezium mode, not incremental snapshot mode");
+
+ // test with 'initial.snapshotting.queue.size' configuration
+ assertThatThrownBy(
+ () -> {
+ Map<String, String> properties = getAllOptions();
+                            properties.put("scan.incremental.snapshot.enabled", "true");
+                            properties.put("initial.snapshotting.queue.size", "20480");
+ createTableSource(SCHEMA, properties);
+ })
+ .rootCause()
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+                        "The initial.snapshotting.*/copy.existing.* config only applies to "
+                                + "Debezium mode, not incremental snapshot mode");
+    }
+ }
+
+ @Test
+ public void testCopyExistingPipelineInDebeziumMode() {
+ Map<String, String> properties = getAllOptions();
+ properties.put("scan.incremental.snapshot.enabled", "false");
+        properties.put("initial.snapshotting.pipeline", "[{\"$match\": {\"closed\": \"false\"}}]");
+ properties.put("initial.snapshotting.max.threads", "20");
+ properties.put("initial.snapshotting.queue.size", "20480");
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
+
+ MongoDBTableSource expectedSource =
+ new MongoDBTableSource(
+ SCHEMA,
+ SCHEME.defaultValue(),
+ MY_HOSTS,
+ USER,
+ PASSWORD,
+ MY_DATABASE,
+ MY_TABLE,
+ null,
+ StartupOptions.initial(),
+ 20480,
+ 20,
+ "[{\"$match\": {\"closed\": \"false\"}}]",
+ BATCH_SIZE_DEFAULT,
+ POLL_MAX_BATCH_SIZE_DEFAULT,
+ POLL_AWAIT_TIME_MILLIS_DEFAULT,
+ HEARTBEAT_INTERVAL_MILLIS_DEFAULT,
+ LOCAL_TIME_ZONE,
+ false,
+ CHUNK_META_GROUP_SIZE_DEFAULT,
+ SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
+ SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES_DEFAULT,
+ SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
+ FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
+ SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
+ SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
+                        SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
+ Assertions.assertThat(actualSource).isEqualTo(expectedSource);
+ }
+
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "mongodb-cdc");