This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit adee56117778f6962d7892a26b2b507b67ec707a
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Aug 12 18:01:11 2021 +0800

    [FLINK-19554][connector/testing-framework] KafkaSource IT and E2E case based on connector testing framework
---
 flink-connectors/flink-connector-kafka/pom.xml     |  12 +-
 .../connector/kafka/source/KafkaSourceITCase.java  | 313 ++++++++++++---------
 .../source/enumerator/KafkaEnumeratorTest.java     |   2 +-
 .../initializer/OffsetsInitializerTest.java        |   2 +-
 .../enumerator/subscriber/KafkaSubscriberTest.java |   2 +-
 .../reader/KafkaPartitionSplitReaderTest.java      |   2 +-
 .../kafka/source/reader/KafkaSourceReaderTest.java |   2 +-
 .../KafkaMultipleTopicExternalContext.java         | 124 ++++++++
 .../source/testutils/KafkaPartitionDataWriter.java |  60 ++++
 .../testutils/KafkaSingleTopicExternalContext.java | 242 ++++++++++++++++
 .../source/{ => testutils}/KafkaSourceTestEnv.java |   2 +-
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  29 ++
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java |  69 +++++
 .../modules-skipping-deployment.modulelist         |   1 +
 14 files changed, 721 insertions(+), 141 deletions(-)
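For orientation before the full diff: the new IT and E2E cases contain no test logic of their own. They extend SourceTestSuiteBase<String> and only declare, via annotations, the test environment, the external system and the external context factories that the connector testing framework needs; the framework then runs every inherited test case once per context factory. Below is a minimal sketch of that wiring, condensed from KafkaSourceITCase.IntegrationTests and KafkaSourceE2ECase further down in this patch (the class name KafkaSourceSuiteSketch is illustrative only; all other types and annotations are the ones the patch introduces or uses).

    package org.apache.flink.connector.kafka.source.testutils;

    import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
    import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
    import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
    import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
    import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
    import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;

    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    /** Condensed wiring of a framework-based Kafka source test (illustrative class name). */
    class KafkaSourceSuiteSketch extends SourceTestSuiteBase<String> {

        // Flink runtime that the inherited test cases run against (MiniCluster for the IT case;
        // the E2E case uses a FlinkContainerTestEnvironment instead).
        @TestEnv
        MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

        // Kafka broker managed by Testcontainers, acting as the external system under test.
        @ExternalSystem
        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
                DefaultContainerizedExternalSystem.builder()
                        .fromContainer(
                                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2")))
                        .build();

        // Two context factories: each inherited test case runs once against a single
        // multi-partition topic and once against multiple single-partition topics.
        @ExternalContextFactory
        KafkaSingleTopicExternalContext.Factory singleTopic =
                new KafkaSingleTopicExternalContext.Factory(kafka.getContainer());

        @ExternalContextFactory
        KafkaMultipleTopicExternalContext.Factory multipleTopic =
                new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer());
    }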

diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 2301a06..0da6052 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -210,6 +210,12 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-testing_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
@@ -224,7 +230,8 @@ under the License.
                                                </goals>
                                                <configuration>
                                                        <includes>
-                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                               <include>**/KafkaTestEnvironment*</include>
+                                                               <include>**/testutils/*</include>
                                                                <include>META-INF/LICENSE</include>
                                                                <include>META-INF/NOTICE</include>
                                                        </includes>
@@ -247,7 +254,8 @@ under the License.
                                                                <addMavenDescriptor>false</addMavenDescriptor>
                                                        </archive>
                                                        <includes>
-                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                               <include>**/KafkaTestEnvironment*</include>
+                                                               <include>**/testutils/*</include>
                                                                <include>META-INF/LICENSE</include>
                                                                <include>META-INF/NOTICE</include>
                                                        </includes>
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 89ce39c..5c2107a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -26,6 +26,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -40,9 +49,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,147 +68,181 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit test class for {@link KafkaSource}. */
 public class KafkaSourceITCase {
     private static final String TOPIC1 = "topic1";
     private static final String TOPIC2 = "topic2";
 
-    @BeforeClass
-    public static void setup() throws Throwable {
-        KafkaSourceTestEnv.setup();
-        KafkaSourceTestEnv.setupTopic(TOPIC1, true, true);
-        KafkaSourceTestEnv.setupTopic(TOPIC2, true, true);
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        KafkaSourceTestEnv.tearDown();
-    }
-
-    @Test
-    public void testTimestamp() throws Throwable {
-        final String topic = "testTimestamp";
-        KafkaSourceTestEnv.createTestTopic(topic, 1, 1);
-        KafkaSourceTestEnv.produceToKafka(
-                Arrays.asList(
-                        new ProducerRecord<>(topic, 0, 1L, "key0", 0),
-                        new ProducerRecord<>(topic, 0, 2L, "key1", 1),
-                        new ProducerRecord<>(topic, 0, 3L, "key2", 2)));
-
-        KafkaSource<PartitionAndValue> source =
-                KafkaSource.<PartitionAndValue>builder()
-                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setGroupId("testTimestampAndWatermark")
-                        .setTopics(topic)
-                        .setDeserializer(new TestingKafkaRecordDeserializationSchema())
-                        .setStartingOffsets(OffsetsInitializer.earliest())
-                        .setBounded(OffsetsInitializer.latest())
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        DataStream<PartitionAndValue> stream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "testTimestamp");
+    @Nested
+    @TestInstance(Lifecycle.PER_CLASS)
+    class KafkaSpecificTests {
+        @BeforeAll
+        public void setup() throws Throwable {
+            KafkaSourceTestEnv.setup();
+            KafkaSourceTestEnv.setupTopic(TOPIC1, true, true);
+            KafkaSourceTestEnv.setupTopic(TOPIC2, true, true);
+        }
 
-        // Verify that the timestamp and watermark are working fine.
-        stream.transform(
-                "timestampVerifier",
-                TypeInformation.of(PartitionAndValue.class),
-                new WatermarkVerifyingOperator(v -> v));
-        stream.addSink(new DiscardingSink<>());
-        JobExecutionResult result = env.execute();
+        @AfterAll
+        public void tearDown() throws Exception {
+            KafkaSourceTestEnv.tearDown();
+        }
 
-        assertEquals(Arrays.asList(1L, 2L, 3L), result.getAccumulatorResult("timestamp"));
-    }
+        @Test
+        public void testTimestamp() throws Throwable {
+            final String topic = "testTimestamp";
+            KafkaSourceTestEnv.createTestTopic(topic, 1, 1);
+            KafkaSourceTestEnv.produceToKafka(
+                    Arrays.asList(
+                            new ProducerRecord<>(topic, 0, 1L, "key0", 0),
+                            new ProducerRecord<>(topic, 0, 2L, "key1", 1),
+                            new ProducerRecord<>(topic, 0, 3L, "key2", 2)));
+
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setGroupId("testTimestampAndWatermark")
+                            .setTopics(topic)
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            DataStream<PartitionAndValue> stream =
+                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "testTimestamp");
+
+            // Verify that the timestamp and watermark are working fine.
+            stream.transform(
+                    "timestampVerifier",
+                    TypeInformation.of(PartitionAndValue.class),
+                    new WatermarkVerifyingOperator(v -> v));
+            stream.addSink(new DiscardingSink<>());
+            JobExecutionResult result = env.execute();
+
+            assertEquals(Arrays.asList(1L, 2L, 3L), result.getAccumulatorResult("timestamp"));
+        }
 
-    @Test
-    public void testBasicRead() throws Exception {
-        KafkaSource<PartitionAndValue> source =
-                KafkaSource.<PartitionAndValue>builder()
-                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setGroupId("testBasicRead")
-                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-                        .setDeserializer(new TestingKafkaRecordDeserializationSchema())
-                        .setStartingOffsets(OffsetsInitializer.earliest())
-                        .setBounded(OffsetsInitializer.latest())
-                        .build();
+        @Test
+        public void testBasicRead() throws Exception {
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setGroupId("testBasicRead")
+                            .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            DataStream<PartitionAndValue> stream =
+                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "testBasicRead");
+            executeAndVerify(env, stream);
+        }
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        DataStream<PartitionAndValue> stream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "testBasicRead");
-        executeAndVerify(env, stream);
-    }
+        @Test
+        public void testValueOnlyDeserializer() throws Exception {
+            KafkaSource<Integer> source =
+                    KafkaSource.<Integer>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setGroupId("testValueOnlyDeserializer")
+                            .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            IntegerDeserializer.class))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            final CloseableIterator<Integer> resultIterator =
+                    env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    "testValueOnlyDeserializer")
+                            .executeAndCollect();
+
+            AtomicInteger actualSum = new AtomicInteger();
+            resultIterator.forEachRemaining(actualSum::addAndGet);
+
+            // Calculate the actual sum of values
+            // Values in a partition should start from partition ID, and end with
+            // (NUM_RECORDS_PER_PARTITION - 1)
+            // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
+            int expectedSum = 0;
+            for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) {
+                for (int value = partition;
+                        value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+                        value++) {
+                    expectedSum += value;
+                }
+            }
 
-    @Test
-    public void testValueOnlyDeserializer() throws Exception {
-        KafkaSource<Integer> source =
-                KafkaSource.<Integer>builder()
-                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setGroupId("testValueOnlyDeserializer")
-                        .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-                        .setDeserializer(
-                                KafkaRecordDeserializationSchema.valueOnly(
-                                        IntegerDeserializer.class))
-                        .setStartingOffsets(OffsetsInitializer.earliest())
-                        .setBounded(OffsetsInitializer.latest())
-                        .build();
+            // Since we have two topics, the expected sum value should be doubled
+            expectedSum *= 2;
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        final CloseableIterator<Integer> resultIterator =
-                env.fromSource(
-                                source,
-                                WatermarkStrategy.noWatermarks(),
-                                "testValueOnlyDeserializer")
-                        .executeAndCollect();
-
-        AtomicInteger actualSum = new AtomicInteger();
-        resultIterator.forEachRemaining(actualSum::addAndGet);
-
-        // Calculate the actual sum of values
-        // Values in a partition should start from partition ID, and end with
-        // (NUM_RECORDS_PER_PARTITION - 1)
-        // e.g. Values in partition 5 should be {5, 6, 7, 8, 9}
-        int expectedSum = 0;
-        for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) {
-            for (int value = partition;
-                    value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
-                    value++) {
-                expectedSum += value;
-            }
+            assertEquals(expectedSum, actualSum.get());
         }
 
-        // Since we have two topics, the expected sum value should be doubled
-        expectedSum *= 2;
-
-        assertEquals(expectedSum, actualSum.get());
+        @Test
+        public void testRedundantParallelism() throws Exception {
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setGroupId("testRedundantParallelism")
+                            .setTopics(Collections.singletonList(TOPIC1))
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+            // Here we use (NUM_PARTITION + 1) as the parallelism, so one SourceReader will not be
+            // assigned with any splits. The redundant SourceReader should also be signaled with a
+            // NoMoreSplitsEvent and eventually spins to FINISHED state.
+            env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1);
+            DataStream<PartitionAndValue> stream =
+                    env.fromSource(
+                            source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
+            executeAndVerify(env, stream);
+        }
     }
 
-    @Test(timeout = 30000L)
-    public void testRedundantParallelism() throws Exception {
-        KafkaSource<PartitionAndValue> source =
-                KafkaSource.<PartitionAndValue>builder()
-                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setGroupId("testRedundantParallelism")
-                        .setTopics(Collections.singletonList(TOPIC1))
-                        .setDeserializer(new TestingKafkaRecordDeserializationSchema())
-                        .setStartingOffsets(OffsetsInitializer.earliest())
-                        .setBounded(OffsetsInitializer.latest())
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTests extends SourceTestSuiteBase<String> {
+        private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.5.2";
+
+        // Defines test environment on Flink MiniCluster
+        @SuppressWarnings("unused")
+        @TestEnv
+        MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        // Defines external system
+        @ExternalSystem
+        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+                DefaultContainerizedExternalSystem.builder()
+                        .fromContainer(new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)))
                         .build();
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // Here we use (NUM_PARTITION + 1) as the parallelism, so one SourceReader will not be
-        // assigned with any splits. The redundant SourceReader should also be signaled with a
-        // NoMoreSplitsEvent and eventually spins to FINISHED state.
-        env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1);
-        DataStream<PartitionAndValue> stream =
-                env.fromSource(
-                        source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
-        executeAndVerify(env, stream);
+        // Defines 2 External context Factories, so test cases will be invoked twice using these two
+        // kinds of external contexts.
+        @SuppressWarnings("unused")
+        @ExternalContextFactory
+        KafkaSingleTopicExternalContext.Factory singleTopic =
+                new KafkaSingleTopicExternalContext.Factory(kafka.getContainer());
+
+        @SuppressWarnings("unused")
+        @ExternalContextFactory
+        KafkaMultipleTopicExternalContext.Factory multipleTopic =
+                new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer());
     }
 
     // -----------------
@@ -250,24 +298,23 @@ public class KafkaSourceITCase {
         }
 
         @Override
-        public void processElement(StreamRecord<PartitionAndValue> element) throws Exception {
+        public void processElement(StreamRecord<PartitionAndValue> element) {
             getRuntimeContext().getAccumulator("timestamp").add(element.getTimestamp());
         }
     }
 
-    @SuppressWarnings("serial")
     private void executeAndVerify(
             StreamExecutionEnvironment env, DataStream<PartitionAndValue> stream) throws Exception {
         stream.addSink(
                 new RichSinkFunction<PartitionAndValue>() {
                     @Override
-                    public void open(Configuration parameters) throws Exception {
+                    public void open(Configuration parameters) {
                         getRuntimeContext()
                                 .addAccumulator("result", new ListAccumulator<PartitionAndValue>());
                     }
 
                     @Override
-                    public void invoke(PartitionAndValue value, Context context) throws Exception {
+                    public void invoke(PartitionAndValue value, Context context) {
                         getRuntimeContext().getAccumulator("result").add(value);
                     }
                 });
@@ -283,10 +330,10 @@ public class KafkaSourceITCase {
                     int firstExpectedValue = Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
                     for (int i = 0; i < values.size(); i++) {
                         assertEquals(
-                                String.format(
-                                        "The %d-th value for partition %s should be %d", i, tp, i),
                                 firstExpectedValue + i,
-                                (int) values.get(i));
+                                (int) values.get(i),
+                                String.format(
+                                        "The %d-th value for partition %s should be %d", i, tp, i));
                     }
                 });
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 54e5e47..423584f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.connector.kafka.source.enumerator;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.mock.Whitebox;
 
 import org.apache.kafka.clients.admin.AdminClient;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 0e84882..4bc9769 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.initializer;
 
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 0fd9f1a..fc0f7de 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.kafka.clients.admin.AdminClient;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 2a477dd..175c6ef 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 18a024b..037c080 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java
new file mode 100644
index 0000000..1548a3e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.testcontainers.containers.KafkaContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Pattern;
+
+/**
+ * Kafka external context that will create multiple topics with only one partitions as source
+ * splits.
+ */
+public class KafkaMultipleTopicExternalContext extends KafkaSingleTopicExternalContext {
+
+    private int numTopics = 0;
+
+    private final String topicPattern;
+
+    private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
+            new HashMap<>();
+
+    public KafkaMultipleTopicExternalContext(String bootstrapServers) {
+        super(bootstrapServers);
+        this.topicPattern =
+                "kafka-multiple-topic-[0-9]+-"
+                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    }
+
+    @Override
+    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+        String topicName = getTopicName();
+        createTopic(topicName, 1, (short) 1);
+        final KafkaPartitionDataWriter splitWriter =
+                new KafkaPartitionDataWriter(
+                        getKafkaProducerProperties(numTopics), new TopicPartition(topicName, 0));
+        topicNameToSplitWriters.put(topicName, splitWriter);
+        numTopics++;
+        return splitWriter;
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        KafkaSourceBuilder<String> builder = KafkaSource.builder();
+
+        if (boundedness == Boundedness.BOUNDED) {
+            builder = builder.setBounded(OffsetsInitializer.latest());
+        }
+
+        return builder.setGroupId("flink-kafka-multiple-topic-test")
+                .setBootstrapServers(bootstrapServers)
+                .setTopicPattern(Pattern.compile(topicPattern))
+                .setDeserializer(
+                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+                .build();
+    }
+
+    @Override
+    public void close() {
+        topicNameToSplitWriters.forEach(
+                (topicName, splitWriter) -> {
+                    try {
+                        splitWriter.close();
+                        deleteTopic(topicName);
+                    } catch (Exception e) {
+                        kafkaAdminClient.close();
+                        throw new RuntimeException("Cannot close split writer", e);
+                    }
+                });
+        topicNameToSplitWriters.clear();
+        kafkaAdminClient.close();
+    }
+
+    private String getTopicName() {
+        return topicPattern.replace("[0-9]+", String.valueOf(numTopics));
+    }
+
+    @Override
+    public String toString() {
+        return "Multiple-topics Kafka";
+    }
+
+    /** Factory of {@link KafkaSingleTopicExternalContext}. */
+    public static class Factory extends KafkaSingleTopicExternalContext.Factory {
+
+        public Factory(KafkaContainer kafkaContainer) {
+            super(kafkaContainer);
+        }
+
+        @Override
+        public ExternalContext<String> createExternalContext() {
+            return new KafkaMultipleTopicExternalContext(getBootstrapServer());
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java
new file mode 100644
index 0000000..5485137
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+
+/** Source split data writer for writing test data into Kafka topic partitions. */
+public class KafkaPartitionDataWriter implements SourceSplitDataWriter<String> {
+
+    private final KafkaProducer<byte[], byte[]> kafkaProducer;
+    private final TopicPartition topicPartition;
+
+    public KafkaPartitionDataWriter(Properties producerProperties, TopicPartition topicPartition) {
+        this.kafkaProducer = new KafkaProducer<>(producerProperties);
+        this.topicPartition = topicPartition;
+    }
+
+    @Override
+    public void writeRecords(Collection<String> records) {
+        for (String record : records) {
+            ProducerRecord<byte[], byte[]> producerRecord =
+                    new ProducerRecord<>(
+                            topicPartition.topic(),
+                            topicPartition.partition(),
+                            null,
+                            record.getBytes(StandardCharsets.UTF_8));
+            kafkaProducer.send(producerRecord);
+        }
+        kafkaProducer.flush();
+    }
+
+    @Override
+    public void close() {
+        kafkaProducer.close();
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
new file mode 100644
index 0000000..81240cf
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A Kafka external context that will create only one topic and use partitions in that topic as
+ * source splits.
+ */
+public class KafkaSingleTopicExternalContext implements ExternalContext<String> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaSingleTopicExternalContext.class);
+
+    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
+    private static final int DEFAULT_TIMEOUT = 30;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+
+    protected String bootstrapServers;
+    private final String topicName;
+
+    private final Map<Integer, SourceSplitDataWriter<String>> partitionToSplitWriter =
+            new HashMap<>();
+
+    private int numSplits = 0;
+
+    protected final AdminClient kafkaAdminClient;
+
+    public KafkaSingleTopicExternalContext(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+        this.topicName =
+                TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        kafkaAdminClient = createAdminClient();
+    }
+
+    protected void createTopic(String topicName, int numPartitions, short replicationFactor) {
+        LOG.debug(
+                "Creating new Kafka topic {} with {} partitions and {} replicas",
+                topicName,
+                numPartitions,
+                replicationFactor);
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+        try {
+            kafkaAdminClient
+                    .createTopics(Collections.singletonList(newTopic))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
+        }
+    }
+
+    protected void deleteTopic(String topicName) {
+        LOG.debug("Deleting Kafka topic {}", topicName);
+        try {
+            kafkaAdminClient
+                    .deleteTopics(Collections.singletonList(topicName))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
+                throw new RuntimeException(String.format("Cannot delete topic '%s'", topicName), e);
+            }
+        }
+    }
+
+    private AdminClient createAdminClient() {
+        Properties config = new Properties();
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return AdminClient.create(config);
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        KafkaSourceBuilder<String> builder = KafkaSource.builder();
+
+        if (boundedness == Boundedness.BOUNDED) {
+            builder = builder.setBounded(OffsetsInitializer.latest());
+        }
+        return builder.setGroupId("flink-kafka-test")
+                .setDeserializer(
+                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+                .setTopics(topicName)
+                .setBootstrapServers(bootstrapServers)
+                .build();
+    }
+
+    @Override
+    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+        if (numSplits == 0) {
+            createTopic(topicName, 1, (short) 1);
+            numSplits++;
+        } else {
+            LOG.debug("Creating new partition for topic {}", topicName);
+            kafkaAdminClient.createPartitions(
+                    Collections.singletonMap(topicName, NewPartitions.increaseTo(++numSplits)));
+        }
+        KafkaPartitionDataWriter splitWriter =
+                new KafkaPartitionDataWriter(
+                        getKafkaProducerProperties(numSplits - 1),
+                        new TopicPartition(topicName, numSplits - 1));
+        partitionToSplitWriter.put(numSplits - 1, splitWriter);
+        return splitWriter;
+    }
+
+    @Override
+    public Collection<String> generateTestData(long seed) {
+        Random random = new Random(seed);
+        List<String> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+        for (int i = 0; i < recordNum; i++) {
+            int stringLength = random.nextInt(50) + 1;
+            randomStringRecords.add(generateRandomString(stringLength, random));
+        }
+        return randomStringRecords;
+    }
+
+    private String generateRandomString(int length, Random random) {
+        String alphaNumericString =
+                "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; ++i) {
+            sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length())));
+        }
+        return sb.toString();
+    }
+
+    protected Properties getKafkaProducerProperties(int producerId) {
+        Properties kafkaProducerProperties = new Properties();
+        kafkaProducerProperties.setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        kafkaProducerProperties.setProperty(
+                ProducerConfig.CLIENT_ID_CONFIG,
+                String.join(
+                        "-",
+                        "flink-kafka-split-writer",
+                        Integer.toString(producerId),
+                        Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
+        kafkaProducerProperties.setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProducerProperties.setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        return kafkaProducerProperties;
+    }
+
+    @Override
+    public void close() {
+        deleteTopic(topicName);
+        partitionToSplitWriter.forEach(
+                (partitionId, splitWriter) -> {
+                    try {
+                        splitWriter.close();
+                    } catch (Exception e) {
+                        kafkaAdminClient.close();
+                        throw new RuntimeException("Cannot close split writer", e);
+                    }
+                });
+        partitionToSplitWriter.clear();
+        kafkaAdminClient.close();
+    }
+
+    @Override
+    public String toString() {
+        return "Single-topic Kafka";
+    }
+
+    /** Factory of {@link KafkaSingleTopicExternalContext}. */
+    public static class Factory implements ExternalContext.Factory<String> {
+
+        private final KafkaContainer kafkaContainer;
+
+        public Factory(KafkaContainer kafkaContainer) {
+            this.kafkaContainer = kafkaContainer;
+        }
+
+        protected String getBootstrapServer() {
+            final String internalEndpoints =
+                    kafkaContainer.getNetworkAliases().stream()
+                            .map(host -> String.join(":", host, Integer.toString(9092)))
+                            .collect(Collectors.joining(","));
+            return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
+        }
+
+        @Override
+        public ExternalContext<String> createExternalContext() {
+            return new KafkaSingleTopicExternalContext(getBootstrapServer());
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
similarity index 99%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
index 6262e34..df5d8ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestEnv.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.source;
+package org.apache.flink.connector.kafka.source.testutils;
 
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index 8c3b90d..a13ed11 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -133,6 +133,19 @@ under the License.
                        <version>1.10.0</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-testing_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+               </dependency>
        </dependencies>
 
        <build>
@@ -192,6 +205,22 @@ under the License.
                                                        <type>jar</type>
                                                        <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                                                </artifactItem>
+                                               <artifactItem>
+                                                       <groupId>org.apache.flink</groupId>
+                                                       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+                                                       <version>${project.version}</version>
+                                                       <destFileName>kafka-connector.jar</destFileName>
+                                                       <type>jar</type>
+                                                       <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
+                                               <artifactItem>
+                                                       <groupId>org.apache.kafka</groupId>
+                                                       <artifactId>kafka-clients</artifactId>
+                                                       <version>2.4.1</version>
+                                                       <destFileName>kafka-clients.jar</destFileName>
+                                                       <type>jar</type>
+                                                       <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </artifactItem>
                                        </artifactItems>
                                </configuration>
                        </plugin>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
new file mode 100644
index 0000000..f220008
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
+import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/** Kafka E2E test based on connector testing framework. */
+public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
+    private static final String KAFKA_HOSTNAME = "kafka";
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.5.2";
+
+    // Defines TestEnvironment
+    @TestEnv
+    FlinkContainerTestEnvironment flink =
+            new FlinkContainerTestEnvironment(
+                    1,
+                    6,
+                    TestUtils.getResource("kafka-connector.jar").toAbsolutePath().toString(),
+                    TestUtils.getResource("kafka-clients.jar").toAbsolutePath().toString());
+
+    // Defines ConnectorExternalSystem
+    @ExternalSystem
+    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                                    .withNetworkAliases(KAFKA_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainer())
+                    .build();
+
+    // Defines 2 External context Factories, so test cases will be invoked twice using these two
+    // kinds of external contexts.
+    @SuppressWarnings("unused")
+    @ExternalContextFactory
+    KafkaSingleTopicExternalContext.Factory singleTopic =
+            new KafkaSingleTopicExternalContext.Factory(kafka.getContainer());
+
+    @SuppressWarnings("unused")
+    @ExternalContextFactory
+    KafkaMultipleTopicExternalContext.Factory multipleTopic =
+            new KafkaMultipleTopicExternalContext.Factory(kafka.getContainer());
+}
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index 89e5b58..521d055 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -38,3 +38,4 @@ flink-streaming-kafka-test-base
 flink-heavy-deployment-stress-test
 flink-elasticsearch5-test
 flink-high-parallelism-iterations-test
+flink-end-to-end-tests-common-kafka
