This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git

commit 5f63e58938a0362ce7bba1dd25832e3b228c409f
Author: Jiabao Sun <[email protected]>
AuthorDate: Thu Aug 15 17:33:41 2024 +0800

    [FLINK-36053][connectors/mongodb] Support multi-version integration test in mongodb connector
---
 .github/workflows/push_pr.yml                      |  23 +++--
 .../flink/tests/util/mongodb/MongoE2ECase.java     |   4 +-
 .../connector/mongodb/sink/MongoSinkITCase.java    |   3 +-
 .../mongodb/sink/writer/MongoWriterITCase.java     |   3 +-
 .../mongodb/table/MongoDynamicTableSinkITCase.java |   3 +-
 .../table/MongoDynamicTableSourceITCase.java       |   3 +-
 .../table/MongoPartitionedTableSinkITCase.java     |   2 +
 .../mongodb/testutils/MongoShardedContainers.java  | 103 ++++++++++++++-------
 .../connector/mongodb/testutils/MongoTestUtil.java |  20 ++--
 pom.xml                                            |  40 +++++++-
 10 files changed, 146 insertions(+), 58 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 1402bbd..8725f70 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,14 +28,25 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.18-SNAPSHOT ]
-        jdk: [ '8, 11, 17' ]
+        mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
+        flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
+        jdk: [ '8, 11, 17, 21' ]
         include:
-          - flink: 1.19-SNAPSHOT
-            jdk: '8, 11, 17, 21'
-          - flink: 1.20-SNAPSHOT
-            jdk: '8, 11, 17, 21'
+          - mongodb: mongodb4
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb5
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb6
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - mongodb: mongodb7
+            flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
       jdk_version: ${{ matrix.jdk }}
