This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 2902e3b1c0e8fd8662352181de0adae6f14b7326
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Sun Jan 18 14:51:04 2026 +0100

    [FLINK-38937] Migrate all Kafka tests to TestKafkaContainer with KRaft mode
---
 .../flink-end-to-end-tests-common-kafka/pom.xml    | 11 +--
 .../tests/util/kafka/KafkaContainerClient.java     |  8 +-
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   | 18 +++--
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java | 20 ++---
 .../util/kafka/SQLClientSchemaRegistryITCase.java  | 59 +++++++-------
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   |  9 +--
 .../sink/FlinkKafkaInternalProducerITCase.java     |  6 +-
 .../connector/kafka/sink/KafkaSinkITCase.java      | 22 ++---
 .../kafka/sink/KafkaTransactionLogITCase.java      | 26 +++---
 .../connector/kafka/sink/KafkaWriterTestBase.java  |  5 +-
 .../sink/internal/ProducerPoolImplITCase.java      |  6 +-
 .../testutils/KafkaSinkExternalContextFactory.java |  9 +--
 .../connector/kafka/source/KafkaSourceITCase.java  | 17 ++--
 .../kafka/testutils/DockerImageVersions.java       |  2 -
 .../DynamicKafkaSourceExternalContextFactory.java  | 13 ++-
 .../KafkaSourceExternalContextFactory.java         |  8 +-
 .../flink/connector/kafka/testutils/KafkaUtil.java | 11 +--
 .../kafka/testutils/TwoKafkaContainers.java        | 13 ++-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 94 ++++++----------------
 .../metrics/KafkaMetricMutableWrapperTest.java     |  5 +-
 .../connectors/kafka/table/KafkaTableTestBase.java |  5 +-
 21 files changed, 153 insertions(+), 214 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
index acc72be6..98baaaa2 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -59,10 +59,6 @@ under the License.
 			<groupId>org.testcontainers</groupId>
 			<artifactId>testcontainers</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils-junit</artifactId>
@@ -107,11 +103,6 @@ under the License.
 			<artifactId>flink-core-api</artifactId>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.hamcrest</groupId>
-			<artifactId>hamcrest-core</artifactId>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.assertj</groupId>
 			<artifactId>assertj-core</artifactId>
@@ -262,7 +253,7 @@ under the License.
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
 					</artifactItems>
-					<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base,org.apache.flink:flink-sql-connector-kafka
+					<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base,org.apache.flink:flink-sql-connector-kafka,org.testcontainers:kafka
 					</ignoredUnusedDeclaredDependencies>
 				</configuration>
 			</plugin>
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index d3f45e0e..e70e1343 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -19,6 +19,7 @@ package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -42,7 +43,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -53,12 +53,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-/** A utility class that exposes common methods over a {@link KafkaContainer}. */
+/** A utility class that exposes common methods over a {@link TestKafkaContainer}.
*/ public class KafkaContainerClient { private static final Logger LOG = LoggerFactory.getLogger(KafkaContainerClient.class); - private final KafkaContainer container; + private final TestKafkaContainer container; - public KafkaContainerClient(KafkaContainer container) { + public KafkaContainerClient(TestKafkaContainer container) { this.container = container; } diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java index ea9a0079..479e3b3b 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java @@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka; import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestContext; @@ -31,8 +32,7 @@ import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.test.resources.ResourceTestUtils; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.utility.DockerImageName; +import org.testcontainers.containers.GenericContainer; import java.util.Arrays; @@ -50,13 +50,15 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { // Defines TestEnvironment @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6); + private final TestKafkaContainer kafkaContainer = + new TestKafkaContainer(DockerImageVersions.KAFKA).withNetworkAliases(KAFKA_HOSTNAME); + // Defines ConnectorExternalSystem + @SuppressWarnings({"rawtypes", "unchecked"}) @TestExternalSystem - DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem<?> kafka = DefaultContainerizedExternalSystem.builder() - .fromContainer( - new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) - .withNetworkAliases(KAFKA_HOSTNAME)) + .fromContainer((GenericContainer) kafkaContainer.getContainer()) .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager()) .build(); @@ -65,7 +67,7 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { @TestContext KafkaSinkExternalContextFactory incrementing = new KafkaSinkExternalContextFactory( - kafka.getContainer(), + kafkaContainer, Arrays.asList( ResourceTestUtils.getResource("kafka-connector.jar") .toAbsolutePath() @@ -84,7 +86,7 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { @TestContext KafkaSinkExternalContextFactory pooling = new KafkaSinkExternalContextFactory( - kafka.getContainer(), + kafkaContainer, Arrays.asList( ResourceTestUtils.getResource("kafka-connector.jar") .toAbsolutePath() diff --git 
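Note on the wiring pattern above, which recurs throughout this commit: each test instantiates the TestKafkaContainer wrapper once, unwraps it for the test framework, and passes the wrapper itself to the context factories. A minimal sketch of that shape, inferred from the call sites (the TestKafkaContainer source is not part of this diff, so its exact API is an assumption):

import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
import org.testcontainers.containers.GenericContainer;

class KafkaE2EWiringSketch {
    // Wrapper around the KRaft-mode Kafka broker; constructor takes the image name.
    private final TestKafkaContainer kafkaContainer =
            new TestKafkaContainer(DockerImageVersions.KAFKA).withNetworkAliases("kafka");

    // DefaultContainerizedExternalSystem still expects a raw Testcontainers type,
    // hence the single unchecked cast to GenericContainer at this boundary.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private final DefaultContainerizedExternalSystem<?> kafka =
            DefaultContainerizedExternalSystem.builder()
                    .fromContainer((GenericContainer) kafkaContainer.getContainer())
                    .build();

    // Context factories now receive the wrapper directly instead of kafka.getContainer(),
    // so they can query network aliases and bootstrap servers without the raw type.
}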
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java index 1a2ac1f2..67f7dfa3 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java @@ -18,8 +18,8 @@ package org.apache.flink.tests.util.kafka; -import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestContext; @@ -30,11 +30,11 @@ import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.test.resources.ResourceTestUtils; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.utility.DockerImageName; +import org.testcontainers.containers.GenericContainer; import java.util.Arrays; +import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA; import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION; import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC; @@ -48,13 +48,15 @@ public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> { // Defines TestEnvironment @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6); + TestKafkaContainer kafkaContainer = + new TestKafkaContainer(KAFKA).withNetworkAliases(KAFKA_HOSTNAME); + // Defines ConnectorExternalSystem + @SuppressWarnings({"rawtypes", "unchecked"}) @TestExternalSystem - DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem<?> kafka = DefaultContainerizedExternalSystem.builder() - .fromContainer( - new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) - .withNetworkAliases(KAFKA_HOSTNAME)) + .fromContainer((GenericContainer) kafkaContainer.getContainer()) .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager()) .build(); @@ -64,7 +66,7 @@ public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> { @TestContext KafkaSourceExternalContextFactory singleTopic = new KafkaSourceExternalContextFactory( - kafka.getContainer(), + kafkaContainer, Arrays.asList( ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(), ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()), @@ -74,7 +76,7 @@ public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> { @TestContext KafkaSourceExternalContextFactory multipleTopic = new KafkaSourceExternalContextFactory( - kafka.getContainer(), + kafkaContainer, Arrays.asList( ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(), ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()), diff --git 
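The next file, SQLClientSchemaRegistryITCase, is additionally moved from JUnit 4 to JUnit 5. As a reading aid for the hunks below, the rule-based constructs map onto annotations like this:

// @ClassRule KafkaContainer KAFKA           -> @Container TestKafkaContainer KAFKA_CONTAINER,
//                                              with @Testcontainers on the class
// @ClassRule Timeout TIMEOUT (10 minutes)   -> @Timeout(value = 10, unit = TimeUnit.MINUTES)
// @Before / @After                          -> @BeforeEach / @AfterEach
// org.junit.Assert.assertThat + hamcrest    -> org.assertj.core.api.Assertions.assertThat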
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java index 522eff09..986aa982 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.container.FlinkContainers; import org.apache.flink.connector.testframe.container.TestcontainersSettings; import org.apache.flink.test.resources.ResourceTestUtils; @@ -36,13 +37,13 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.kafka.common.serialization.StringDeserializer; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.testcontainers.containers.KafkaContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.nio.file.Path; @@ -53,11 +54,12 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for SQL client using Avro Confluent Registry format. 
*/ -public class SQLClientSchemaRegistryITCase { +@Testcontainers +@Timeout(value = 10, unit = TimeUnit.MINUTES) +class SQLClientSchemaRegistryITCase { public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry"; private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar"); @@ -65,29 +67,27 @@ public class SQLClientSchemaRegistryITCase { ResourceTestUtils.getResource(".*avro-confluent.jar"); private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar"); - @ClassRule public static final Network NETWORK = Network.newNetwork(); + public static final Network NETWORK = Network.newNetwork(); - @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); - - @ClassRule - public static final KafkaContainer KAFKA = + @Container + public static final TestKafkaContainer KAFKA_CONTAINER = KafkaUtil.createKafkaContainer(SQLClientSchemaRegistryITCase.class) .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); - @ClassRule + @Container public static final SchemaRegistryContainer REGISTRY = new SchemaRegistryContainer(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) - .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9093") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_REGISTRY_ALIAS) - .dependsOn(KAFKA); + .dependsOn(KAFKA_CONTAINER.getContainer()); public final TestcontainersSettings testcontainersSettings = TestcontainersSettings.builder() .network(NETWORK) .logger(KafkaUtil.getLogger("flink", SQLClientSchemaRegistryITCase.class)) - .dependsOn(KAFKA) + .dependsOn(KAFKA_CONTAINER.getContainer()) .build(); public final FlinkContainers flink = @@ -96,14 +96,14 @@ public class SQLClientSchemaRegistryITCase { private KafkaContainerClient kafkaClient; private CachedSchemaRegistryClient registryClient; - @Before + @BeforeEach public void setUp() throws Exception { flink.start(); - kafkaClient = new KafkaContainerClient(KAFKA); + kafkaClient = new KafkaContainerClient(KAFKA_CONTAINER); registryClient = new CachedSchemaRegistryClient(REGISTRY.getSchemaRegistryUrl(), 10); } - @After + @AfterEach public void tearDown() { flink.stop(); } @@ -140,7 +140,7 @@ public class SQLClientSchemaRegistryITCase { " 'connector' = 'kafka',", " 'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS - + ":9092',", + + ":9093',", " 'topic' = '" + testCategoryTopic + "',", " 'scan.startup.mode' = 'earliest-offset',", " 'properties.group.id' = 'test-group',", @@ -158,7 +158,7 @@ public class SQLClientSchemaRegistryITCase { " 'connector' = 'kafka',", " 'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS - + ":9092',", + + ":9093',", " 'properties.group.id' = 'test-group',", " 'topic' = '" + testResultsTopic + "',", " 'format' = 'csv',", @@ -171,7 +171,7 @@ public class SQLClientSchemaRegistryITCase { List<String> categories = kafkaClient.readMessages( 1, "test-group", testResultsTopic, new StringDeserializer()); - assertThat(categories, equalTo(Collections.singletonList("1,electronics,null"))); + assertThat(categories).isEqualTo(Collections.singletonList("1,electronics,null")); } @Test @@ -193,7 +193,7 @@ public class SQLClientSchemaRegistryITCase { " 'connector' = 'kafka',", " 'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS - + ":9092',", + + ":9093',", " 'topic' = '" + testUserBehaviorTopic + "',", " 'format' = 'avro-confluent',", " 'avro-confluent.url' = 
'http://" @@ -207,7 +207,7 @@ public class SQLClientSchemaRegistryITCase { executeSqlStatements(sqlLines); List<Integer> versions = getAllVersions(behaviourSubject); - assertThat(versions.size(), equalTo(1)); + assertThat(versions).hasSize(1); List<Object> userBehaviors = kafkaClient.readMessages( 1, @@ -219,9 +219,8 @@ public class SQLClientSchemaRegistryITCase { registryClient.getByVersion(behaviourSubject, versions.get(0), false).getSchema(); Schema userBehaviorSchema = new Schema.Parser().parse(schemaString); GenericRecordBuilder recordBuilder = new GenericRecordBuilder(userBehaviorSchema); - assertThat( - userBehaviors, - equalTo( + assertThat(userBehaviors) + .isEqualTo( Collections.singletonList( recordBuilder .set("user_id", 1L) @@ -229,7 +228,7 @@ public class SQLClientSchemaRegistryITCase { .set("category_id", 1L) .set("behavior", "buy") .set("ts", 1234000L) - .build()))); + .build())); } private List<Integer> getAllVersions(String behaviourSubject) throws Exception { diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java index 2f1ad81a..e0d7d291 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.container.FlinkContainers; import org.apache.flink.connector.testframe.container.FlinkContainersSettings; import org.apache.flink.connector.testframe.container.TestcontainersSettings; @@ -45,7 +46,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -74,16 +74,15 @@ class SmokeKafkaITCase { private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*"; @Container - public static final KafkaContainer KAFKA_CONTAINER = + public static final TestKafkaContainer KAFKA_CONTAINER = createKafkaContainer(SmokeKafkaITCase.class) - .withEmbeddedZookeeper() .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = TestcontainersSettings.builder() .logger(KafkaUtil.getLogger("flink", SmokeKafkaITCase.class)) - .dependsOn(KAFKA_CONTAINER) + .dependsOn(KAFKA_CONTAINER.getContainer()) .build(); @RegisterExtension @@ -174,7 +173,7 @@ class SmokeKafkaITCase { String.join( ":", host, - Integer.toString(9092))) + Integer.toString(9093))) .collect(Collectors.joining(",")))) .addArgument("--group.id", "myconsumer") .addArgument("--auto.offset.reset", "earliest") diff --git 
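A recurring detail in the hunks above and below is the internal port change from 9092 to 9093: getBootstrapServers() keeps returning the host-mapped address, while other containers on the Docker network now reach the broker via <network-alias>:9093, the listener these tests assume the KRaft container exposes inside the network. The context factories later in this patch compose both into a single bootstrap string; a condensed sketch of that helper (names taken from the diff, the port being the stated assumption):

import java.util.stream.Collectors;

class BootstrapSketch {
    // Builds a bootstrap string usable both from the host and from the container network.
    static String bootstrapServers(TestKafkaContainer kafka) {
        String internalEndpoints =
                kafka.getNetworkAliases().stream()
                        .map(alias -> alias + ":9093") // network-internal KRaft listener
                        .collect(Collectors.joining(","));
        return String.join(",", kafka.getBootstrapServers(), internalEndpoints);
    }
}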
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java index 6bd6d880..c3ba30dd 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.TestLoggerExtension; @@ -42,7 +43,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -73,8 +73,8 @@ class FlinkKafkaInternalProducerITCase { .build()); @Container - private static final KafkaContainer KAFKA_CONTAINER = - createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper(); + private static final TestKafkaContainer KAFKA_CONTAINER = + createKafkaContainer(FlinkKafkaInternalProducerITCase.class); @AfterEach public void check() { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index aa0f97ee..773b13b8 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestContext; @@ -95,7 +96,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -151,9 +152,8 @@ public class KafkaSinkITCase extends TestLogger { .build()); @Container - public static final KafkaContainer KAFKA_CONTAINER = + public static final TestKafkaContainer KAFKA_CONTAINER = createKafkaContainer(KafkaSinkITCase.class) - .withEmbeddedZookeeper() .withNetwork(NETWORK) 
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); @@ -193,13 +193,15 @@ public class KafkaSinkITCase extends TestLogger { // Defines test environment on Flink MiniCluster @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + private final TestKafkaContainer kafkaContainer = + new TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)); + // Defines external system + @SuppressWarnings({"rawtypes", "unchecked"}) @TestExternalSystem - DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem<?> kafka = DefaultContainerizedExternalSystem.builder() - .fromContainer( - new KafkaContainer( - DockerImageName.parse(DockerImageVersions.KAFKA))) + .fromContainer((GenericContainer) kafkaContainer.getContainer()) .build(); @TestSemantics @@ -211,16 +213,14 @@ public class KafkaSinkITCase extends TestLogger { @TestContext KafkaSinkExternalContextFactory incrementing = new KafkaSinkExternalContextFactory( - kafka.getContainer(), + kafkaContainer, Collections.emptyList(), TransactionNamingStrategy.INCREMENTING); @TestContext KafkaSinkExternalContextFactory pooling = new KafkaSinkExternalContextFactory( - kafka.getContainer(), - Collections.emptyList(), - TransactionNamingStrategy.POOLING); + kafkaContainer, Collections.emptyList(), TransactionNamingStrategy.POOLING); } @Test diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java index f11e3c71..5c9a2607 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java @@ -19,9 +19,10 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -29,11 +30,12 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.IntegerSerializer; -import org.junit.After; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import java.util.ArrayList; import java.util.List; @@ -51,7 +53,9 @@ import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaCo import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. 
*/ -public class KafkaTransactionLogITCase extends TestLogger { +@Testcontainers +@ExtendWith(TestLoggerExtension.class) +class KafkaTransactionLogITCase { private static final String TOPIC_NAME = "kafkaTransactionLogTest"; private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log"; @@ -65,14 +69,14 @@ public class KafkaTransactionLogITCase extends TestLogger { .setConfiguration(new Configuration()) .build()); - @ClassRule - public static final KafkaContainer KAFKA_CONTAINER = - createKafkaContainer(KafkaTransactionLogITCase.class).withEmbeddedZookeeper(); + @Container + public static final TestKafkaContainer KAFKA_CONTAINER = + createKafkaContainer(KafkaTransactionLogITCase.class); private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>(); - @After - public void tearDown() { + @AfterEach + void tearDown() { openProducers.forEach(Producer::close); checkProducerLeak(); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java index 3c13b9ad..16ae534d 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory; import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; @@ -46,7 +47,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -89,9 +89,8 @@ public abstract class KafkaWriterTestBase { protected TriggerTimeService timeService; @Container - public static final KafkaContainer KAFKA_CONTAINER = + public static final TestKafkaContainer KAFKA_CONTAINER = createKafkaContainer(KafkaWriterTestBase.class) - .withEmbeddedZookeeper() .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java index d25aece8..aac6b91c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -29,7 +30,6 @@ import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -59,8 +59,8 @@ class ProducerPoolImplITCase { public static final String TRANSACTIONAL_ID = "test-transactional-id"; @Container - public static final KafkaContainer KAFKA_CONTAINER = - createKafkaContainer(ProducerPoolImplITCase.class).withEmbeddedZookeeper(); + public static final TestKafkaContainer KAFKA_CONTAINER = + createKafkaContainer(ProducerPoolImplITCase.class); @AfterEach void checkLeak() { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java index bdebed03..8aff2c97 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java @@ -19,10 +19,9 @@ package org.apache.flink.connector.kafka.sink.testutils; import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.external.ExternalContextFactory; -import org.testcontainers.containers.KafkaContainer; - import java.net.URL; import java.util.List; import java.util.stream.Collectors; @@ -31,12 +30,12 @@ import java.util.stream.Collectors; public class KafkaSinkExternalContextFactory implements ExternalContextFactory<KafkaSinkExternalContext> { - private final KafkaContainer kafkaContainer; + private final TestKafkaContainer kafkaContainer; private final List<URL> connectorJars; private final TransactionNamingStrategy transactionNamingStrategy; public KafkaSinkExternalContextFactory( - KafkaContainer kafkaContainer, + TestKafkaContainer kafkaContainer, List<URL> connectorJars, TransactionNamingStrategy transactionNamingStrategy) { this.kafkaContainer = kafkaContainer; @@ -47,7 +46,7 @@ public class KafkaSinkExternalContextFactory private String getBootstrapServer() { final String internalEndpoints = kafkaContainer.getNetworkAliases().stream() - .map(host -> String.join(":", host, Integer.toString(9092))) + .map(host -> String.join(":", host, Integer.toString(9093))) .collect(Collectors.joining(",")); return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java index 02b4f991..c5fe60ee 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory; import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv; +import 
org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestContext; @@ -64,7 +65,7 @@ import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; import java.io.IOException; @@ -419,13 +420,15 @@ public class KafkaSourceITCase { @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + private final TestKafkaContainer kafkaContainer = + new TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)); + // Defines external system + @SuppressWarnings({"rawtypes", "unchecked"}) @TestExternalSystem - DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem<?> kafka = DefaultContainerizedExternalSystem.builder() - .fromContainer( - new KafkaContainer( - DockerImageName.parse(DockerImageVersions.KAFKA))) + .fromContainer((GenericContainer) kafkaContainer.getContainer()) .build(); // Defines 2 External context Factories, so test cases will be invoked twice using these two @@ -434,13 +437,13 @@ public class KafkaSourceITCase { @TestContext KafkaSourceExternalContextFactory singleTopic = new KafkaSourceExternalContextFactory( - kafka.getContainer(), Collections.emptyList(), PARTITION); + kafkaContainer, Collections.emptyList(), PARTITION); @SuppressWarnings("unused") @TestContext KafkaSourceExternalContextFactory multipleTopic = new KafkaSourceExternalContextFactory( - kafka.getContainer(), Collections.emptyList(), TOPIC); + kafkaContainer, Collections.emptyList(), TOPIC); } // ----------------- diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java index 3704f417..ec1e1535 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java @@ -28,6 +28,4 @@ public class DockerImageVersions { public static final String APACHE_KAFKA = "apache/kafka:4.1.1"; public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.9.2"; - - public static final String ZOOKEEPER = "zookeeper:3.8.4"; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java index 71798e18..fa584a56 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource; import org.apache.flink.connector.testframe.external.ExternalContextFactory; import com.google.common.collect.ImmutableList; 
-import org.testcontainers.containers.KafkaContainer; import java.net.URL; import java.util.List; @@ -32,13 +31,13 @@ import java.util.stream.Collectors; public class DynamicKafkaSourceExternalContextFactory implements ExternalContextFactory<DynamicKafkaSourceExternalContext> { - private final KafkaContainer kafkaContainer0; - private final KafkaContainer kafkaContainer1; + private final TestKafkaContainer kafkaContainer0; + private final TestKafkaContainer kafkaContainer1; private final List<URL> connectorJars; public DynamicKafkaSourceExternalContextFactory( - KafkaContainer kafkaContainer0, - KafkaContainer kafkaContainer1, + TestKafkaContainer kafkaContainer0, + TestKafkaContainer kafkaContainer1, List<URL> connectorJars) { this.kafkaContainer0 = kafkaContainer0; this.kafkaContainer1 = kafkaContainer1; @@ -53,10 +52,10 @@ public class DynamicKafkaSourceExternalContextFactory connectorJars); } - private static String getBootstrapServers(KafkaContainer kafkaContainer) { + private static String getBootstrapServers(TestKafkaContainer kafkaContainer) { final String internalEndpoints = kafkaContainer.getNetworkAliases().stream() - .map(host -> String.join(":", host, Integer.toString(9092))) + .map(host -> String.join(":", host, Integer.toString(9093))) .collect(Collectors.joining(",")); return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java index ef9113af..fc9f1ba8 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java @@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.testutils; import org.apache.flink.connector.testframe.external.ExternalContextFactory; -import org.testcontainers.containers.KafkaContainer; - import java.net.URL; import java.util.List; import java.util.stream.Collectors; @@ -30,12 +28,12 @@ import java.util.stream.Collectors; public class KafkaSourceExternalContextFactory implements ExternalContextFactory<KafkaSourceExternalContext> { - private final KafkaContainer kafkaContainer; + private final TestKafkaContainer kafkaContainer; private final List<URL> connectorJars; private final KafkaSourceExternalContext.SplitMappingMode splitMappingMode; public KafkaSourceExternalContextFactory( - KafkaContainer kafkaContainer, + TestKafkaContainer kafkaContainer, List<URL> connectorJars, KafkaSourceExternalContext.SplitMappingMode splitMappingMode) { this.kafkaContainer = kafkaContainer; @@ -46,7 +44,7 @@ public class KafkaSourceExternalContextFactory protected String getBootstrapServer() { final String internalEndpoints = kafkaContainer.getNetworkAliases().stream() - .map(host -> String.join(":", host, Integer.toString(9092))) + .map(host -> String.join(":", host, Integer.toString(9093))) .collect(Collectors.joining(",")); return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java index 1b4bf77c..934c1c92 100644 --- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; @@ -52,28 +51,26 @@ public class KafkaUtil { private KafkaUtil() {} /** This method helps to set commonly used Kafka configurations and sets up the logger. */ - public static KafkaContainer createKafkaContainer(Class<?> testCase) { + public static TestKafkaContainer createKafkaContainer(Class<?> testCase) { return createKafkaContainer(getContainerName("kafka", testCase)); } /** This method helps to set commonly used Kafka configurations and sets up the logger. */ - public static KafkaContainer createKafkaContainer(String containerName) { + public static TestKafkaContainer createKafkaContainer(String containerName) { Logger logger = getLogger(containerName); String logLevel = inferLogLevel(logger); Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true); - return new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + return new TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") - .withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel) - .withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel) + .withKafkaLogLevel(logLevel) .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false") .withEnv( "KAFKA_TRANSACTION_MAX_TIMEOUT_MS", String.valueOf(Duration.ofHours(2).toMillis())) - .withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel) .withLogConsumer(logConsumer); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java index 6c5036a9..247357e4 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java @@ -19,18 +19,17 @@ package org.apache.flink.connector.kafka.testutils; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; /** Wraps 2 Kafka containers into one for test utilities that only accept one container. 
*/ public class TwoKafkaContainers extends GenericContainer<TwoKafkaContainers> { - private final KafkaContainer kafka0; - private final KafkaContainer kafka1; + private final TestKafkaContainer kafka0; + private final TestKafkaContainer kafka1; public TwoKafkaContainers() { DockerImageName dockerImageName = DockerImageName.parse(DockerImageVersions.KAFKA); - this.kafka0 = new KafkaContainer(dockerImageName); - this.kafka1 = new KafkaContainer(dockerImageName); + this.kafka0 = new TestKafkaContainer(dockerImageName); + this.kafka1 = new TestKafkaContainer(dockerImageName); } @Override @@ -50,11 +49,11 @@ public class TwoKafkaContainers extends GenericContainer<TwoKafkaContainers> { kafka1.stop(); } - public KafkaContainer getKafka0() { + public TestKafkaContainer getKafka0() { return kafka0; } - public KafkaContainer getKafka1() { + public TestKafkaContainer getKafka1() { return kafka1; } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 9c360467..a4d1a8b8 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.kafka.clients.admin.AdminClient; @@ -31,10 +32,7 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; -import org.testcontainers.utility.DockerImageName; import javax.annotation.Nullable; @@ -58,28 +56,20 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); - private static final String ZOOKEEPER_HOSTNAME = "zookeeper"; - private static final int ZOOKEEPER_PORT = 2181; - - private final Map<Integer, KafkaContainer> brokers = new HashMap<>(); + private final Map<Integer, TestKafkaContainer> brokers = new HashMap<>(); private final Set<Integer> pausedBroker = new HashSet<>(); - private @Nullable GenericContainer<?> zookeeper; private @Nullable Network network; private String brokerConnectionString = ""; private Properties standardProps; - // 6 seconds is default. Seems to be too small for travis. 30 seconds - private int zkTimeout = 30000; private Config config; private static final int REQUEST_TIMEOUT_SECONDS = 30; @Override public void prepare(Config config) throws Exception { - // increase the timeout since in Travis ZK connection takes long time for secure connection. 
if (config.isSecureMode()) { - // run only one kafka server to avoid multiple ZK connections from many instances - - // Travis timeout + // run only one kafka server to avoid multiple connections from many instances - Travis + // timeout config.setKafkaServersNumber(1); - zkTimeout = zkTimeout * 15; } this.config = config; brokers.clear(); @@ -92,8 +82,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("enable.auto.commit", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); - standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. standardProps.setProperty( "max.partition.fetch.bytes", @@ -198,10 +186,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.kerberos.service.name", "kafka"); - - // add special timeout for Travis - prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); - prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); prop.setProperty("metadata.fetch.timeout.ms", "120000"); } return prop; @@ -251,13 +235,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public void shutdown() throws Exception { - brokers.values().forEach(GenericContainer::stop); + brokers.values().forEach(TestKafkaContainer::stop); brokers.clear(); - if (zookeeper != null) { - zookeeper.stop(); - } - if (network != null) { network.close(); } @@ -307,62 +287,34 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } private void startKafkaContainerCluster(int numBrokers) { - if (numBrokers > 1) { - network = Network.newNetwork(); - zookeeper = createZookeeperContainer(network); - zookeeper.start(); - LOG.info("Zookeeper container started"); - } + network = Network.newNetwork(); for (int brokerID = 0; brokerID < numBrokers; brokerID++) { - KafkaContainer broker = createKafkaContainer(brokerID, zookeeper); + TestKafkaContainer broker = createKafkaContainer(brokerID); brokers.put(brokerID, broker); } - new ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start); + new ArrayList<>(brokers.values()).parallelStream().forEach(TestKafkaContainer::start); LOG.info("{} brokers started", numBrokers); brokerConnectionString = brokers.values().stream() - .map(KafkaContainer::getBootstrapServers) - // Here we have URL like "PLAINTEXT://127.0.0.1:15213", and we only keep the - // "127.0.0.1:15213" part in broker connection string - .map(server -> server.split("://")[1]) + .map(TestKafkaContainer::getBootstrapServers) + // Here we have URL like "PLAINTEXT://127.0.0.1:15213" (Confluent Kafka), + // and we only keep the "127.0.0.1:15213" part. + // Apache Kafka may return "host:port" directly without protocol prefix, + // so we handle both cases. + .map(server -> server.contains("://") ? 
server.split("://", 2)[1] : server) .collect(Collectors.joining(",")); + LOG.info("Broker connection string: {}", brokerConnectionString); } - private GenericContainer<?> createZookeeperContainer(Network network) { - return new GenericContainer<>(DockerImageName.parse(DockerImageVersions.ZOOKEEPER)) - .withNetwork(network) - .withNetworkAliases(ZOOKEEPER_HOSTNAME) - .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT)); - } - - private KafkaContainer createKafkaContainer( - int brokerID, @Nullable GenericContainer<?> zookeeper) { + private TestKafkaContainer createKafkaContainer(int brokerID) { String brokerName = String.format("Kafka-%d", brokerID); - KafkaContainer broker = - KafkaUtil.createKafkaContainer(brokerName) - .withNetworkAliases(brokerName) - .withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID)) - .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024)) - .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(50 * 1024 * 1024)) - .withEnv( - "KAFKA_TRANSACTION_MAX_TIMEOUT_MS", - Integer.toString(1000 * 60 * 60 * 2)) - // Disable log deletion to prevent records from being deleted during test - // run - .withEnv("KAFKA_LOG_RETENTION_MS", "-1") - .withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS", String.valueOf(zkTimeout)) - .withEnv( - "KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS", String.valueOf(zkTimeout)); - - if (zookeeper != null) { - broker.dependsOn(zookeeper) - .withNetwork(zookeeper.getNetwork()) - .withExternalZookeeper( - String.format("%s:%d", ZOOKEEPER_HOSTNAME, ZOOKEEPER_PORT)); - } else { - broker.withEmbeddedZookeeper(); - } - return broker; + return KafkaUtil.createKafkaContainer(brokerName) + .withNetworkAliases(brokerName) + .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") + .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024)) + .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(50 * 1024 * 1024)) + .withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", Integer.toString(1000 * 60 * 60 * 2)) + .withEnv("KAFKA_LOG_RETENTION_MS", "-1"); } private void pause(int brokerId) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java index c0ea690d..0d516c28 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.internals.metrics; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.metrics.Gauge; import org.apache.flink.util.TestLoggerExtension; @@ -28,7 +29,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -50,9 +50,8 @@ class KafkaMetricMutableWrapperTest { private static final Network NETWORK = Network.newNetwork(); @Container - public static final KafkaContainer KAFKA_CONTAINER = + public static 
final TestKafkaContainer KAFKA_CONTAINER = createKafkaContainer(KafkaMetricMutableWrapperTest.class) - .withEmbeddedZookeeper() .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java index 5999df09..85e20cab 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.kafka.testutils.TestKafkaContainer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.AbstractTestBase; @@ -41,7 +42,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -66,9 +66,8 @@ abstract class KafkaTableTestBase extends AbstractTestBase { private static final int zkTimeoutMills = 30000; @Container - public static final KafkaContainer KAFKA_CONTAINER = + public static final TestKafkaContainer KAFKA_CONTAINER = KafkaUtil.createKafkaContainer(KafkaTableTestBase.class) - .withEmbeddedZookeeper() .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS) .withEnv( "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
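Taken together, a migrated integration test now reduces to the following shape. This sketch assumes only the TestKafkaContainer and KafkaUtil surface visible in the hunks above; the wrapper's own source is not part of this patch:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class ExampleKafkaITCase {

    // KRaft-mode broker: no ZooKeeper container, no withEmbeddedZookeeper().
    @Container
    static final TestKafkaContainer KAFKA_CONTAINER =
            KafkaUtil.createKafkaContainer(ExampleKafkaITCase.class);

    @Test
    void writesRecord() throws Exception {
        Properties props = new Properties();
        // Host-mapped address; in-network clients would use <alias>:9093 instead.
        props.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value")).get();
        }
    }
}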
