This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 241eb03a7 [FLINK-35868][cdc-connector][mongodb] Bump dependency 
version to support MongoDB 7.0
241eb03a7 is described below

commit 241eb03a74c521932ee5a4ba92a4ac479f9123a4
Author: yuxiqian <[email protected]>
AuthorDate: Thu Jul 25 19:23:51 2024 +0800

    [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support 
MongoDB 7.0
    
    This closes #3489.
---
 .../docs/connectors/flink-sources/overview.md      |  2 +-
 .../docs/connectors/flink-sources/overview.md      |  2 +-
 .../flink-connector-mongodb-cdc/pom.xml            |  4 +--
 .../mongodb/source/MongoDBFullChangelogITCase.java | 34 ++++++++++++--------
 .../source/MongoDBParallelSourceExampleTest.java   | 24 +++++++++++++--
 .../source/MongoDBParallelSourceITCase.java        | 21 ++++++++++---
 .../mongodb/source/MongoDBSourceTestBase.java      | 30 ++++++++++--------
 .../mongodb/source/NewlyAddedTableITCase.java      | 19 ++++++++++--
 .../reader/MongoDBSnapshotSplitReaderTest.java     | 17 ++++++++--
 .../reader/MongoDBStreamSplitReaderTest.java       | 17 ++++++++--
 .../mongodb/table/MongoDBConnectorITCase.java      | 28 ++++++++++-------
 .../mongodb/table/MongoDBRegexFilterITCase.java    | 36 +++++++++++++---------
 .../mongodb/table/MongoDBTimeZoneITCase.java       | 32 ++++++++++---------
 .../flink/cdc/connectors/tests/MongoE2eITCase.java | 20 ++++++++----
 14 files changed, 197 insertions(+), 89 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md 
b/docs/content.zh/docs/connectors/flink-sources/overview.md
index ee8169035..27b826317 100644
--- a/docs/content.zh/docs/connectors/flink-sources/overview.md
+++ b/docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref 
"docs/connectors/flink-sources/tutorials/b
 
 | Connector                                                                  | 
Database                                                                        
                                                                                
                                                                                
                                                                                
                                                                       | Driver 
                   |
 
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
-| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}})     | 
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                          
                                                                                
                                                                                
                                                                                
                                                                       | 
MongoDB Driver: 4.3.4     |
+| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}})     | 
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0           
                                                                                
                                                                                
                                                                                
                                                                       | 
MongoDB Driver: 4.11.2    |
 | [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}})         | 
<li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS 
MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB 
MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora 
MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> 
[MariaDB](https://mariadb.org): 10.x <li> [PolarDB 
X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28       |
 | [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
<li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase 
EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x                 
                                                                                
                                                                                
                                                                           | 
OceanBase Driver: 2.4.x   |
 | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}})       | 
<li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21                
                                                                                
                                                                                
                                                                                
                                                                       | Oracle 
Driver: 19.3.0.0   |
diff --git a/docs/content/docs/connectors/flink-sources/overview.md 
b/docs/content/docs/connectors/flink-sources/overview.md
index 56ddd6261..962d02c1c 100644
--- a/docs/content/docs/connectors/flink-sources/overview.md
+++ b/docs/content/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref 
"docs/connectors/flink-sources/tutorials/b
 
 | Connector                                                                  | 
Database                                                                        
                                                                                
                                                                                
                                                                                
                                                                       | Driver 
                   |
 
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
-| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}})     | 
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                          
                                                                                
                                                                                
                                                                                
                                                                       | 
MongoDB Driver: 4.3.4     |
+| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}})     | 
<li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0           
                                                                                
                                                                                
                                                                                
                                                                       | 
MongoDB Driver: 4.11.2    |
 | [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}})         | 
<li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS 
MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB 
MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora 
MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> 
[MariaDB](https://mariadb.org): 10.x <li> [PolarDB 
X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28       |
 | [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
<li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase 
EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x                 
                                                                                
                                                                                
                                                                           | 
OceanBase Driver: 2.4.x   |
 | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}})       | 