+      optional_maven_profiles: ${{ matrix.mongodb }}
diff --git a/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java b/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java
index da9d821..49e87be 100644 --- a/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java +++ b/flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java @@ -19,6 +19,7 @@ package org.apache.flink.tests.util.mongodb; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.mongodb.testutils.MongoTestUtil; import org.apache.flink.connector.testframe.container.FlinkContainers; import org.apache.flink.connector.testframe.container.FlinkContainersSettings; import org.apache.flink.connector.testframe.container.TestcontainersSettings; @@ -51,7 +52,6 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGODB_HOSTNAME; -import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGO_4_0; import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for the MongoDB connectors. 
*/ @@ -69,7 +69,7 @@ class MongoE2ECase { @Container private static final MongoDBContainer MONGO_CONTAINER = - new MongoDBContainer(MONGO_4_0) + MongoTestUtil.createMongoDBContainer() .withLogConsumer(new Slf4jLogConsumer(LOG)) .withNetwork(NETWORK) .withNetworkAliases(MONGODB_HOSTNAME); diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java index 2673b04..867f1f0 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java @@ -47,6 +47,7 @@ import org.junit.jupiter.params.provider.EnumSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -63,7 +64,7 @@ class MongoSinkITCase { @Container private static final MongoDBContainer MONGO_CONTAINER = - MongoTestUtil.createMongoDBContainer(LOG); + MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG)); @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java index 5aacb20..daa5b8b 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -76,7 +77,7 @@ class MongoWriterITCase { @Container private static final MongoDBContainer MONGO_CONTAINER = - MongoTestUtil.createMongoDBContainer(LOG); + MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG)); private static MongoClient mongoClient; private static TestSinkInitContext sinkInitContext; diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java index a1dcbc6..a9d53b1 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java @@ -68,6 +68,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -93,7 +94,7 @@ class MongoDynamicTableSinkITCase { @Container private static final MongoDBContainer MONGO_CONTAINER = - MongoTestUtil.createMongoDBContainer(LOG); + MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG)); @RegisterExtension private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java index f50a35a..6091c54 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java @@ -65,6 +65,7 @@ import org.junit.jupiter.params.provider.EnumSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -101,7 +102,7 @@ class MongoDynamicTableSourceITCase { @Container private static final MongoDBContainer MONGO_CONTAINER = - MongoTestUtil.createMongoDBContainer(LOG); + MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG)); private static final String TEST_DATABASE = "test"; private static final String TEST_COLLECTION = "mongo_table_source"; diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java index abe2546..69e4ba2 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import 
org.apache.flink.table.expressions.Expression; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -51,6 +52,7 @@ import static org.apache.flink.table.api.Expressions.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +/** IT tests for {@link SupportsPartitioning} feature of {@link MongoDynamicTableSink}. */ class MongoPartitionedTableSinkITCase { @RegisterExtension diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java index 3ea42e8..4fbdbe9 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoShardedContainers.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -52,25 +53,29 @@ public class MongoShardedContainers implements BeforeAllCallback, AfterAllCallba Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG); this.configSrv = new MongoDBContainer(dockerImageName) - .withCreateContainerCmdModifier(it -> it.withHostName(CONFIG_HOSTNAME)) - .withCommand( - "-configsvr", - "--replSet", - CONFIG_REPLICA_SET_NAME, - "--port", - String.valueOf(MONGODB_INTERNAL_PORT)) + .withCreateContainerCmdModifier( + it -> + it.withCmd( + "-configsvr", + "--replSet", + CONFIG_REPLICA_SET_NAME, + "--port", + String.valueOf(MONGODB_INTERNAL_PORT)) + .withHostName(CONFIG_HOSTNAME)) 
.withNetwork(network) .withNetworkAliases(CONFIG_HOSTNAME) .withLogConsumer(logConsumer); this.shardSrv = new MongoDBContainer(dockerImageName) - .withCreateContainerCmdModifier(it -> it.withHostName(SHARD_HOSTNAME)) - .withCommand( - "-shardsvr", - "--replSet", - SHARD_REPLICA_SET_NAME, - "--port", - String.valueOf(MONGODB_INTERNAL_PORT)) + .withCreateContainerCmdModifier( + it -> + it.withCmd( + "-shardsvr", + "--replSet", + SHARD_REPLICA_SET_NAME, + "--port", + String.valueOf(MONGODB_INTERNAL_PORT)) + .withHostName(SHARD_HOSTNAME)) .withNetwork(network) .withNetworkAliases(SHARD_HOSTNAME) .withLogConsumer(logConsumer); @@ -105,38 +110,64 @@ public class MongoShardedContainers implements BeforeAllCallback, AfterAllCallba "mongodb://%s:%d", router.getHost(), router.getMappedPort(MONGODB_INTERNAL_PORT)); } + public void executeCommand(String command) { + executeCommand(router, command); + } + + private static void executeCommand(MongoDBContainer container, String command) { + try { + ExecResult execResult = container.execInContainer(buildMongoEvalCommand(command)); + LOG.info(execResult.getStdout()); + if (execResult.getExitCode() != 0) { + throw new IllegalStateException( + "Execute mongo command failed " + execResult.getStderr()); + } + } catch (InterruptedException | IOException e) { + throw new IllegalStateException("Execute mongo command failed", e); + } + } + + private static String[] buildMongoEvalCommand(final String command) { + return new String[] { + "sh", + "-c", + "mongosh mongo --eval \"" + command + "\" || mongo --eval \"" + command + "\"", + }; + } + private static class MongoRouterContainer extends MongoDBContainer { + private MongoRouterContainer(DockerImageName dockerImageName) { super(dockerImageName); - withCommand( - "mongos", - "--bind_ip_all", - "--configdb", - String.format( - "%s/%s:%d", - CONFIG_REPLICA_SET_NAME, CONFIG_HOSTNAME, MONGODB_INTERNAL_PORT)); + withCreateContainerCmdModifier( + it -> + it.withCmd( + "mongos", + 
"--bind_ip_all", + "--configdb", + String.format( + "%s/%s:%d", + CONFIG_REPLICA_SET_NAME, + CONFIG_HOSTNAME, + MONGODB_INTERNAL_PORT))); } @Override - protected void containerIsStarted(InspectContainerResponse containerInfo) { + protected void containerIsStarted(InspectContainerResponse containerInfo, boolean reused) { addShard(); } private void addShard() { - try { - String addShardCommand = - String.format( - "sh.addShard('%s/%s:%d')", - SHARD_REPLICA_SET_NAME, SHARD_HOSTNAME, MONGODB_INTERNAL_PORT); - ExecResult execResult = execInContainer("mongo", "--eval", addShardCommand); - LOG.info(execResult.getStdout()); - if (execResult.getExitCode() != 0) { - throw new IllegalStateException( - "Execute mongo command failed " + execResult.getStdout()); - } - } catch (InterruptedException | IOException e) { - throw new IllegalStateException("Execute mongo command failed", e); - } + String addShardCommand = + String.format( + "sh.addShard('%s/%s:%d');", + SHARD_REPLICA_SET_NAME, SHARD_HOSTNAME, MONGODB_INTERNAL_PORT); + executeCommand(this, addShardCommand); + + String addShardToZoneCommand = + "sh.addShardToZone('${shard}', 'zone-0');" + + "sh.addShardToZone('${shard}', 'zone-1');"; + executeCommand(this, addShardToZoneCommand.replace("${shard}", SHARD_REPLICA_SET_NAME)); } } } diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java index e41a71e..d6c5bf4 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java @@ -28,10 +28,8 @@ import com.mongodb.client.model.Filters; import com.mongodb.client.model.IndexOptions; import org.bson.Document; import org.bson.conversions.Bson; -import org.slf4j.Logger; import 
org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; import java.util.ArrayList; @@ -45,7 +43,7 @@ public class MongoTestUtil { public static final String MONGODB_HOSTNAME = "mongodb"; - public static final String MONGO_4_0 = "mongo:4.0.10"; + public static final String MONGO_IMAGE_PREFIX = "mongo:"; public static final String ADMIN_DATABASE = "admin"; public static final String CONFIG_DATABASE = "config"; @@ -58,12 +56,10 @@ public class MongoTestUtil { /** * Creates a preconfigured {@link MongoDBContainer}. * - * @param logger for test containers * @return configured MongoDB container */ - public static MongoDBContainer createMongoDBContainer(Logger logger) { - return new MongoDBContainer(DockerImageName.parse(MONGO_4_0)) - .withLogConsumer(new Slf4jLogConsumer(logger)); + public static MongoDBContainer createMongoDBContainer() { + return new MongoDBContainer(mongoDockerImageName()); } /** @@ -73,7 +69,15 @@ public class MongoTestUtil { * @return configured MongoDB sharded containers */ public static MongoShardedContainers createMongoDBShardedContainers(Network network) { - return new MongoShardedContainers(DockerImageName.parse(MONGO_4_0), network); + return new MongoShardedContainers(mongoDockerImageName(), network); + } + + public static DockerImageName mongoDockerImageName() { + return DockerImageName.parse(MONGO_IMAGE_PREFIX + mongoVersion()); + } + + public static String mongoVersion() { + return System.getProperty("mongodb.version"); } public static void assertThatIdsAreNotWritten(MongoCollection<Document> coll, Integer... ids) { diff --git a/pom.xml b/pom.xml index 05a81d6..49096ca 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ under the License. 
</scm> <properties> - <mongodb.version>5.1.1</mongodb.version> + <mongodb.driver.version>5.1.1</mongodb.driver.version> <flink.version>1.18.0</flink.version> <scala.binary.version>2.12</scala.binary.version> @@ -159,7 +159,7 @@ under the License. <dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver-sync</artifactId> - <version>${mongodb.version}</version> + <version>${mongodb.driver.version}</version> </dependency> <!-- Flink dependencies --> @@ -367,6 +367,11 @@ under the License. <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <mongodb.version>${mongodb.version}</mongodb.version> + </systemPropertyVariables> + </configuration> </plugin> <plugin> @@ -399,6 +404,37 @@ under the License. <spotless.skip>true</spotless.skip> </properties> </profile> + + <profile> + <id>mongodb4</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <mongodb.version>4.0.28</mongodb.version> + </properties> + </profile> + + <profile> + <id>mongodb5</id> + <properties> + <mongodb.version>5.0.28</mongodb.version> + </properties> + </profile> + + <profile> + <id>mongodb6</id> + <properties> + <mongodb.version>6.0.16</mongodb.version> + </properties> + </profile> + + <profile> + <id>mongodb7</id> + <properties> + <mongodb.version>7.0.12</mongodb.version> + </properties> + </profile> </profiles> </project>
