This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git

commit 5f63e58938a0362ce7bba1dd25832e3b228c409f
Author: Jiabao Sun <[email protected]>
AuthorDate: Thu Aug 15 17:33:41 2024 +0800

    [FLINK-36053][connectors/mongodb] Support multi-version integration test in 
mongodb connector
---
 .github/workflows/push_pr.yml                      |  23 +++--
 .../flink/tests/util/mongodb/MongoE2ECase.java     |   4 +-
 .../connector/mongodb/sink/MongoSinkITCase.java    |   3 +-
 .../mongodb/sink/writer/MongoWriterITCase.java     |   3 +-
 .../mongodb/table/MongoDynamicTableSinkITCase.java |   3 +-
 .../table/MongoDynamicTableSourceITCase.java       |   3 +-
 .../table/MongoPartitionedTableSinkITCase.java     |   2 +
 .../mongodb/testutils/MongoShardedContainers.java  | 103 ++++++++++++++-------
 .../connector/mongodb/testutils/MongoTestUtil.java |  20 ++--
 pom.xml                                            |  40 +++++++-
 10 files changed, 146 insertions(+), 58 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 1402bbd..8725f70 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,14 +28,25 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.18-SNAPSHOT ]
-        jdk: [ '8, 11, 17' ]
+        mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
+        flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
+        jdk: [ '8, 11, 17, 21' ]
         include:
-          - flink: 1.19-SNAPSHOT
-            jdk: '8, 11, 17, 21'
-          - flink: 1.20-SNAPSHOT
-            jdk: '8, 11, 17, 21'
+          - mongodb: mongodb4
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb5
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb6
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb7
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
       jdk_version: ${{ matrix.jdk }}
+      optional_maven_profiles: ${{ matrix.mongodb }}
diff --git 
a/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java
 
b/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java
index da9d821..49e87be 100644
--- 
a/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java
+++ 
b/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.tests.util.mongodb;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
 import org.apache.flink.connector.testframe.container.FlinkContainers;
 import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
 import org.apache.flink.connector.testframe.container.TestcontainersSettings;
