This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 241eb03a7 [FLINK-35868][cdc-connector][mongodb] Bump dependency
version to support MongoDB 7.0
241eb03a7 is described below
commit 241eb03a74c521932ee5a4ba92a4ac479f9123a4
Author: yuxiqian <[email protected]>
AuthorDate: Thu Jul 25 19:23:51 2024 +0800
[FLINK-35868][cdc-connector][mongodb] Bump dependency version to support
MongoDB 7.0
This closes #3489.
---
.../docs/connectors/flink-sources/overview.md | 2 +-
.../docs/connectors/flink-sources/overview.md | 2 +-
.../flink-connector-mongodb-cdc/pom.xml | 4 +--
.../mongodb/source/MongoDBFullChangelogITCase.java | 34 ++++++++++++--------
.../source/MongoDBParallelSourceExampleTest.java | 24 +++++++++++++--
.../source/MongoDBParallelSourceITCase.java | 21 ++++++++++---
.../mongodb/source/MongoDBSourceTestBase.java | 30 ++++++++++--------
.../mongodb/source/NewlyAddedTableITCase.java | 19 ++++++++++--
.../reader/MongoDBSnapshotSplitReaderTest.java | 17 ++++++++--
.../reader/MongoDBStreamSplitReaderTest.java | 17 ++++++++--
.../mongodb/table/MongoDBConnectorITCase.java | 28 ++++++++++-------
.../mongodb/table/MongoDBRegexFilterITCase.java | 36 +++++++++++++---------
.../mongodb/table/MongoDBTimeZoneITCase.java | 32 ++++++++++---------
.../flink/cdc/connectors/tests/MongoE2eITCase.java | 20 ++++++++----
14 files changed, 197 insertions(+), 89 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md
b/docs/content.zh/docs/connectors/flink-sources/overview.md
index ee8169035..27b826317 100644
--- a/docs/content.zh/docs/connectors/flink-sources/overview.md
+++ b/docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref
"docs/connectors/flink-sources/tutorials/b
| Connector |
Database
| Driver
|
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
-| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0
|
MongoDB Driver: 4.3.4 |
+| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0
|
MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) |
<li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS
MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB
MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora
MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li>
[MariaDB](https://mariadb.org): 10.x <li> [PolarDB
X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
<li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase
EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x
|
OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
<li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21
| Oracle
Driver: 19.3.0.0 |
diff --git a/docs/content/docs/connectors/flink-sources/overview.md
b/docs/content/docs/connectors/flink-sources/overview.md
index 56ddd6261..962d02c1c 100644
--- a/docs/content/docs/connectors/flink-sources/overview.md
+++ b/docs/content/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref
"docs/connectors/flink-sources/tutorials/b
| Connector |
Database
| Driver
|
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
-| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0
|
MongoDB Driver: 4.3.4 |
+| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0
|
MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) |
<li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS
MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB
MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora
MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li>
[MariaDB](https://mariadb.org): 10.x <li> [PolarDB
X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
<li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase
EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x
|
OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
<li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21
| Oracle
Driver: 19.3.0.0 |
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
index 3c8efc046..2b58e6778 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
@@ -53,7 +53,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb.kafka</groupId>
<artifactId>mongo-kafka-connect</artifactId>
- <version>1.10.1</version>
+ <version>1.13.0</version>
<exclusions>
<exclusion>
<artifactId>mongodb-driver-sync</artifactId>
@@ -69,7 +69,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
- <version>4.9.1</version>
+ <version>4.11.2</version>
</dependency>
<!-- test dependencies on Flink -->
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 02d6a82d1..8d8047fa7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
+ private final String mongoVersion;
private final boolean parallelismSnapshot;
- public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
+ public MongoDBFullChangelogITCase(String mongoVersion, boolean
parallelismSnapshot) {
+ super(mongoVersion);
+ this.mongoVersion = mongoVersion;
this.parallelismSnapshot = parallelismSnapshot;
}
- @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+ @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot:
{1}")
public static Object[] parameters() {
- return new Object[][] {new Object[] {false}, new Object[] {true}};
+ List<Object[]> parameterTuples = new ArrayList<>();
+ for (String mongoVersion : MONGO_VERSIONS) {
+ parameterTuples.add(new Object[] {mongoVersion, true});
+ parameterTuples.add(new Object[] {mongoVersion, false});
+ }
+ return parameterTuples.toArray();
}
@Test
public void testGetMongoDBVersion() {
MongoDBSourceConfig config =
new MongoDBSourceConfigFactory()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.splitSizeMB(1)
.samplesPerChunk(10)
.pollAwaitTimeMillis(500)
.create(0);
- assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
+ assertEquals(MongoUtils.getMongoVersion(config), mongoVersion);
}
@Test
@@ -499,16 +507,16 @@ public class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
"customer_" + Integer.toUnsignedString(new Random().nextInt(),
36);
// A - enable system-level fulldoc pre & post image feature
- CONTAINER.executeCommand(
+ mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: {
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
// B - enable collection-level fulldoc pre & post image for change
capture collection
- CONTAINER.executeCommandInDatabase(
+ mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod:
'%s', changeStreamPreAndPostImages: { enabled: true } })",
"customers", "customers"),
customerDatabase);
- CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+ mongoContainer.executeCommandFileInDatabase("customer",
customerDatabase);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
@@ -526,7 +534,7 @@ public class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
TestTable customerTable = new TestTable(customerDatabase, "customers",
customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
@@ -613,12 +621,12 @@ public class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
"customer_" + Integer.toUnsignedString(new Random().nextInt(),
36);
// A - enable system-level fulldoc pre & post image feature
- CONTAINER.executeCommand(
+ mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: {
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
// B - enable collection-level fulldoc pre & post image for change
capture collection
for (String collectionName : captureCustomerCollections) {
- CONTAINER.executeCommandInDatabase(
+ mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({
collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
@@ -654,14 +662,14 @@ public class MongoDBFullChangelogITCase extends
MongoDBSourceTestBase {
+ " 'scan.incremental.snapshot.backfill.skip'
= '%s'"
+ ")",
parallelismSnapshot ? "true" : "false",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase,
captureCustomerCollections),
skipSnapshotBackfill);
- CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+ mongoContainer.executeCommandFileInDatabase("customer",
customerDatabase);
// first step: check the snapshot data
String[] snapshotForSingleTable =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
index dd2134136..a285fe4a3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
@@ -24,22 +24,42 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.List;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
/** Example Tests for {@link MongoDBSource}. */
+@RunWith(Parameterized.class)
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
+ @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot:
{1}")
+ public static Object[] parameters() {
+ List<Object[]> parameterTuples = new ArrayList<>();
+ for (String mongoVersion : MONGO_VERSIONS) {
+ parameterTuples.add(new Object[] {mongoVersion, true});
+ parameterTuples.add(new Object[] {mongoVersion, false});
+ }
+ return parameterTuples.toArray();
+ }
+
+ public MongoDBParallelSourceExampleTest(String mongoVersion) {
+ super(mongoVersion);
+ }
+
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testMongoDBExampleSource() throws Exception {
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".products")
.username(FLINK_USER)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
index 11332eb74..25918fd8b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
@@ -46,12 +46,15 @@ import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -65,11 +68,21 @@ import static
org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
/** IT tests for {@link MongoDBSource}. */
+@RunWith(Parameterized.class)
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
+ public MongoDBParallelSourceITCase(String mongoVersion) {
+ super(mongoVersion);
+ }
+
+ @Parameterized.Parameters(name = "mongoVersion: {0}")
+ public static Object[] parameters() {
+ return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+ }
+
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
@Test
@@ -406,7 +419,7 @@ public class MongoDBParallelSourceITCase extends
MongoDBSourceTestBase {
private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, StartupOptions
startupOptions)
throws Exception {
- String customerDatabase =
CONTAINER.executeCommandFileInSeparateDatabase("customer");
+ String customerDatabase =
mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
@@ -423,7 +436,7 @@ public class MongoDBParallelSourceITCase extends
MongoDBSourceTestBase {
TestTable customerTable = new TestTable(customerDatabase, "customers",
customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
@@ -507,7 +520,7 @@ public class MongoDBParallelSourceITCase extends
MongoDBSourceTestBase {
boolean skipSnapshotBackfill)
throws Exception {
- String customerDatabase =
CONTAINER.executeCommandFileInSeparateDatabase("customer");
+ String customerDatabase =
mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -535,7 +548,7 @@ public class MongoDBParallelSourceITCase extends
MongoDBSourceTestBase {
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.incremental.snapshot.backfill.skip'
= '%s'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
index 6bfcca824..49a362969 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
@@ -26,8 +26,7 @@ import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Before;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +38,21 @@ import java.util.stream.Stream;
/** MongoDBSourceTestBase for MongoDB >= 5.0.3. */
public class MongoDBSourceTestBase {
- protected static MongoClient mongodbClient;
+ public MongoDBSourceTestBase(String mongoVersion) {
+ this.mongoContainer =
+ new MongoDBContainer("mongo:" + mongoVersion)
+ .withSharding()
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+
+ public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};
protected static final int DEFAULT_PARALLELISM = 4;
+ @Rule public final MongoDBContainer mongoContainer;
+
+ protected MongoClient mongodbClient;
+
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
@@ -53,15 +63,15 @@ public class MongoDBSourceTestBase {
.withHaLeadershipControl()
.build());
- @BeforeClass
- public static void startContainers() {
+ @Before
+ public void startContainers() {
LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(CONTAINER)).join();
+ Startables.deepStart(Stream.of(mongoContainer)).join();
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
- new
ConnectionString(CONTAINER.getConnectionString()))
+ new
ConnectionString(mongoContainer.getConnectionString()))
.build();
mongodbClient = MongoClients.create(settings);
@@ -69,10 +79,4 @@ public class MongoDBSourceTestBase {
}
private static final Logger LOG =
LoggerFactory.getLogger(MongoDBSourceTestBase.class);
-
- @ClassRule
- public static final MongoDBContainer CONTAINER =
- new MongoDBContainer("mongo:6.0.9")
- .withSharding()
- .withLogConsumer(new Slf4jLogConsumer(LOG));
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index 4ffc74cbf..69900b4e9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -41,6 +41,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -55,6 +57,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static java.lang.String.format;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -62,6 +65,7 @@ import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLI
import static org.apache.flink.util.Preconditions.checkState;
/** IT tests to cover various newly added collections during capture process.
*/
+@RunWith(Parameterized.class)
public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(500);
@@ -69,6 +73,15 @@ public class NewlyAddedTableITCase extends
MongoDBSourceTestBase {
private String customerDatabase;
protected static final int DEFAULT_PARALLELISM = 4;
+ public NewlyAddedTableITCase(String mongoVersion) {
+ super(mongoVersion);
+ }
+
+ @Parameterized.Parameters(name = "mongoVersion: {0}")
+ public static Object[] parameters() {
+ return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+ }
+
private final ScheduledExecutorService mockChangelogExecutor =
Executors.newScheduledThreadPool(1);
@@ -79,7 +92,7 @@ public class NewlyAddedTableITCase extends
MongoDBSourceTestBase {
// prepare initial data for given collection
String collectionName = "produce_changelog";
// enable system-level fulldoc pre & post image feature
- CONTAINER.executeCommand(
+ mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: {
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
// mock continuous changelog during the newly added collections
capturing process
@@ -846,7 +859,7 @@ public class NewlyAddedTableITCase extends
MongoDBSourceTestBase {
// make initial data for given collection.
String cityName = collectionName.split("_")[1];
// B - enable collection-level fulldoc pre & post image for change
capture collection
- CONTAINER.executeCommandInDatabase(
+ mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({
collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
@@ -983,7 +996,7 @@ public class NewlyAddedTableITCase extends
MongoDBSourceTestBase {
+ " 'scan.newly-added-table.enabled' = 'true'"
+ " %s"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
index e439c098f..b683f303d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
@@ -42,11 +42,14 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
@@ -57,6 +60,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** MongoDB snapshot split reader test case. */
+@RunWith(Parameterized.class)
public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(MongoDBSnapshotSplitReaderTest.class);
@@ -71,13 +75,22 @@ public class MongoDBSnapshotSplitReaderTest extends
MongoDBSourceTestBase {
private SplitContext splitContext;
+ public MongoDBSnapshotSplitReaderTest(String mongoVersion) {
+ super(mongoVersion);
+ }
+
+ @Parameterized.Parameters(name = "mongoVersion: {0}")
+ public static Object[] parameters() {
+ return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+ }
+
@Before
public void before() {
- database =
CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
+ database =
mongoContainer.executeCommandFileInSeparateDatabase("chunk_test");
MongoDBSourceConfigFactory configFactory =
new MongoDBSourceConfigFactory()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".shopping_cart")
.username(FLINK_USER)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
index 1699a550c..bf781904b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
@@ -43,12 +43,15 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static
org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.FULL_DOCUMENT_FIELD;
@@ -65,6 +68,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** MongoDB stream split reader test case. */
+@RunWith(Parameterized.class)
public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
@@ -85,13 +89,22 @@ public class MongoDBStreamSplitReaderTest extends
MongoDBSourceTestBase {
private BsonDocument startupResumeToken;
+ public MongoDBStreamSplitReaderTest(String mongoVersion) {
+ super(mongoVersion);
+ }
+
+ @Parameterized.Parameters(name = "mongoVersion: {0}")
+ public static Object[] parameters() {
+ return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+ }
+
@Before
public void before() {
- database =
CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
+ database =
mongoContainer.executeCommandFileInSeparateDatabase("chunk_test");
MongoDBSourceConfigFactory configFactory =
new MongoDBSourceConfigFactory()
- .hosts(CONTAINER.getHostAndPort())
+ .hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".shopping_cart")
.username(FLINK_USER)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
index b15ebd1fc..19d47856e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
@@ -67,13 +67,19 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
private final boolean parallelismSnapshot;
- public MongoDBConnectorITCase(boolean parallelismSnapshot) {
+ public MongoDBConnectorITCase(String mongoVersion, boolean
parallelismSnapshot) {
+ super(mongoVersion);
this.parallelismSnapshot = parallelismSnapshot;
}
- @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+ @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot:
{1}")
public static Object[] parameters() {
- return new Object[][] {new Object[] {false}, new Object[] {true}};
+ return new Object[][] {
+ new Object[] {"6.0.16", true},
+ new Object[] {"6.0.16", false},
+ new Object[] {"7.0.12", true},
+ new Object[] {"7.0.12", false}
+ };
}
@Before
@@ -90,7 +96,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
@Test
public void testConsumingAllEvents() throws ExecutionException,
InterruptedException {
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
String sourceDDL =
String.format(
@@ -111,7 +117,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@@ -223,7 +229,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
@Test
public void testStartupFromTimestamp() throws Exception {
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
// Unfortunately we have to sleep here to differ initial and
later-generating changes in
// oplog by timestamp
@@ -252,7 +258,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
+ "',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@@ -302,7 +308,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
@Test
public void testAllTypes() throws Throwable {
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@@ -345,7 +351,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@@ -465,7 +471,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
@Test
public void testMetadataColumns() throws Exception {
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
String sourceDDL =
String.format(
@@ -487,7 +493,7 @@ public class MongoDBConnectorITCase extends
MongoDBSourceTestBase {
+ " 'collection' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
index 317dc7468..3352fe1ef 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
@@ -52,13 +52,19 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
private final boolean parallelismSnapshot;
- public MongoDBRegexFilterITCase(boolean parallelismSnapshot) {
+ public MongoDBRegexFilterITCase(String mongoVersion, boolean
parallelismSnapshot) {
+ super(mongoVersion);
this.parallelismSnapshot = parallelismSnapshot;
}
- @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+ @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot:
{1}")
public static Object[] parameters() {
- return new Object[][] {new Object[] {false}, new Object[] {true}};
+ return new Object[][] {
+ new Object[] {"6.0.16", true},
+ new Object[] {"6.0.16", false},
+ new Object[] {"7.0.12", true},
+ new Object[] {"7.0.12", false}
+ };
}
@Before
@@ -77,9 +83,9 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchMultipleDatabasesAndCollections() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db0 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db0 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db1 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db1 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection = ^(db0|db1)\.coll_a\d?$
String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0,
db1);
@@ -120,11 +126,11 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchMultipleDatabases() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db0 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db0 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db1 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db1 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// db2: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db2 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db2 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match database: ^(db0|db1)$
String databaseRegex = String.format("%s|%s", db0, db1);
@@ -174,9 +180,9 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchSingleQualifiedCollectionPattern() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db0 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db0 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db1 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db1 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection ^(db0|db1)\.coll_a\d?$
String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0);
@@ -213,9 +219,9 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchSingleDatabaseWithCollectionPattern() throws
Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db0 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db0 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
- String db1 =
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+ String db1 =
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection .*coll_b\d?
String collectionRegex = ".*coll_b\\d?";
@@ -251,7 +257,7 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
// 1. Given collections:
// db0: [coll-a1, coll-a2, coll-b1, coll-b2]
- String db0 =
CONTAINER.executeCommandFileInSeparateDatabase("ns-regex");
+ String db0 =
mongoContainer.executeCommandFileInSeparateDatabase("ns-regex");
TableResult result = submitTestCase(db0, "coll-a1");
@@ -271,7 +277,7 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
public void testMatchCollectionWithDots() throws Exception {
// 1. Given colllections:
// db: [coll.name]
- String db =
CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
+ String db =
mongoContainer.executeCommandFileInSeparateDatabase("ns-dotted");
TableResult result = submitTestCase(db, db + "[.]coll[.]name");
@@ -301,7 +307,7 @@ public class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
+ " coll_name STRING METADATA FROM 'collection_name'
VIRTUAL,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
- + ignoreIfNull("hosts", CONTAINER.getHostAndPort())
+ + ignoreIfNull("hosts",
mongoContainer.getHostAndPort())
+ ignoreIfNull("username", FLINK_USER)
+ ignoreIfNull("password", FLINK_USER_PASSWORD)
+ ignoreIfNull("database", database)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
index c3e7112c2..f9edc73fa 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
@@ -55,21 +55,25 @@ public class MongoDBTimeZoneITCase extends
MongoDBSourceTestBase {
private final boolean parallelismSnapshot;
- public MongoDBTimeZoneITCase(String localTimeZone, boolean
parallelismSnapshot) {
+ public MongoDBTimeZoneITCase(
+ String mongoVersion, String localTimeZone, boolean
parallelismSnapshot) {
+ super(mongoVersion);
this.localTimeZone = localTimeZone;
this.parallelismSnapshot = parallelismSnapshot;
}
- @Parameterized.Parameters(name = "localTimeZone: {0}, parallelismSnapshot:
{1}")
+ @Parameterized.Parameters(
+ name = "mongoVersion: {0}, localTimeZone: {1},
parallelismSnapshot: {2}")
public static Object[] parameters() {
- return new Object[][] {
- new Object[] {"Asia/Shanghai", false},
- new Object[] {"Europe/Berlin", false},
- new Object[] {"UTC", false},
- new Object[] {"Asia/Shanghai", true},
- new Object[] {"Europe/Berlin", true},
- new Object[] {"UTC", true}
- };
+ List<Object[]> parameterTuples = new ArrayList<>();
+ for (String mongoVersion : MONGO_VERSIONS) {
+ for (String timezone : new String[] {"Asia/Shanghai",
"Europe/Berlin", "UTC"}) {
+ for (boolean parallelismSnapshot : new boolean[] {true,
false}) {
+ parameterTuples.add(new Object[] {mongoVersion, timezone,
parallelismSnapshot});
+ }
+ }
+ }
+ return parameterTuples.toArray();
}
@Before
@@ -87,7 +91,7 @@ public class MongoDBTimeZoneITCase extends
MongoDBSourceTestBase {
public void testTemporalTypesWithTimeZone() throws Exception {
tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@@ -108,7 +112,7 @@ public class MongoDBTimeZoneITCase extends
MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@@ -160,7 +164,7 @@ public class MongoDBTimeZoneITCase extends
MongoDBSourceTestBase {
public void testDateAndTimestampToStringWithTimeZone() throws Exception {
tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
- String database =
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+ String database =
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@@ -177,7 +181,7 @@ public class MongoDBTimeZoneITCase extends
MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
- CONTAINER.getHostAndPort(),
+ mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
diff --git
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
index f3db2624d..e9cf648f9 100644
---
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
@@ -66,22 +66,30 @@ public class MongoE2eITCase extends
FlinkContainerTestEnvironment {
private MongoClient mongoClient;
+ public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};
+
@Parameterized.Parameter(1)
- public boolean parallelismSnapshot;
+ public String mongoVersion;
@Parameterized.Parameter(2)
+ public boolean parallelismSnapshot;
+
+ @Parameterized.Parameter(3)
public boolean scanFullChangelog;
@Parameterized.Parameters(
- name = "flinkVersion: {0}, parallelismSnapshot: {1},
scanFullChangelog: {2}")
+ name =
+ "flinkVersion: {0}, mongoVersion: {1},
parallelismSnapshot: {2}, scanFullChangelog: {3}")
public static List<Object[]> parameters() {
final List<String> flinkVersions = getFlinkVersion();
List<Object[]> params = new ArrayList<>();
for (String flinkVersion : flinkVersions) {
- params.add(new Object[] {flinkVersion, true, true});
- params.add(new Object[] {flinkVersion, true, false});
- params.add(new Object[] {flinkVersion, false, true});
- params.add(new Object[] {flinkVersion, false, false});
+ for (String mongoVersion : MONGO_VERSIONS) {
+ params.add(new Object[] {flinkVersion, mongoVersion, true,
true});
+ params.add(new Object[] {flinkVersion, mongoVersion, true,
false});
+ params.add(new Object[] {flinkVersion, mongoVersion, false,
true});
+ params.add(new Object[] {flinkVersion, mongoVersion, false,
false});
+ }
}
return params;
}