This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit adee56117778f6962d7892a26b2b507b67ec707a Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Aug 12 18:01:11 2021 +0800 [FLINK-19554][connector/testing-framework] KafkaSource IT and E2E case based on connector testing framework --- flink-connectors/flink-connector-kafka/pom.xml | 12 +- .../connector/kafka/source/KafkaSourceITCase.java | 313 ++++++++++++--------- .../source/enumerator/KafkaEnumeratorTest.java | 2 +- .../initializer/OffsetsInitializerTest.java | 2 +- .../enumerator/subscriber/KafkaSubscriberTest.java | 2 +- .../reader/KafkaPartitionSplitReaderTest.java | 2 +- .../kafka/source/reader/KafkaSourceReaderTest.java | 2 +- .../KafkaMultipleTopicExternalContext.java | 124 ++++++++ .../source/testutils/KafkaPartitionDataWriter.java | 60 ++++ .../testutils/KafkaSingleTopicExternalContext.java | 242 ++++++++++++++++ .../source/{ => testutils}/KafkaSourceTestEnv.java | 2 +- .../flink-end-to-end-tests-common-kafka/pom.xml | 29 ++ .../flink/tests/util/kafka/KafkaSourceE2ECase.java | 69 +++++ .../modules-skipping-deployment.modulelist | 1 + 14 files changed, 721 insertions(+), 141 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 2301a06..0da6052 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -210,6 +210,12 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-testing_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -224,7 +230,8 @@ under the License. </goals> <configuration> <includes> - <include>**/KafkaTestEnvironmentImpl*</include> + <include>**/KafkaTestEnvironment*</include> + <include>**/testutils/*</include> <include>META-INF/LICENSE</include> <include>META-INF/NOTICE</include> </includes> @@ -247,7 +254,8 @@ under the License. <addMavenDescriptor>false</addMavenDescriptor> </archive> <includes> - <include>**/KafkaTestEnvironmentImpl*</include> + <include>**/KafkaTestEnvironment*</include> + <include>**/testutils/*</include> <include>META-INF/LICENSE</include> <include>META-INF/NOTICE</include> </includes> diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java index 89ce39c..5c2107a 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java @@ -26,6 +26,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext; +import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; +import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment; +import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; +import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -40,9 +49,14 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.io.Serializable; @@ -54,147 +68,181 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; /** Unite test class for {@link KafkaSource}. */ public class KafkaSourceITCase { private static final String TOPIC1 = "topic1"; private static final String TOPIC2 = "topic2"; - @BeforeClass - public static void setup() throws Throwable { - KafkaSourceTestEnv.setup(); - KafkaSourceTestEnv.setupTopic(TOPIC1, true, true); - KafkaSourceTestEnv.setupTopic(TOPIC2, true, true); - } - - @AfterClass - public static void tearDown() throws Exception { - KafkaSourceTestEnv.tearDown(); - } - - @Test - public void testTimestamp() throws Throwable { - final String topic = "testTimestamp"; - KafkaSourceTestEnv.createTestTopic(topic, 1, 1); - KafkaSourceTestEnv.produceToKafka( - Arrays.asList( - new ProducerRecord<>(topic, 0, 1L, "key0", 0), - new ProducerRecord<>(topic, 0, 2L, "key1", 1), - new ProducerRecord<>(topic, 0, 3L, "key2", 2))); - - KafkaSource<PartitionAndValue> source = - KafkaSource.<PartitionAndValue>builder() - .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) - .setGroupId("testTimestampAndWatermark") - .setTopics(topic) - .setDeserializer(new TestingKafkaRecordDeserializationSchema()) - .setStartingOffsets(OffsetsInitializer.earliest()) - .setBounded(OffsetsInitializer.latest()) - .build(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - DataStream<PartitionAndValue> stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "testTimestamp"); + @Nested + @TestInstance(Lifecycle.PER_CLASS) + class KafkaSpecificTests { + @BeforeAll + public void setup() throws Throwable { + KafkaSourceTestEnv.setup(); + KafkaSourceTestEnv.setupTopic(TOPIC1, true, true); + KafkaSourceTestEnv.setupTopic(TOPIC2, true, true); + } - // Verify that the timestamp and watermark are working fine. - stream.transform( - "timestampVerifier", - TypeInformation.of(PartitionAndValue.class), - new WatermarkVerifyingOperator(v -> v)); - stream.addSink(new DiscardingSink<>()); - JobExecutionResult result = env.execute(); + @AfterAll + public void tearDown() throws Exception { + KafkaSourceTestEnv.tearDown(); + } - assertEquals(Arrays.asList(1L, 2L, 3L), result.getAccumulatorResult("timestamp")); - } + @Test + public void testTimestamp() throws Throwable { + final String topic = "testTimestamp"; + KafkaSourceTestEnv.createTestTopic(topic, 1, 1); + KafkaSourceTestEnv.produceToKafka( + Arrays.asList( + new ProducerRecord<>(topic, 0, 1L, "key0", 0), + new ProducerRecord<>(topic, 0, 2L, "key1", 1), + new ProducerRecord<>(topic, 0, 3L, "key2", 2))); + + KafkaSource<PartitionAndValue> source = + KafkaSource.<PartitionAndValue>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setGroupId("testTimestampAndWatermark") + .setTopics(topic) + .setDeserializer(new TestingKafkaRecordDeserializationSchema()) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream<PartitionAndValue> stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "testTimestamp"); + + // Verify that the timestamp and watermark are working fine. + stream.transform( + "timestampVerifier", + TypeInformation.of(PartitionAndValue.class), + new WatermarkVerifyingOperator(v -> v)); + stream.addSink(new DiscardingSink<>()); + JobExecutionResult result = env.execute(); + + assertEquals(Arrays.asList(1L, 2L, 3L), result.getAccumulatorResult("timestamp")); + } - @Test - public void testBasicRead() throws Exception { - KafkaSource<PartitionAndValue> source = - KafkaSource.<PartitionAndValue>builder() - .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) - .setGroupId("testBasicRead") - .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - .setDeserializer(new TestingKafkaRecordDeserializationSchema()) - .setStartingOffsets(OffsetsInitializer.earliest()) - .setBounded(OffsetsInitializer.latest()) - .build(); + @Test + public void testBasicRead() throws Exception { + KafkaSource<PartitionAndValue> source = + KafkaSource.<PartitionAndValue>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setGroupId("testBasicRead") + .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + .setDeserializer(new TestingKafkaRecordDeserializationSchema()) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream<PartitionAndValue> stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "testBasicRead"); + executeAndVerify(env, stream); + } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - DataStream<PartitionAndValue> stream = - env.fromSource(source, WatermarkStrategy.noWatermarks(), "testBasicRead"); - executeAndVerify(env, stream); - } + @Test + public void testValueOnlyDeserializer() throws Exception { + KafkaSource<Integer> source = + KafkaSource.<Integer>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setGroupId("testValueOnlyDeserializer") + .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + .setDeserializer( + KafkaRecordDeserializationSchema.valueOnly( + IntegerDeserializer.class)) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + final CloseableIterator<Integer> resultIterator = + env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "testValueOnlyDeserializer") + .executeAndCollect(); + + AtomicInteger actualSum = new AtomicInteger(); + resultIterator.forEachRemaining(actualSum::addAndGet); + + // Calculate the actual sum of values + // Values in a partition should start from partition ID, and end with + // (NUM_RECORDS_PER_PARTITION - 1) + // e.g. Values in partition 5 should be {5, 6, 7, 8, 9} + int expectedSum = 0; + for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) { + for (int value = partition; + value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; + value++) { + expectedSum += value; + } + } - @Test - public void testValueOnlyDeserializer() throws Exception { - KafkaSource<Integer> source = - KafkaSource.<Integer>builder() - .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) - .setGroupId("testValueOnlyDeserializer") - .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - .setDeserializer( - KafkaRecordDeserializationSchema.valueOnly( - IntegerDeserializer.class)) - .setStartingOffsets(OffsetsInitializer.earliest()) - .setBounded(OffsetsInitializer.latest()) - .build(); + // Since we have two topics, the expected sum value should be doubled + expectedSum *= 2; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - final CloseableIterator<Integer> resultIterator = - env.fromSource( - source, - WatermarkStrategy.noWatermarks(), - "testValueOnlyDeserializer") - .executeAndCollect(); - - AtomicInteger actualSum = new AtomicInteger(); - resultIterator.forEachRemaining(actualSum::addAndGet); - - // Calculate the actual sum of values - // Values in a partition should start from partition ID, and end with - // (NUM_RECORDS_PER_PARTITION - 1) - // e.g. Values in partition 5 should be {5, 6, 7, 8, 9} - int expectedSum = 0; - for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) { - for (int value = partition; - value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; - value++) { - expectedSum += value; - } + assertEquals(expectedSum, actualSum.get()); } - // Since we have two topics, the expected sum value should be doubled - expectedSum *= 2; - - assertEquals(expectedSum, actualSum.get()); + @Test + public void testRedundantParallelism() throws Exception { + KafkaSource<PartitionAndValue> source = + KafkaSource.<PartitionAndValue>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setGroupId("testRedundantParallelism") + .setTopics(Collections.singletonList(TOPIC1)) + .setDeserializer(new TestingKafkaRecordDeserializationSchema()) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Here we use (NUM_PARTITION + 1) as the parallelism, so one SourceReader will not be + // assigned with any splits. The redundant SourceReader should also be signaled with a + // NoMoreSplitsEvent and eventually spins to FINISHED state. + env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1); + DataStream<PartitionAndValue> stream = + env.fromSource( + source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism"); + executeAndVerify(env, stream); + } } - @Test(timeout = 30000L) - public void testRedundantParallelism() throws Exception { - KafkaSource<PartitionAndValue> source = - KafkaSource.<PartitionAndValue>builder() - .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) - .setGroupId("testRedundantParallelism") - .setTopics(Collections.singletonList(TOPIC1)) - .setDeserializer(new TestingKafkaRecordDeserializationSchema()) - .setStartingOffsets(OffsetsInitializer.earliest()) - .setBounded(OffsetsInitializer.latest()) + /** Integration test based on connector testing framework. */ + @Nested + class IntegrationTests extends SourceTestSuiteBase<String> { + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.5.2"; + + // Defines test environment on Flink MiniCluster + @SuppressWarnings("unused") + @TestEnv + MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + + // Defines external system + @ExternalSystem + DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem.builder() + .fromContainer(new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))) .build(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // Here we use (NUM_PARTITION + 1) as the parallelism, so one SourceReader will not be - // assigned with any splits. The redundant SourceReader should also be signaled with a - // NoMoreSplitsEvent and eventually spins to FINISHED state. - env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1); - DataStream<PartitionAndValue> stream = - env.fromSource( - source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism"); - executeAndVerify(env, stream); + // Defines 2 External context Factories, so test cases will be invoked twice using these two + // kinds of external contexts. + @SuppressWarnings("unused") + @ExternalContextFactory + KafkaSingleTopicExternalContext.Factory singleTopic = + new KafkaSingleTopicExternalContext.Factory(kafka.getContainer()); + + @SuppressWarnings("unused") + @ExternalContextFactory + KafkaMultipleTopicExternalContext.Factory multipleTopic = + new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer()); } // ----------------- @@ -250,24 +298,23 @@ public class KafkaSourceITCase { } @Override - public void processElement(StreamRecord<PartitionAndValue> element) throws Exception { + public void processElement(StreamRecord<PartitionAndValue> element) { getRuntimeContext().getAccumulator("timestamp").add(element.getTimestamp()); } } - @SuppressWarnings("serial") private void executeAndVerify( StreamExecutionEnvironment env, DataStream<PartitionAndValue> stream) throws Exception { stream.addSink( new RichSinkFunction<PartitionAndValue>() { @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { getRuntimeContext() .addAccumulator("result", new ListAccumulator<PartitionAndValue>()); } @Override - public void invoke(PartitionAndValue value, Context context) throws Exception { + public void invoke(PartitionAndValue value, Context context) { getRuntimeContext().getAccumulator("result").add(value); } }); @@ -283,10 +330,10 @@ public class KafkaSourceITCase { int firstExpectedValue = Integer.parseInt(tp.substring(tp.indexOf('-') + 1)); for (int i = 0; i < values.size(); i++) { assertEquals( - String.format( - "The %d-th value for partition %s should be %d", i, tp, i), firstExpectedValue + i, - (int) values.get(i)); + (int) values.get(i), + String.format( + "The %d-th value for partition %s should be %d", i, tp, i)); } }); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 54e5e47..423584f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -21,11 +21,11 @@ package org.apache.flink.connector.kafka.source.enumerator; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; -import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; import org.apache.flink.mock.Whitebox; import org.apache.kafka.clients.admin.AdminClient; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java index 0e84882..4bc9769 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java @@ -18,9 +18,9 @@ package org.apache.flink.connector.kafka.source.enumerator.initializer; -import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java index 0fd9f1a..fc0f7de 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; -import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; import org.apache.flink.util.ExceptionUtils; import org.apache.kafka.clients.admin.AdminClient; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index 2a477dd..175c6ef 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -22,10 +22,10 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; -import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index 18a024b..037c080 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -25,10 +25,10 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; -import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java new file mode 100644 index 0000000..1548a3e --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source.testutils; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connectors.test.common.external.ExternalContext; +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.testcontainers.containers.KafkaContainer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; + +/** + * Kafka external context that will create multiple topics with only one partitions as source + * splits. + */ +public class KafkaMultipleTopicExternalContext extends KafkaSingleTopicExternalContext { + + private int numTopics = 0; + + private final String topicPattern; + + private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters = + new HashMap<>(); + + public KafkaMultipleTopicExternalContext(String bootstrapServers) { + super(bootstrapServers); + this.topicPattern = + "kafka-multiple-topic-[0-9]+-" + + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + } + + @Override + public SourceSplitDataWriter<String> createSourceSplitDataWriter() { + String topicName = getTopicName(); + createTopic(topicName, 1, (short) 1); + final KafkaPartitionDataWriter splitWriter = + new KafkaPartitionDataWriter( + getKafkaProducerProperties(numTopics), new TopicPartition(topicName, 0)); + topicNameToSplitWriters.put(topicName, splitWriter); + numTopics++; + return splitWriter; + } + + @Override + public Source<String, ?, ?> createSource(Boundedness boundedness) { + KafkaSourceBuilder<String> builder = KafkaSource.builder(); + + if (boundedness == Boundedness.BOUNDED) { + builder = builder.setBounded(OffsetsInitializer.latest()); + } + + return builder.setGroupId("flink-kafka-multiple-topic-test") + .setBootstrapServers(bootstrapServers) + .setTopicPattern(Pattern.compile(topicPattern)) + .setDeserializer( + KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) + .build(); + } + + @Override + public void close() { + topicNameToSplitWriters.forEach( + (topicName, splitWriter) -> { + try { + splitWriter.close(); + deleteTopic(topicName); + } catch (Exception e) { + kafkaAdminClient.close(); + throw new RuntimeException("Cannot close split writer", e); + } + }); + topicNameToSplitWriters.clear(); + kafkaAdminClient.close(); + } + + private String getTopicName() { + return topicPattern.replace("[0-9]+", String.valueOf(numTopics)); + } + + @Override + public String toString() { + return "Multiple-topics Kafka"; + } + + /** Factory of {@link KafkaSingleTopicExternalContext}. */ + public static class Factory extends KafkaSingleTopicExternalContext.Factory { + + public Factory(KafkaContainer kafkaContainer) { + super(kafkaContainer); + } + + @Override + public ExternalContext<String> createExternalContext() { + return new KafkaMultipleTopicExternalContext(getBootstrapServer()); + } + } +} diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java new file mode 100644 index 0000000..5485137 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source.testutils; + +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Properties; + +/** Source split data writer for writing test data into Kafka topic partitions. */ +public class KafkaPartitionDataWriter implements SourceSplitDataWriter<String> { + + private final KafkaProducer<byte[], byte[]> kafkaProducer; + private final TopicPartition topicPartition; + + public KafkaPartitionDataWriter(Properties producerProperties, TopicPartition topicPartition) { + this.kafkaProducer = new KafkaProducer<>(producerProperties); + this.topicPartition = topicPartition; + } + + @Override + public void writeRecords(Collection<String> records) { + for (String record : records) { + ProducerRecord<byte[], byte[]> producerRecord = + new ProducerRecord<>( + topicPartition.topic(), + topicPartition.partition(), + null, + record.getBytes(StandardCharsets.UTF_8)); + kafkaProducer.send(producerRecord); + } + kafkaProducer.flush(); + } + + @Override + public void close() { + kafkaProducer.close(); + } +} diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java new file mode 100644 index 0000000..81240cf --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source.testutils; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connectors.test.common.external.ExternalContext; +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * A Kafka external context that will create only one topic and use partitions in that topic as + * source splits. + */ +public class KafkaSingleTopicExternalContext implements ExternalContext<String> { + + private static final Logger LOG = + LoggerFactory.getLogger(KafkaSingleTopicExternalContext.class); + + private static final String TOPIC_NAME_PREFIX = "kafka-single-topic"; + private static final int DEFAULT_TIMEOUT = 30; + private static final int NUM_RECORDS_UPPER_BOUND = 500; + private static final int NUM_RECORDS_LOWER_BOUND = 100; + + protected String bootstrapServers; + private final String topicName; + + private final Map<Integer, SourceSplitDataWriter<String>> partitionToSplitWriter = + new HashMap<>(); + + private int numSplits = 0; + + protected final AdminClient kafkaAdminClient; + + public KafkaSingleTopicExternalContext(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + this.topicName = + TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + kafkaAdminClient = createAdminClient(); + } + + protected void createTopic(String topicName, int numPartitions, short replicationFactor) { + LOG.debug( + "Creating new Kafka topic {} with {} partitions and {} replicas", + topicName, + numPartitions, + replicationFactor); + NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); + try { + kafkaAdminClient + .createTopics(Collections.singletonList(newTopic)) + .all() + .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e); + } + } + + protected void deleteTopic(String topicName) { + LOG.debug("Deleting Kafka topic {}", topicName); + try { + kafkaAdminClient + .deleteTopics(Collections.singletonList(topicName)) + .all() + .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception e) { + if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { + throw new RuntimeException(String.format("Cannot delete topic '%s'", topicName), e); + } + } + } + + private AdminClient createAdminClient() { + Properties config = new Properties(); + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return AdminClient.create(config); + } + + @Override + public Source<String, ?, ?> createSource(Boundedness boundedness) { + KafkaSourceBuilder<String> builder = KafkaSource.builder(); + + if (boundedness == Boundedness.BOUNDED) { + builder = builder.setBounded(OffsetsInitializer.latest()); + } + return builder.setGroupId("flink-kafka-test") + .setDeserializer( + KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) + .setTopics(topicName) + .setBootstrapServers(bootstrapServers) + .build(); + } + + @Override + public SourceSplitDataWriter<String> createSourceSplitDataWriter() { + if (numSplits == 0) { + createTopic(topicName, 1, (short) 1); + numSplits++; + } else { + LOG.debug("Creating new partition for topic {}", topicName); + kafkaAdminClient.createPartitions( + Collections.singletonMap(topicName, NewPartitions.increaseTo(++numSplits))); + } + KafkaPartitionDataWriter splitWriter = + new KafkaPartitionDataWriter( + getKafkaProducerProperties(numSplits - 1), + new TopicPartition(topicName, numSplits - 1)); + partitionToSplitWriter.put(numSplits - 1, splitWriter); + return splitWriter; + } + + @Override + public Collection<String> generateTestData(long seed) { + Random random = new Random(seed); + List<String> randomStringRecords = new ArrayList<>(); + int recordNum = + random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND) + + NUM_RECORDS_LOWER_BOUND; + for (int i = 0; i < recordNum; i++) { + int stringLength = random.nextInt(50) + 1; + randomStringRecords.add(generateRandomString(stringLength, random)); + } + return randomStringRecords; + } + + private String generateRandomString(int length, Random random) { + String alphaNumericString = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789"; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; ++i) { + sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length()))); + } + return sb.toString(); + } + + protected Properties getKafkaProducerProperties(int producerId) { + Properties kafkaProducerProperties = new Properties(); + kafkaProducerProperties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + kafkaProducerProperties.setProperty( + ProducerConfig.CLIENT_ID_CONFIG, + String.join( + "-", + "flink-kafka-split-writer", + Integer.toString(producerId), + Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)))); + kafkaProducerProperties.setProperty( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProducerProperties.setProperty( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + return kafkaProducerProperties; + } + + @Override + public void close() { + deleteTopic(topicName); + partitionToSplitWriter.forEach( + (partitionId, splitWriter) -> { + try { + splitWriter.close(); + } catch (Exception e) { + kafkaAdminClient.close(); + throw new RuntimeException("Cannot close split writer", e); + } + }); + partitionToSplitWriter.clear(); + kafkaAdminClient.close(); + } + + @Override + public String toString() { + return "Single-topic Kafka"; + } + + /** Factory of {@link KafkaSingleTopicExternalContext}. */ + public static class Factory implements ExternalContext.Factory<String> { + + private final KafkaContainer kafkaContainer; + + public Factory(KafkaContainer kafkaContainer) { + this.kafkaContainer = kafkaContainer; + } + + protected String getBootstrapServer() { + final String internalEndpoints = + kafkaContainer.getNetworkAliases().stream() + .map(host -> String.join(":", host, Integer.toString(9092))) + .collect(Collectors.joining(",")); + return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); + } + + @Override + public ExternalContext<String> createExternalContext() { + return new KafkaSingleTopicExternalContext(getBootstrapServer()); + } + } +} diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java similarity index 99% rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java index 6262e34..df5d8ea 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.kafka.source; +package org.apache.flink.connector.kafka.source.testutils; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.streaming.connectors.kafka.KafkaTestBase; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml index 8c3b90d..a13ed11 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml @@ -133,6 +133,19 @@ under the License. <version>1.10.0</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-testing_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> </dependencies> <build> @@ -192,6 +205,22 @@ under the License. <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <destFileName>kafka-connector.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>2.4.1</version> + <destFileName>kafka-clients.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> </artifactItems> </configuration> </plugin> diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java new file mode 100644 index 0000000..f220008 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.kafka; + +import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext; +import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext; +import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; +import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment; + +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +/** Kafka E2E test based on connector testing framework. */ +public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> { + private static final String KAFKA_HOSTNAME = "kafka"; + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.5.2"; + + // Defines TestEnvironment + @TestEnv + FlinkContainerTestEnvironment flink = + new FlinkContainerTestEnvironment( + 1, + 6, + TestUtils.getResource("kafka-connector.jar").toAbsolutePath().toString(), + TestUtils.getResource("kafka-clients.jar").toAbsolutePath().toString()); + + // Defines ConnectorExternalSystem + @ExternalSystem + DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem.builder() + .fromContainer( + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetworkAliases(KAFKA_HOSTNAME)) + .bindWithFlinkContainer(flink.getFlinkContainer()) + .build(); + + // Defines 2 External context Factories, so test cases will be invoked twice using these two + // kinds of external contexts. + @SuppressWarnings("unused") + @ExternalContextFactory + KafkaSingleTopicExternalContext.Factory singleTopic = + new KafkaSingleTopicExternalContext.Factory(kafka.getContainer()); + + @SuppressWarnings("unused") + @ExternalContextFactory + KafkaMultipleTopicExternalContext.Factory multipleTopic = + new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer()); +} diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist index 89e5b58..521d055 100644 --- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist +++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist @@ -38,3 +38,4 @@ flink-streaming-kafka-test-base flink-heavy-deployment-stress-test flink-elasticsearch5-test flink-high-parallelism-iterations-test +flink-end-to-end-tests-common-kafka