<li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21                
                                                                                
                                                                                
                                                                                
                                                                       | Oracle 
Driver: 19.3.0.0   |
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
index 3c8efc046..2b58e6778 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml
@@ -53,7 +53,7 @@ limitations under the License.
         <dependency>
             <groupId>org.mongodb.kafka</groupId>
             <artifactId>mongo-kafka-connect</artifactId>
-            <version>1.10.1</version>
+            <version>1.13.0</version>
             <exclusions>
                 <exclusion>
                     <artifactId>mongodb-driver-sync</artifactId>
@@ -69,7 +69,7 @@ limitations under the License.
         <dependency>
             <groupId>org.mongodb</groupId>
             <artifactId>mongodb-driver-sync</artifactId>
-            <version>4.9.1</version>
+            <version>4.11.2</version>
         </dependency>
 
         <!-- test dependencies on Flink -->
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 02d6a82d1..8d8047fa7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
 
+    private final String mongoVersion;
     private final boolean parallelismSnapshot;
 
-    public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
+    public MongoDBFullChangelogITCase(String mongoVersion, boolean 
parallelismSnapshot) {
+        super(mongoVersion);
+        this.mongoVersion = mongoVersion;
         this.parallelismSnapshot = parallelismSnapshot;
     }
 
-    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+    @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: 
{1}")
     public static Object[] parameters() {
-        return new Object[][] {new Object[] {false}, new Object[] {true}};
+        List<Object[]> parameterTuples = new ArrayList<>();
+        for (String mongoVersion : MONGO_VERSIONS) {
+            parameterTuples.add(new Object[] {mongoVersion, true});
+            parameterTuples.add(new Object[] {mongoVersion, false});
+        }
+        return parameterTuples.toArray();
     }
 
     @Test
     public void testGetMongoDBVersion() {
         MongoDBSourceConfig config =
                 new MongoDBSourceConfigFactory()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .splitSizeMB(1)
                         .samplesPerChunk(10)
                         .pollAwaitTimeMillis(500)
                         .create(0);
 
-        assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
+        assertEquals(MongoUtils.getMongoVersion(config), mongoVersion);
     }
 
     @Test
@@ -499,16 +507,16 @@ public class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
                 "customer_" + Integer.toUnsignedString(new Random().nextInt(), 
36);
 
         // A - enable system-level fulldoc pre & post image feature
