AHeise commented on a change in pull request #16590:
URL: https://github.com/apache/flink/pull/16590#discussion_r685814603



##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
##########
@@ -49,8 +49,15 @@ under the License.
                        <scope>compile</scope>
                </dependency>
                <dependency>
-                       <groupId>junit</groupId>
-                       <artifactId>junit</artifactId>
+                       <groupId>org.junit.jupiter</groupId>

Review comment:
       Don't forget to remove this PR once #16551 is merged. (It still uses 
migrationsupport)

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/TestResource.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common;
+
+/** Test resources. */
+public interface TestResource {

Review comment:
       Please add documentation to this and all other classes that are supposed 
to be used/extended by connector developers.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.external;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Context of the test interacting with external system.
+ *
+ * <p>This context is responsible for providing:
+ *
+ * <ul>
+ *   <li>Instance of sources connecting to external system
+ *   <li>{@link SourceSplitDataWriter} for creating splits and writing test 
data into the created
+ *       split
+ *   <li>Test data to write into the external system
+ * </ul>
+ *
+ * @param <T> Type of elements after deserialization by source
+ */
+public interface ExternalContext<T> extends Serializable, AutoCloseable {
+
+    /**
+     * Create a new instance of connector source implemented in {@link Source}.
+     *
+     * @return A new instance of Source
+     */
+    Source<T, ?, ?> createSource(Boundedness boundedness);
+
+    /**
+     * Create a new split in the external system.
+     *
+     * @return A data writer for the created split.
+     */
+    SourceSplitDataWriter<T> createSourceSplit();
+
+    /**
+     * Generate test data.
+     *
+     * @return Collection of generated test data.
+     */
+    Collection<T> generateTestData();

Review comment:
       Maybe add seed or even `Random` as a parameter? That could also help us 
to have reproducible tests in case of issues with data generation or handling 
of special cases.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/TestResource.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common;
+
+/** Test resources. */
+public interface TestResource {
+
+    void startUp() throws Exception;
+
+    void tearDown() throws Exception;
+
+    boolean isStarted();

Review comment:
       Do we actually need this method? We could define `startUp` as idempotent 
and allow `tearDown` without `startUp`. `MiniClusterTestEnvironment` and 
`RemoteClusterTestEnvironment` don't really need it.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ContainerizedExternalSystem.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.external;
+
+import org.testcontainers.containers.GenericContainer;
+
+/** External system using <a 
href="https://www.testcontainers.org/";>Testcontainers</a>. */
+public interface ContainerizedExternalSystem<C extends 
ContainerizedExternalSystem<?>>

Review comment:
       I think we should also provide some 
`AbstractContainerizedExternalSystem`. I see lots of things in 
`KafkaContainerizedExternalSystem` probably need to be replicated for other 
containers.
   
   Maybe it would even be better to just use one 
`DefaultContainerizedExternalSystem` and let the connectors use a builder to 
configure that. Except for `getBootstrapServer` the kafka container is not 
adding any specific behavior and that could probably also be moved in 
`ExternalContext`.
   I guess it depends on how you want to position the two concepts 
`ExternalSystem` and `ExternalContext`. I think a natural separation would be 
that `ExternalSystem` is only responsible for life-cycle and `ExternalContext` 
for anything that you can do with a live system.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalSystem.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.external;
+
+import org.apache.flink.connectors.test.common.TestResource;
+
+/** General external system for end-to-end testing. */
+public interface ExternalSystem extends TestResource {}

Review comment:
       Link to annotation or else this interface looks quite useless.

##########
File path: flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
##########
@@ -151,6 +151,23 @@ under the License.
                                        </systemPropertyVariables>
                                </configuration>
                        </plugin>
+
+                       <!-- Skip dependency convergence check because of guava 
version -->
+                       <plugin>

Review comment:
       I'm assuming we still want to converge here or at least only exclude 
Guava.
   
   Could you please check if this legit? @zentol 
   

##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
##########
@@ -65,6 +65,11 @@ under the License.
                        <version>${project.version}</version>
                        <scope>compile</scope>
                </dependency>
+               <dependency>

Review comment:
       Could you please update the commit message?
   I'd also splits the container modifications and the test environments into 2 
commits.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.external;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Context of the test interacting with external system.
+ *
+ * <p>This context is responsible for providing:
+ *
+ * <ul>
+ *   <li>Instance of sources connecting to external system
+ *   <li>{@link SourceSplitDataWriter} for creating splits and writing test 
data into the created
+ *       split
+ *   <li>Test data to write into the external system
+ * </ul>
+ *
+ * @param <T> Type of elements after deserialization by source
+ */
+public interface ExternalContext<T> extends Serializable, AutoCloseable {
+
+    /**
+     * Create a new instance of connector source implemented in {@link Source}.
+     *
+     * @return A new instance of Source
+     */
+    Source<T, ?, ?> createSource(Boundedness boundedness);
+
+    /**
+     * Create a new split in the external system.
+     *
+     * @return A data writer for the created split.
+     */
+    SourceSplitDataWriter<T> createSourceSplit();

Review comment:
       `createSourceSplitDataWriter`? Or maybe it's actually more about writing 
partitions.
   `SourceSplit`s don't make any sense outside of the `Source`.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class SourceTestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SourceTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test")) {
+            // Check test result
+            checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 4 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test")) {
+            // Check test result
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with an idle reader.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")

Review comment:
       constant for tag?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaContainerizedExternalSystem;
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalSystem;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithTestEnvironment;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+/** Kafka E2E test based on connector testing framework. */
+public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment
+    @WithTestEnvironment public FlinkContainerTestEnvironment flink;
+
+    // Defines ExternalSystem
+    @WithExternalSystem public KafkaContainerizedExternalSystem kafka;
+
+    // Defines 2 External context Factories, so test cases will be invoked 
twice using these two
+    // kinds of external contexts.
+    @WithExternalContextFactory public KafkaSingleTopicExternalContext.Factory 
singleTopic;
+    @WithExternalContextFactory public 
KafkaMultipleTopicExternalContext.Factory multipleTopic;
+
+    /** Instantiate and preparing test resources. */
+    public KafkaSourceE2ECase() {

Review comment:
       I'm wondering if initialization in ctor is really the best option (In 
general, I like to initialize as much as possible in ctor and make the fields 
final).
   
   We could also have an abstract method in `SourceTestSuiteBase` that needs to 
be overwritten. 

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaContainerizedExternalSystem;
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
+import 
org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalSystem;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithTestEnvironment;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+/** Kafka E2E test based on connector testing framework. */
+public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment
+    @WithTestEnvironment public FlinkContainerTestEnvironment flink;

Review comment:
       Afaik these fields don't need to be public anymore in JUnit5. Could you 
double-check and adjust?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/resources/META-INF/NOTICE
##########
@@ -0,0 +1,9 @@
+flink-end-to-end-test-common-kafka

Review comment:
       Are we bundling `kafka-clients` anywhere? (Do we take any part of the 
jar and put it into ours?)

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceSplitDataWriter.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+
+/** Source split data writer for writing test data into Kafka topic 
partitions. */
+public class KafkaSourceSplitDataWriter implements 
SourceSplitDataWriter<String> {

Review comment:
       Yay I think `PartitionDataWriter` and `KafkaParittionDataWriter` make 
more sense.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/WithTestEnvironment.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.annotations;
+
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining {@link TestEnvironment}.
+ *
+ * <p>Only one field can be annotated as test environment in a test class.
+ *
+ * <p>The lifecycle of {@link TestEnvironment} will be PER-CLASS for 
performance, because launching
+ * and tearing down Flink cluster could be relatively a heavy operation.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface WithTestEnvironment {}

Review comment:
       In the actual test, `WithTestEnvironment` reads weird to me. 
   `@WithTestEnvironment public FlinkContainerTestEnvironment flink;`
   I have also not found any popular extension that uses `With...` but I have 
also only limited experience.
   Ideally we would just call it `TestEnvironment`.
   `@TestEnvironment public FlinkContainerTestEnvironment flink;`
   However, that may be confusing. Another Option inspired by `@TempDir` would 
be
   `@TestEnv public FlinkContainerTestEnvironment flink;`
   
   I have the same concerns for the other annotations where I'd drop the `With`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
##########
@@ -197,6 +207,45 @@ public void testRedundantParallelism() throws Exception {
         executeAndVerify(env, stream);
     }
 
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class OnConnectorTestingFramework extends SourceTestSuiteBase<String> {

Review comment:
       The name here is weird. Could we simply call the class 
`IntegrationTests`?

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class SourceTestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SourceTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test")) {
+            // Check test result
+            checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 4 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test")) {
+            // Check test result
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with an idle reader.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(
+            TestEnvironment testEnv,
+            ExternalContext<T> externalContext,
+            ClusterControllable controller)
+            throws Exception {
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------
+
+        checkSingleSplitRecords(
+                testRecordsBeforeFailure.iterator(), iterator, 
testRecordsBeforeFailure.size());
+
+        // -------------------------------- Trigger failover 
---------------------------------------
+        controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        final Collection<T> testRecordsAfterFailure = 
externalContext.generateTestData();
+        sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
+        checkSingleSplitRecords(
+                testRecordsAfterFailure.iterator(), iterator, 
testRecordsAfterFailure.size());
+
+        iterator.close();
+        CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30));
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+    }
+
+    // ----------------------------- Helper Functions 
---------------------------------
+
+    /**
+     * Generate a set of test records and write it to the given split writer.
+     *
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected Collection<T> generateAndWriteTestData(ExternalContext<T> 
externalContext) {
+        final Collection<T> testRecordCollection = 
externalContext.generateTestData();
+        LOG.debug("Writing {} records to external system", 
testRecordCollection.size());
+        externalContext.createSourceSplit().writeRecords(testRecordCollection);
+        return testRecordCollection;
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split within the limit of record number.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     * @param limit Number of records to check
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator, 
int limit) {
+        for (int i = 0; i < limit; i++) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+        }
+        LOG.debug("{} records are validated", limit);
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator) {
+        int recordCounter = 0;
+
+        while (testRecordIterator.hasNext()) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+            recordCounter++;
+        }
+
+        assertFalse(resultRecordIterator.hasNext());
+        LOG.debug("{} records are validated", recordCounter);
+    }
+
+    /**
+     * Check if records consumed by Flink is identical to test records written 
into multiple splits.
+     *
+     * <p>The order of records across different splits can be arbitrary, but 
should be identical
+     * within a split.
+     *
+     * @param testData Test data in all splits
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkMultipleSplitRecords(
+            MultipleSplitTestData testData, Iterator<T> resultRecordIterator) {
+        int recordCounter = 0;
+
+        while (resultRecordIterator.hasNext()) {
+            testData.matchThenNext(resultRecordIterator.next());
+            recordCounter++;
+        }
+
+        assertTrue(
+                testData.hasReachedEnd(),
+                "Records received by Flink is less than records sent to 
external system.");
+
+        LOG.debug("{} records are validated", recordCounter);
+    }
+
+    /**
+     * Wrapper class for validating test data in multiple splits.
+     *
+     * <p>Each collection has a pointer (iterator) pointing to current 
checking record. When a
+     * record is received in the stream, it will be compared to all current 
pointing records in
+     * collections, and the pointer to the identical record will move forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
+     * should reach the end of the collection finally.
+     */
+    public class MultipleSplitTestData {

Review comment:
       I think this should be a matcher instead. Then we could have
   `assertThat(resultIterator, matchesSplits(testRecordCollections));`
   If you change it, then it would be nice to have it on top-level for easier 
reuse.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaContainerizedExternalSystem.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import 
org.apache.flink.connectors.test.common.external.ContainerizedExternalSystem;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Properties;
+
+/**
+ * Kafka external system based on {@link KafkaContainer} from <a
+ * href="https://www.testcontainers.org/";>Testcontainers</a>.
+ *
+ * <p>This external system is also integrated with a Kafka admin client for 
topic management.
+ */
+public class KafkaContainerizedExternalSystem
+        implements 
ContainerizedExternalSystem<KafkaContainerizedExternalSystem> {
+
+    private GenericContainer<?> flink;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaContainerizedExternalSystem.class);
+
+    // Kafka container and its related AdminClient
+    private final KafkaContainer kafka;
+    private AdminClient kafkaAdminClient;

Review comment:
       This seems completely unused.

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class SourceTestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SourceTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test")) {
+            // Check test result
+            checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 4 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+
+        try (final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test")) {
+            // Check test result
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with an idle reader.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            checkMultipleSplitRecords(
+                    new MultipleSplitTestData(testRecordCollections), 
resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(
+            TestEnvironment testEnv,
+            ExternalContext<T> externalContext,
+            ClusterControllable controller)
+            throws Exception {
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------
+
+        checkSingleSplitRecords(
+                testRecordsBeforeFailure.iterator(), iterator, 
testRecordsBeforeFailure.size());
+
+        // -------------------------------- Trigger failover 
---------------------------------------
+        controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        final Collection<T> testRecordsAfterFailure = 
externalContext.generateTestData();
+        sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
+        checkSingleSplitRecords(
+                testRecordsAfterFailure.iterator(), iterator, 
testRecordsAfterFailure.size());
+
+        iterator.close();
+        CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30));
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+    }
+
+    // ----------------------------- Helper Functions 
---------------------------------
+
+    /**
+     * Generate a set of test records and write it to the given split writer.
+     *
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected Collection<T> generateAndWriteTestData(ExternalContext<T> 
externalContext) {
+        final Collection<T> testRecordCollection = 
externalContext.generateTestData();
+        LOG.debug("Writing {} records to external system", 
testRecordCollection.size());
+        externalContext.createSourceSplit().writeRecords(testRecordCollection);
+        return testRecordCollection;
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split within the limit of record number.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     * @param limit Number of records to check
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator, 
int limit) {
+        for (int i = 0; i < limit; i++) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+        }
+        LOG.debug("{} records are validated", limit);
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator) {
+        int recordCounter = 0;
+
+        while (testRecordIterator.hasNext()) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+            recordCounter++;
+        }
+
+        assertFalse(resultRecordIterator.hasNext());
+        LOG.debug("{} records are validated", recordCounter);
+    }
+
+    /**
+     * Check if records consumed by Flink is identical to test records written 
into multiple splits.
+     *
+     * <p>The order of records across different splits can be arbitrary, but 
should be identical
+     * within a split.
+     *
+     * @param testData Test data in all splits
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkMultipleSplitRecords(
+            MultipleSplitTestData testData, Iterator<T> resultRecordIterator) {
+        int recordCounter = 0;
+
+        while (resultRecordIterator.hasNext()) {
+            testData.matchThenNext(resultRecordIterator.next());
+            recordCounter++;
+        }
+
+        assertTrue(
+                testData.hasReachedEnd(),
+                "Records received by Flink is less than records sent to 
external system.");
+
+        LOG.debug("{} records are validated", recordCounter);
+    }
+
+    /**
+     * Wrapper class for validating test data in multiple splits.
+     *
+     * <p>Each collection has a pointer (iterator) pointing to current 
checking record. When a
+     * record is received in the stream, it will be compared to all current 
pointing records in
+     * collections, and the pointer to the identical record will move forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
+     * should reach the end of the collection finally.
+     */
+    public class MultipleSplitTestData {

Review comment:
       static




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to