@@ -51,7 +52,6 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGODB_HOSTNAME;
-import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGO_4_0;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** End-to-end test for the MongoDB connectors. */
@@ -69,7 +69,7 @@ class MongoE2ECase {
 
     @Container
     private static final MongoDBContainer MONGO_CONTAINER =
-            new MongoDBContainer(MONGO_4_0)
+            MongoTestUtil.createMongoDBContainer()
                     .withLogConsumer(new Slf4jLogConsumer(LOG))
                     .withNetwork(NETWORK)
                     .withNetworkAliases(MONGODB_HOSTNAME);
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 2673b04..867f1f0 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -63,7 +64,7 @@ class MongoSinkITCase {
 
     @Container
     private static final MongoDBContainer MONGO_CONTAINER =
-            MongoTestUtil.createMongoDBContainer(LOG);
+            MongoTestUtil.createMongoDBContainer().withLogConsumer(new 
Slf4jLogConsumer(LOG));
 
     @RegisterExtension
     private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 5aacb20..daa5b8b 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -76,7 +77,7 @@ class MongoWriterITCase {
 
     @Container
     private static final MongoDBContainer MONGO_CONTAINER =
-            MongoTestUtil.createMongoDBContainer(LOG);
+            MongoTestUtil.createMongoDBContainer().withLogConsumer(new 
Slf4jLogConsumer(LOG));
 
     private static MongoClient mongoClient;
     private static TestSinkInitContext sinkInitContext;
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
index a1dcbc6..a9d53b1 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
@@ -68,6 +68,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -93,7 +94,7 @@ class MongoDynamicTableSinkITCase {
 
     @Container
     private static final MongoDBContainer MONGO_CONTAINER =
-            MongoTestUtil.createMongoDBContainer(LOG);
+            MongoTestUtil.createMongoDBContainer().withLogConsumer(new 
Slf4jLogConsumer(LOG));
 
     @RegisterExtension
     private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
index f50a35a..6091c54 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
@@ -65,6 +65,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -101,7 +102,7 @@ class MongoDynamicTableSourceITCase {
 
     @Container
     private static final MongoDBContainer MONGO_CONTAINER =
-            MongoTestUtil.createMongoDBContainer(LOG);
+            MongoTestUtil.createMongoDBContainer().withLogConsumer(new 
Slf4jLogConsumer(LOG));
 
     private static final String TEST_DATABASE = "test";
     private static final String TEST_COLLECTION = "mongo_table_source";
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java
index abe2546..69e4ba2 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
@@ -51,6 +52,7 @@ import static org.apache.flink.table.api.Expressions.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+/** IT tests for {@link SupportsPartitioning} feature of {@link 
MongoDynamicTableSink}. */
 class MongoPartitionedTableSinkITCase {
 
     @RegisterExtension
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java
index 3ea42e8..4fbdbe9 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.MongoDBContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,25 +53,29 @@ public class MongoShardedContainers implements 
BeforeAllCallback, AfterAllCallba
         Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG);
         this.configSrv =
                 new MongoDBContainer(dockerImageName)
-                        .withCreateContainerCmdModifier(it -> 
it.withHostName(CONFIG_HOSTNAME))
-                        .withCommand(
-                                "-configsvr",
-                                "--replSet",
-                                CONFIG_REPLICA_SET_NAME,
-                                "--port",
-                                String.valueOf(MONGODB_INTERNAL_PORT))
+                        .withCreateContainerCmdModifier(
+                                it ->
+                                        it.withCmd(
+                                                        "-configsvr",
+                                                        "--replSet",
+                                                        
CONFIG_REPLICA_SET_NAME,
+                                                        "--port",
+                                                        
String.valueOf(MONGODB_INTERNAL_PORT))
+                                                .withHostName(CONFIG_HOSTNAME))
                         .withNetwork(network)
                         .withNetworkAliases(CONFIG_HOSTNAME)
                         .withLogConsumer(logConsumer);
         this.shardSrv =
                 new MongoDBContainer(dockerImageName)
-                        .withCreateContainerCmdModifier(it -> 
it.withHostName(SHARD_HOSTNAME))
-                        .withCommand(
-                                "-shardsvr",
-                                "--replSet",
-                                SHARD_REPLICA_SET_NAME,
-                                "--port",
-                                String.valueOf(MONGODB_INTERNAL_PORT))
+                        .withCreateContainerCmdModifier(
+                                it ->
+                                        it.withCmd(
+                                                        "-shardsvr",
+                                                        "--replSet",
+                                                        SHARD_REPLICA_SET_NAME,
+                                                        "--port",
+                                                        
String.valueOf(MONGODB_INTERNAL_PORT))
+                                                .withHostName(SHARD_HOSTNAME))
                         .withNetwork(network)
                         .withNetworkAliases(SHARD_HOSTNAME)
                         .withLogConsumer(logConsumer);
@@ -105,38 +110,64 @@ public class MongoShardedContainers implements 
BeforeAllCallback, AfterAllCallba
                 "mongodb://%s:%d", router.getHost(), 
router.getMappedPort(MONGODB_INTERNAL_PORT));
     }
 
+    public void executeCommand(String command) {
+        executeCommand(router, command);
+    }
+
+    private static void executeCommand(MongoDBContainer container, String 
command) {
+        try {
+            ExecResult execResult = 
container.execInContainer(buildMongoEvalCommand(command));
+            LOG.info(execResult.getStdout());
+            if (execResult.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        "Execute mongo command failed " + 
execResult.getStderr());
+            }
+        } catch (InterruptedException | IOException e) {
+            throw new IllegalStateException("Execute mongo command failed", e);
+        }
+    }
+
+    private static String[] buildMongoEvalCommand(final String command) {
+        return new String[] {
+            "sh",
+            "-c",
+            "mongosh mongo --eval \"" + command + "\"  || mongo --eval \"" + 
command + "\"",
+        };
+    }
+
     private static class MongoRouterContainer extends MongoDBContainer {
+
         private MongoRouterContainer(DockerImageName dockerImageName) {
             super(dockerImageName);
-            withCommand(
-                    "mongos",
-                    "--bind_ip_all",
-                    "--configdb",
-                    String.format(
-                            "%s/%s:%d",
-                            CONFIG_REPLICA_SET_NAME, CONFIG_HOSTNAME, 
MONGODB_INTERNAL_PORT));
+            withCreateContainerCmdModifier(
+                    it ->
+                            it.withCmd(
+                                    "mongos",
+                                    "--bind_ip_all",
+                                    "--configdb",
+                                    String.format(
+                                            "%s/%s:%d",
+                                            CONFIG_REPLICA_SET_NAME,
+                                            CONFIG_HOSTNAME,
+                                            MONGODB_INTERNAL_PORT)));
         }
 
         @Override
-        protected void containerIsStarted(InspectContainerResponse 
containerInfo) {
+        protected void containerIsStarted(InspectContainerResponse 
containerInfo, boolean reused) {
             addShard();
         }
 
         private void addShard() {
-            try {
-                String addShardCommand =
-                        String.format(
-                                "sh.addShard('%s/%s:%d')",
-                                SHARD_REPLICA_SET_NAME, SHARD_HOSTNAME, 
MONGODB_INTERNAL_PORT);
-                ExecResult execResult = execInContainer("mongo", "--eval", 
addShardCommand);
-                LOG.info(execResult.getStdout());
-                if (execResult.getExitCode() != 0) {
-                    throw new IllegalStateException(
-                            "Execute mongo command failed " + 
execResult.getStdout());
-                }
-            } catch (InterruptedException | IOException e) {
-                throw new IllegalStateException("Execute mongo command 
failed", e);
-            }
+            String addShardCommand =
+                    String.format(
+                            "sh.addShard('%s/%s:%d');",
+                            SHARD_REPLICA_SET_NAME, SHARD_HOSTNAME, 
MONGODB_INTERNAL_PORT);
+            executeCommand(this, addShardCommand);
+
+            String addShardToZoneCommand =
+                    "sh.addShardToZone('${shard}', 'zone-0');"
+                            + "sh.addShardToZone('${shard}', 'zone-1');";
+            executeCommand(this, addShardToZoneCommand.replace("${shard}", 
SHARD_REPLICA_SET_NAME));
         }
     }
 }
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index e41a71e..d6c5bf4 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -28,10 +28,8 @@ import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.IndexOptions;
 import org.bson.Document;
 import org.bson.conversions.Bson;
-import org.slf4j.Logger;
 import org.testcontainers.containers.MongoDBContainer;
 import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.util.ArrayList;
@@ -45,7 +43,7 @@ public class MongoTestUtil {
 
     public static final String MONGODB_HOSTNAME = "mongodb";
 
-    public static final String MONGO_4_0 = "mongo:4.0.10";
+    public static final String MONGO_IMAGE_PREFIX = "mongo:";
 
     public static final String ADMIN_DATABASE = "admin";
     public static final String CONFIG_DATABASE = "config";
@@ -58,12 +56,10 @@ public class MongoTestUtil {
     /**
      * Creates a preconfigured {@link MongoDBContainer}.
      *
-     * @param logger for test containers
      * @return configured MongoDB container
      */
-    public static MongoDBContainer createMongoDBContainer(Logger logger) {
-        return new MongoDBContainer(DockerImageName.parse(MONGO_4_0))
-                .withLogConsumer(new Slf4jLogConsumer(logger));
+    public static MongoDBContainer createMongoDBContainer() {
+        return new MongoDBContainer(mongoDockerImageName());
     }
 
     /**
@@ -73,7 +69,15 @@ public class MongoTestUtil {
      * @return configured MongoDB sharded containers
      */
     public static MongoShardedContainers 
createMongoDBShardedContainers(Network network) {
-        return new MongoShardedContainers(DockerImageName.parse(MONGO_4_0), 
network);
+        return new MongoShardedContainers(mongoDockerImageName(), network);
+    }
+
+    public static DockerImageName mongoDockerImageName() {
+        return DockerImageName.parse(MONGO_IMAGE_PREFIX + mongoVersion());
+    }
+
+    public static String mongoVersion() {
+        return System.getProperty("mongodb.version");
     }
 
     public static void assertThatIdsAreNotWritten(MongoCollection<Document> 
coll, Integer... ids) {
diff --git a/pom.xml b/pom.xml
index 05a81d6..49096ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ under the License.
        </scm>
 
        <properties>
-               <mongodb.version>5.1.1</mongodb.version>
+               <mongodb.driver.version>5.1.1</mongodb.driver.version>
 
                <flink.version>1.18.0</flink.version>
                <scala.binary.version>2.12</scala.binary.version>
@@ -159,7 +159,7 @@ under the License.
                        <dependency>
                                <groupId>org.mongodb</groupId>
                                <artifactId>mongodb-driver-sync</artifactId>
-                               <version>${mongodb.version}</version>
+                               <version>${mongodb.driver.version}</version>
                        </dependency>
 
                        <!-- Flink dependencies -->
@@ -367,6 +367,11 @@ under the License.
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <systemPropertyVariables>
+                                               
<mongodb.version>${mongodb.version}</mongodb.version>
+                                       </systemPropertyVariables>
+                               </configuration>
                        </plugin>
 
                        <plugin>
@@ -399,6 +404,37 @@ under the License.
                                <spotless.skip>true</spotless.skip>
                        </properties>
                </profile>
+
+               <profile>
+                       <id>mongodb4</id>
+                       <activation>
+                               <activeByDefault>true</activeByDefault>
+                       </activation>
+                       <properties>
+                               <mongodb.version>4.0.28</mongodb.version>
+                       </properties>
+               </profile>
+
+               <profile>
+                       <id>mongodb5</id>
+                       <properties>
+                               <mongodb.version>5.0.28</mongodb.version>
+                       </properties>
+               </profile>
+
+               <profile>
+                       <id>mongodb6</id>
+                       <properties>
+                               <mongodb.version>6.0.16</mongodb.version>
+                       </properties>
+               </profile>
+
+               <profile>
+                       <id>mongodb7</id>
+                       <properties>
+                               <mongodb.version>7.0.12</mongodb.version>
+                       </properties>
+               </profile>
        </profiles>
 
 </project>

Reply via email to