-        CONTAINER.executeCommand(
+        mongoContainer.executeCommand(
                 "use admin; db.runCommand({ setClusterParameter: { 
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
 
         // B - enable collection-level fulldoc pre & post image for change 
capture collection
-        CONTAINER.executeCommandInDatabase(
+        mongoContainer.executeCommandInDatabase(
                 String.format(
                         "db.createCollection('%s'); db.runCommand({ collMod: 
'%s', changeStreamPreAndPostImages: { enabled: true } })",
                         "customers", "customers"),
                 customerDatabase);
-        CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+        mongoContainer.executeCommandFileInDatabase("customer", 
customerDatabase);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(1000);
@@ -526,7 +534,7 @@ public class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
         TestTable customerTable = new TestTable(customerDatabase, "customers", 
customersSchema);
         MongoDBSource source =
                 new MongoDBSourceBuilder()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .databaseList(customerDatabase)
                         .username(FLINK_USER)
                         .password(FLINK_USER_PASSWORD)
@@ -613,12 +621,12 @@ public class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
                 "customer_" + Integer.toUnsignedString(new Random().nextInt(), 
36);
 
         // A - enable system-level fulldoc pre & post image feature
-        CONTAINER.executeCommand(
+        mongoContainer.executeCommand(
                 "use admin; db.runCommand({ setClusterParameter: { 
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
 
         // B - enable collection-level fulldoc pre & post image for change 
capture collection
         for (String collectionName : captureCustomerCollections) {
-            CONTAINER.executeCommandInDatabase(
+            mongoContainer.executeCommandInDatabase(
                     String.format(
                             "db.createCollection('%s'); db.runCommand({ 
collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
                             collectionName, collectionName),
@@ -654,14 +662,14 @@ public class MongoDBFullChangelogITCase extends 
MongoDBSourceTestBase {
                                 + " 'scan.incremental.snapshot.backfill.skip' 
= '%s'"
                                 + ")",
                         parallelismSnapshot ? "true" : "false",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         customerDatabase,
                         getCollectionNameRegex(customerDatabase, 
captureCustomerCollections),
                         skipSnapshotBackfill);
 
-        CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+        mongoContainer.executeCommandFileInDatabase("customer", 
customerDatabase);
 
         // first step: check the snapshot data
         String[] snapshotForSingleTable =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
index dd2134136..a285fe4a3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java
@@ -24,22 +24,42 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
 
 /** Example Tests for {@link MongoDBSource}. */
+@RunWith(Parameterized.class)
 public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
 
+    @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: 
{1}")
+    public static Object[] parameters() {
+        List<Object[]> parameterTuples = new ArrayList<>();
+        for (String mongoVersion : MONGO_VERSIONS) {
+            parameterTuples.add(new Object[] {mongoVersion, true});
+            parameterTuples.add(new Object[] {mongoVersion, false});
+        }
+        return parameterTuples.toArray();
+    }
+
+    public MongoDBParallelSourceExampleTest(String mongoVersion) {
+        super(mongoVersion);
+    }
+
     @Test
     @Ignore("Test ignored because it won't stop and is used for manual test")
     public void testMongoDBExampleSource() throws Exception {
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
 
         MongoDBSource<String> mongoSource =
                 MongoDBSource.<String>builder()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .databaseList(database)
                         .collectionList(database + ".products")
                         .username(FLINK_USER)
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
index 11332eb74..25918fd8b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
@@ -46,12 +46,15 @@ import org.bson.Document;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -65,11 +68,21 @@ import static 
org.apache.flink.table.catalog.Column.physical;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** IT tests for {@link MongoDBSource}. */
+@RunWith(Parameterized.class)
 public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
     private static final int USE_POST_LOWWATERMARK_HOOK = 1;
     private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
     private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
 
+    public MongoDBParallelSourceITCase(String mongoVersion) {
+        super(mongoVersion);
+    }
+
+    @Parameterized.Parameters(name = "mongoVersion: {0}")
+    public static Object[] parameters() {
+        return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+    }
+
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
 
     @Test
@@ -406,7 +419,7 @@ public class MongoDBParallelSourceITCase extends 
MongoDBSourceTestBase {
     private List<String> testBackfillWhenWritingEvents(
             boolean skipBackFill, int fetchSize, int hookType, StartupOptions 
startupOptions)
             throws Exception {
-        String customerDatabase = 
CONTAINER.executeCommandFileInSeparateDatabase("customer");
+        String customerDatabase = 
mongoContainer.executeCommandFileInSeparateDatabase("customer");
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(1000);
         env.setParallelism(1);
@@ -423,7 +436,7 @@ public class MongoDBParallelSourceITCase extends 
MongoDBSourceTestBase {
         TestTable customerTable = new TestTable(customerDatabase, "customers", 
customersSchema);
         MongoDBSource source =
                 new MongoDBSourceBuilder()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .databaseList(customerDatabase)
                         .username(FLINK_USER)
                         .password(FLINK_USER_PASSWORD)
@@ -507,7 +520,7 @@ public class MongoDBParallelSourceITCase extends 
MongoDBSourceTestBase {
             boolean skipSnapshotBackfill)
             throws Exception {
 
-        String customerDatabase = 
CONTAINER.executeCommandFileInSeparateDatabase("customer");
+        String customerDatabase = 
mongoContainer.executeCommandFileInSeparateDatabase("customer");
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -535,7 +548,7 @@ public class MongoDBParallelSourceITCase extends 
MongoDBSourceTestBase {
                                 + " 'heartbeat.interval.ms' = '500',"
                                 + " 'scan.incremental.snapshot.backfill.skip' 
= '%s'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         customerDatabase,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
index 6bfcca824..49a362969 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java
@@ -26,8 +26,7 @@ import com.mongodb.ConnectionString;
 import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Before;
 import org.junit.Rule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +38,21 @@ import java.util.stream.Stream;
 /** MongoDBSourceTestBase for MongoDB >= 5.0.3. */
 public class MongoDBSourceTestBase {
 
-    protected static MongoClient mongodbClient;
+    public MongoDBSourceTestBase(String mongoVersion) {
+        this.mongoContainer =
+                new MongoDBContainer("mongo:" + mongoVersion)
+                        .withSharding()
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};
 
     protected static final int DEFAULT_PARALLELISM = 4;
 
+    @Rule public final MongoDBContainer mongoContainer;
+
+    protected MongoClient mongodbClient;
+
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
@@ -53,15 +63,15 @@ public class MongoDBSourceTestBase {
                             .withHaLeadershipControl()
                             .build());
 
-    @BeforeClass
-    public static void startContainers() {
+    @Before
+    public void startContainers() {
         LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(CONTAINER)).join();
+        Startables.deepStart(Stream.of(mongoContainer)).join();
 
         MongoClientSettings settings =
                 MongoClientSettings.builder()
                         .applyConnectionString(
-                                new 
ConnectionString(CONTAINER.getConnectionString()))
+                                new 
ConnectionString(mongoContainer.getConnectionString()))
                         .build();
         mongodbClient = MongoClients.create(settings);
 
@@ -69,10 +79,4 @@ public class MongoDBSourceTestBase {
     }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBSourceTestBase.class);
-
-    @ClassRule
-    public static final MongoDBContainer CONTAINER =
-            new MongoDBContainer("mongo:6.0.9")
-                    .withSharding()
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index 4ffc74cbf..69900b4e9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -41,6 +41,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -55,6 +57,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -62,6 +65,7 @@ import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLI
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** IT tests to cover various newly added collections during capture process. 
*/
+@RunWith(Parameterized.class)
 public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(500);
@@ -69,6 +73,15 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
     private String customerDatabase;
     protected static final int DEFAULT_PARALLELISM = 4;
 
+    public NewlyAddedTableITCase(String mongoVersion) {
+        super(mongoVersion);
+    }
+
+    @Parameterized.Parameters(name = "mongoVersion: {0}")
+    public static Object[] parameters() {
+        return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+    }
+
     private final ScheduledExecutorService mockChangelogExecutor =
             Executors.newScheduledThreadPool(1);
 
@@ -79,7 +92,7 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
         // prepare initial data for given collection
         String collectionName = "produce_changelog";
         // enable system-level fulldoc pre & post image feature
-        CONTAINER.executeCommand(
+        mongoContainer.executeCommand(
                 "use admin; db.runCommand({ setClusterParameter: { 
changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
 
         // mock continuous changelog during the newly added collections 
capturing process
@@ -846,7 +859,7 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
             // make initial data for given collection.
             String cityName = collectionName.split("_")[1];
             // B - enable collection-level fulldoc pre & post image for change 
capture collection
-            CONTAINER.executeCommandInDatabase(
+            mongoContainer.executeCommandInDatabase(
                     String.format(
                             "db.createCollection('%s'); db.runCommand({ 
collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
                             collectionName, collectionName),
@@ -983,7 +996,7 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
                         + " 'scan.newly-added-table.enabled' = 'true'"
                         + " %s"
                         + ")",
-                CONTAINER.getHostAndPort(),
+                mongoContainer.getHostAndPort(),
                 FLINK_USER,
                 FLINK_USER_PASSWORD,
                 customerDatabase,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
index e439c098f..b683f303d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java
@@ -42,11 +42,14 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.bson.BsonDocument;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.stream.Stream;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
@@ -57,6 +60,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /** MongoDB snapshot split reader test case. */
+@RunWith(Parameterized.class)
 public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBSnapshotSplitReaderTest.class);
@@ -71,13 +75,22 @@ public class MongoDBSnapshotSplitReaderTest extends 
MongoDBSourceTestBase {
 
     private SplitContext splitContext;
 
+    public MongoDBSnapshotSplitReaderTest(String mongoVersion) {
+        super(mongoVersion);
+    }
+
+    @Parameterized.Parameters(name = "mongoVersion: {0}")
+    public static Object[] parameters() {
+        return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+    }
+
     @Before
     public void before() {
-        database = 
CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
+        database = 
mongoContainer.executeCommandFileInSeparateDatabase("chunk_test");
 
         MongoDBSourceConfigFactory configFactory =
                 new MongoDBSourceConfigFactory()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .databaseList(database)
                         .collectionList(database + ".shopping_cart")
                         .username(FLINK_USER)
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
index 1699a550c..bf781904b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java
@@ -43,12 +43,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.FULL_DOCUMENT_FIELD;
@@ -65,6 +68,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /** MongoDB stream split reader test case. */
+@RunWith(Parameterized.class)
 public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase {
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
@@ -85,13 +89,22 @@ public class MongoDBStreamSplitReaderTest extends 
MongoDBSourceTestBase {
 
     private BsonDocument startupResumeToken;
 
+    public MongoDBStreamSplitReaderTest(String mongoVersion) {
+        super(mongoVersion);
+    }
+
+    @Parameterized.Parameters(name = "mongoVersion: {0}")
+    public static Object[] parameters() {
+        return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
+    }
+
     @Before
     public void before() {
-        database = 
CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
+        database = 
mongoContainer.executeCommandFileInSeparateDatabase("chunk_test");
 
         MongoDBSourceConfigFactory configFactory =
                 new MongoDBSourceConfigFactory()
-                        .hosts(CONTAINER.getHostAndPort())
+                        .hosts(mongoContainer.getHostAndPort())
                         .databaseList(database)
                         .collectionList(database + ".shopping_cart")
                         .username(FLINK_USER)
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
index b15ebd1fc..19d47856e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java
@@ -67,13 +67,19 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
 
     private final boolean parallelismSnapshot;
 
-    public MongoDBConnectorITCase(boolean parallelismSnapshot) {
+    public MongoDBConnectorITCase(String mongoVersion, boolean 
parallelismSnapshot) {
+        super(mongoVersion);
         this.parallelismSnapshot = parallelismSnapshot;
     }
 
-    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+    @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: 
{1}")
     public static Object[] parameters() {
-        return new Object[][] {new Object[] {false}, new Object[] {true}};
+        return new Object[][] {
+            new Object[] {"6.0.16", true},
+            new Object[] {"6.0.16", false},
+            new Object[] {"7.0.12", true},
+            new Object[] {"7.0.12", false}
+        };
     }
 
     @Before
@@ -90,7 +96,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
 
     @Test
     public void testConsumingAllEvents() throws ExecutionException, 
InterruptedException {
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
 
         String sourceDDL =
                 String.format(
@@ -111,7 +117,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
                                 + " 'heartbeat.interval.ms' = '1000'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
@@ -223,7 +229,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
 
     @Test
     public void testStartupFromTimestamp() throws Exception {
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
 
         // Unfortunately we have to sleep here to differ initial and 
later-generating changes in
         // oplog by timestamp
@@ -252,7 +258,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
                                 + "',"
                                 + " 'heartbeat.interval.ms' = '1000'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
@@ -302,7 +308,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
 
     @Test
     public void testAllTypes() throws Throwable {
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
 
         String sourceDDL =
                 String.format(
@@ -345,7 +351,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
                                 + " 'database' = '%s',"
                                 + " 'collection' = '%s'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
@@ -465,7 +471,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
 
     @Test
     public void testMetadataColumns() throws Exception {
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("inventory");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("inventory");
 
         String sourceDDL =
                 String.format(
@@ -487,7 +493,7 @@ public class MongoDBConnectorITCase extends 
MongoDBSourceTestBase {
                                 + " 'collection' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = '%s'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
index 317dc7468..3352fe1ef 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
@@ -52,13 +52,19 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
 
     private final boolean parallelismSnapshot;
 
-    public MongoDBRegexFilterITCase(boolean parallelismSnapshot) {
+    public MongoDBRegexFilterITCase(String mongoVersion, boolean 
parallelismSnapshot) {
+        super(mongoVersion);
         this.parallelismSnapshot = parallelismSnapshot;
     }
 
-    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
+    @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: 
{1}")
     public static Object[] parameters() {
-        return new Object[][] {new Object[] {false}, new Object[] {true}};
+        return new Object[][] {
+            new Object[] {"6.0.16", true},
+            new Object[] {"6.0.16", false},
+            new Object[] {"7.0.12", true},
+            new Object[] {"7.0.12", false}
+        };
     }
 
     @Before
@@ -77,9 +83,9 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchMultipleDatabasesAndCollections() throws Exception {
         // 1. Given collections:
         // db0: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db0 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db0 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
         // db1: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db1 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db1 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
 
         // 2. Test match: collection = ^(db0|db1)\.coll_a\d?$
         String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0, 
db1);
@@ -120,11 +126,11 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchMultipleDatabases() throws Exception {
         // 1. Given collections:
         // db0: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db0 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db0 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
         // db1: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db1 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db1 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
         // db2: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db2 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db2 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
 
         // 2. Test match database: ^(db0|db1)$
         String databaseRegex = String.format("%s|%s", db0, db1);
@@ -174,9 +180,9 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchSingleQualifiedCollectionPattern() throws Exception {
         // 1. Given collections:
         // db0: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db0 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db0 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
         // db1: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db1 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db1 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
 
         // 2. Test match: collection ^(db0|db1)\.coll_a\d?$
         String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0);
@@ -213,9 +219,9 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchSingleDatabaseWithCollectionPattern() throws 
Exception {
         // 1. Given collections:
         // db0: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db0 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db0 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
         // db1: [coll_a1, coll_a2, coll_b1, coll_b2]
-        String db1 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
+        String db1 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns_regex");
 
         // 2. Test match: collection .*coll_b\d?
         String collectionRegex = ".*coll_b\\d?";
@@ -251,7 +257,7 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
         // 1. Given collections:
         // db0: [coll-a1, coll-a2, coll-b1, coll-b2]
-        String db0 = 
CONTAINER.executeCommandFileInSeparateDatabase("ns-regex");
+        String db0 = 
mongoContainer.executeCommandFileInSeparateDatabase("ns-regex");
 
         TableResult result = submitTestCase(db0, "coll-a1");
 
@@ -271,7 +277,7 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
     public void testMatchCollectionWithDots() throws Exception {
         // 1. Given colllections:
         // db: [coll.name]
-        String db = 
CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
+        String db = 
mongoContainer.executeCommandFileInSeparateDatabase("ns-dotted");
 
         TableResult result = submitTestCase(db, db + "[.]coll[.]name");
 
@@ -301,7 +307,7 @@ public class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
                         + " coll_name STRING METADATA FROM 'collection_name' 
VIRTUAL,"
                         + " PRIMARY KEY (_id) NOT ENFORCED"
                         + ") WITH ("
-                        + ignoreIfNull("hosts", CONTAINER.getHostAndPort())
+                        + ignoreIfNull("hosts", 
mongoContainer.getHostAndPort())
                         + ignoreIfNull("username", FLINK_USER)
                         + ignoreIfNull("password", FLINK_USER_PASSWORD)
                         + ignoreIfNull("database", database)
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
index c3e7112c2..f9edc73fa 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java
@@ -55,21 +55,25 @@ public class MongoDBTimeZoneITCase extends 
MongoDBSourceTestBase {
 
     private final boolean parallelismSnapshot;
 
-    public MongoDBTimeZoneITCase(String localTimeZone, boolean 
parallelismSnapshot) {
+    public MongoDBTimeZoneITCase(
+            String mongoVersion, String localTimeZone, boolean 
parallelismSnapshot) {
+        super(mongoVersion);
         this.localTimeZone = localTimeZone;
         this.parallelismSnapshot = parallelismSnapshot;
     }
 
-    @Parameterized.Parameters(name = "localTimeZone: {0}, parallelismSnapshot: 
{1}")
+    @Parameterized.Parameters(
+            name = "mongoVersion: {0}, localTimeZone: {1}, 
parallelismSnapshot: {2}")
     public static Object[] parameters() {
-        return new Object[][] {
-            new Object[] {"Asia/Shanghai", false},
-            new Object[] {"Europe/Berlin", false},
-            new Object[] {"UTC", false},
-            new Object[] {"Asia/Shanghai", true},
-            new Object[] {"Europe/Berlin", true},
-            new Object[] {"UTC", true}
-        };
+        List<Object[]> parameterTuples = new ArrayList<>();
+        for (String mongoVersion : MONGO_VERSIONS) {
+            for (String timezone : new String[] {"Asia/Shanghai", 
"Europe/Berlin", "UTC"}) {
+                for (boolean parallelismSnapshot : new boolean[] {true, 
false}) {
+                    parameterTuples.add(new Object[] {mongoVersion, timezone, 
parallelismSnapshot});
+                }
+            }
+        }
+        return parameterTuples.toArray();
     }
 
     @Before
@@ -87,7 +91,7 @@ public class MongoDBTimeZoneITCase extends 
MongoDBSourceTestBase {
     public void testTemporalTypesWithTimeZone() throws Exception {
         tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
 
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
 
         String sourceDDL =
                 String.format(
@@ -108,7 +112,7 @@ public class MongoDBTimeZoneITCase extends 
MongoDBSourceTestBase {
                                 + " 'database' = '%s',"
                                 + " 'collection' = '%s'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
@@ -160,7 +164,7 @@ public class MongoDBTimeZoneITCase extends 
MongoDBSourceTestBase {
     public void testDateAndTimestampToStringWithTimeZone() throws Exception {
         tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
 
-        String database = 
CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
+        String database = 
mongoContainer.executeCommandFileInSeparateDatabase("column_type_test");
 
         String sourceDDL =
                 String.format(
@@ -177,7 +181,7 @@ public class MongoDBTimeZoneITCase extends 
MongoDBSourceTestBase {
                                 + " 'database' = '%s',"
                                 + " 'collection' = '%s'"
                                 + ")",
-                        CONTAINER.getHostAndPort(),
+                        mongoContainer.getHostAndPort(),
                         FLINK_USER,
                         FLINK_USER_PASSWORD,
                         database,
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
index f3db2624d..e9cf648f9 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java
@@ -66,22 +66,30 @@ public class MongoE2eITCase extends 
FlinkContainerTestEnvironment {
 
     private MongoClient mongoClient;
 
+    public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};
+
     @Parameterized.Parameter(1)
-    public boolean parallelismSnapshot;
+    public String mongoVersion;
 
     @Parameterized.Parameter(2)
+    public boolean parallelismSnapshot;
+
+    @Parameterized.Parameter(3)
     public boolean scanFullChangelog;
 
     @Parameterized.Parameters(
-            name = "flinkVersion: {0}, parallelismSnapshot: {1}, 
scanFullChangelog: {2}")
+            name =
+                    "flinkVersion: {0}, mongoVersion: {1}, 
parallelismSnapshot: {2}, scanFullChangelog: {3}")
     public static List<Object[]> parameters() {
         final List<String> flinkVersions = getFlinkVersion();
         List<Object[]> params = new ArrayList<>();
         for (String flinkVersion : flinkVersions) {
-            params.add(new Object[] {flinkVersion, true, true});
-            params.add(new Object[] {flinkVersion, true, false});
-            params.add(new Object[] {flinkVersion, false, true});
-            params.add(new Object[] {flinkVersion, false, false});
+            for (String mongoVersion : MONGO_VERSIONS) {
+                params.add(new Object[] {flinkVersion, mongoVersion, true, 
true});
+                params.add(new Object[] {flinkVersion, mongoVersion, true, 
false});
+                params.add(new Object[] {flinkVersion, mongoVersion, false, 
true});
+                params.add(new Object[] {flinkVersion, mongoVersion, false, 
false});
+            }
         }
         return params;
     }

Reply via email to