AHeise commented on a change in pull request #16590:
URL: https://github.com/apache/flink/pull/16590#discussion_r679265526



##########
File path: flink-test-utils-parent/flink-connector-testing-framework/pom.xml
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-test-utils-parent</artifactId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-testing-framework_${scala.binary.version}</artifactId>
+       <name>Flink : Test utils : Testing Framework</name>
+
+       <properties>
+               <testcontainers.version>1.15.1</testcontainers.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
+                       <version>${testcontainers.version}</version>
+               </dependency>
+
+               <!-- JUnit 5 -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <!-- We pick an arbitrary version of 
net.java.dev.jna:jna to satisfy dependency
+                                convergence for 
org.testcontainers:testcontainers which transitively depends on
+                                two different versions.-->
+                       <dependency>
+                               <groupId>net.java.dev.jna</groupId>
+                               <artifactId>jna</artifactId>
+                               <version>5.5.0</version>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
+
+       <profiles>
+               <profile>
+                       <!--
+                               This profile is for building Docker image based 
on latest Flink code.
+                               Docker environment should be installed before 
using this profile.
+                               To trigger this profile, use 
"-Pbuild-flink-image" option when running maven.
+                       -->
+                       <id>build-flink-image</id>

Review comment:
       I'm not sure if this is the correct project for that. I'm assuming that 
this is for `FlinkContainer`. Do we have a better way to (automatically) create 
the docker image? @zentol 

##########
File path: flink-test-utils-parent/flink-connector-testing-framework/pom.xml
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-test-utils-parent</artifactId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-testing-framework_${scala.binary.version}</artifactId>

Review comment:
       I'd probably remove `framework` from the name. We have a lot of other 
(testing) frameworks without the explicit literal in the name.

##########
File path: flink-test-utils-parent/flink-connector-testing-framework/pom.xml
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-test-utils-parent</artifactId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-testing-framework_${scala.binary.version}</artifactId>
+       <name>Flink : Test utils : Testing Framework</name>
+
+       <properties>
+               <testcontainers.version>1.15.1</testcontainers.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
+                       <version>${testcontainers.version}</version>
+               </dependency>
+
+               <!-- JUnit 5 -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>

Review comment:
       Does it make sense to have that convergence in the root pom? It feels 
like we have more and more projects that depend on Testcontainers, and they 
could all use that.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/utils/FlinkContainers.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connectors.test.common.TestResource;
+
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Flink cluster running on <a href="https://www.testcontainers.org/">Testcontainers</a>.
+ *
+ * <p>This cluster is integrated with components below:
+ * <li>Job manager and task managers
+ * <li>REST cluster client for job status tracking
+ * <li>Workspace directory for storing files generated in flink jobs
+ * <li>Checkpoint directory for storing checkpoints
+ * <li>Job directory for storing Flink job JAR files
+ */
+public class FlinkContainers implements TestResource {

Review comment:
       Is there a particular reason why we cannot reuse 
`org.apache.flink.tests.util.flink.FlinkContainer`?

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/environment/ClusterControllable.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.core.execution.JobClient;
+
+/** Interface for triggering failover in a Flink cluster. */
+public interface ClusterControllable {

Review comment:
       Wouldn't that be something that we should offer outside of the source 
testing framework? I'm also fine with moving it later.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/** Test environment for running jobs on Flink mini-cluster. */
+public class MiniClusterTestEnvironment implements TestEnvironment, 
ClusterControllable {
+
+    private final MiniClusterWithClientResource miniCluster;
+
+    private int latestTMIndex = 0;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
+
+    private boolean isStarted = false;
+
+    public MiniClusterTestEnvironment() {
+        this.miniCluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(6)
+                                
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+    }
+
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable 
afterFailAction)
+            throws ExecutionException, InterruptedException {
+        final Optional<HaLeadershipControl> controlOptional =
+                miniCluster.getMiniCluster().getHaLeadershipControl();
+        if (!controlOptional.isPresent()) {
+            throw new UnsupportedOperationException(
+                    "This MiniCluster does not support JobManager HA");
+        }
+        final HaLeadershipControl haLeadershipControl = controlOptional.get();
+        
haLeadershipControl.revokeJobMasterLeadership(jobClient.getJobID()).get();
+        afterFailAction.run();
+        
haLeadershipControl.grantJobMasterLeadership(jobClient.getJobID()).get();
+    }
+
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable 
afterFailAction)
+            throws Exception {
+        terminateTaskManager();
+        JobStatusUtils.waitForJobStatus(
+                jobClient,
+                Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, 
JobStatus.RESTARTING),
+                Duration.ofSeconds(30));
+        afterFailAction.run();
+        startTaskManager();
+    }
+
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
+        throw new UnsupportedOperationException("Cannot isolate network in a 
MiniCluster");
+    }

Review comment:
       I like this way of inducing errors. That would be handy for all kinds of 
tests. I could also imagine baking it into `MiniCluster`.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestLoggerExtension.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.extensions;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** A JUnit-5-style test logger. */
+public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {

Review comment:
       We should probably go with a global `TestExecutionListener`, see 
https://junit.org/junit5/docs/current/user-guide/#launcher-api-listeners-custom 
. We can defer it to the fat JUnit 5 refactor commit.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.

Review comment:
       `Flink job with 4 parallelism`

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case

Review comment:
       Shouldn't we stick to `@Test` and just annotate `TestSuiteBase` with the 
extension? Is there any benefit in treating tests with injected parameters 
differently from tests without such injections?

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        final CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test");
+
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        checkEnvironmentIsControllable(testEnv);
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------

Review comment:
       I don't get why you can't use `DataStream#executeAndCollect`. Could you 
elaborate?

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/utils/ConnectorJarUtils.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Objects;
+
+/** Utilities for searching connector JARs. */
+public class ConnectorJarUtils {
+
+    private static File searchedJarFile = null;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConnectorJarUtils.class);
+
+    /**
+     * Search JAR file in target folder.
+     *
+     * @return JAR file
+     * @throws FileNotFoundException if JAR file is not found in target folder
+     */
+    public static File searchConnectorJar() throws FileNotFoundException {

Review comment:
       I think we have similar functionality already 
[here](https://github.com/apache/flink/blob/e32f0f82164512f632a533fad01ead6a12ac8152/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceSetup.java#L75-L75).
 However, I'm not sure whether the spec can also be applied to the 
`FlinkContainer`. @zentol ?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/e2e/resources/KafkaSingleTopicExternalContext.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.e2e.resources;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Kafka external context that will create only one topic and use partitions 
in that topic as
+ * source splits.
+ */
+public class KafkaSingleTopicExternalContext implements 
ExternalContext<String> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaSingleTopicExternalContext.class);
+
+    protected String bootstrapServers;
+    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
+
+    private final String topicName;
+
+    private static final int DEFAULT_TIMEOUT = 30;
+
+    private final Map<Integer, SourceSplitDataWriter<String>> 
partitionToSplitWriter =
+            new HashMap<>();
+
+    private int numSplits = 0;
+
+    protected final AdminClient kafkaAdminClient;
+
+    public KafkaSingleTopicExternalContext(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+        this.topicName =
+                TOPIC_NAME_PREFIX + "-" + 
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        kafkaAdminClient = createAdminClient();
+    }
+
+    protected void createTopic(String topicName, int numPartitions, short 
replicationFactor) {
+        LOG.debug(
+                "Creating new Kafka topic {} with {} partitions and {} 
replicas",
+                topicName,
+                numPartitions,
+                replicationFactor);
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, 
replicationFactor);
+        try {
+            kafkaAdminClient
+                    .createTopics(Collections.singletonList(newTopic))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Cannot create topic 
'%s'", topicName), e);
+        }
+    }
+
+    protected void deleteTopic(String topicName) {
+        LOG.debug("Deleting Kafka topic {}", topicName);
+        try {
+            kafkaAdminClient
+                    .deleteTopics(Collections.singletonList(topicName))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            if (ExceptionUtils.getRootCause(e) instanceof 
UnknownTopicOrPartitionException) {
+                throw new RuntimeException(String.format("Cannot delete topic 
'%s'", topicName), e);
+            }
+        }
+    }

Review comment:
       Can we get rid of the code duplication with respect to 
`KafkaContainerizedExternalSystem`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/e2e/resources/KafkaContainerizedExternalSystem.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.e2e.resources;
+
+import 
org.apache.flink.connectors.test.common.external.ContainerizedExternalSystem;
+import org.apache.flink.connectors.test.common.utils.FlinkContainers;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kafka external system based on {@link KafkaContainer} from <a
+ * href="https://www.testcontainers.org/";>Testcontainers</a>.
+ *
+ * <p>This external system is also integrated with a Kafka admin client for 
topic management.
+ */
+public class KafkaContainerizedExternalSystem
+        implements 
ContainerizedExternalSystem<KafkaContainerizedExternalSystem> {

Review comment:
       I'd convert `ContainerizedExternalSystem` into an abstract base class 
where the Docker-specific stuff and life-cycle are already implemented.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/e2e/KafkaSourceE2ECase.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.e2e;
+
+import 
org.apache.flink.connector.kafka.source.e2e.resources.KafkaContainerizedExternalSystem;
+import 
org.apache.flink.connector.kafka.source.e2e.resources.KafkaMultipleTopicExternalContext;
+import 
org.apache.flink.connector.kafka.source.e2e.resources.KafkaSingleTopicExternalContext;
+import 
org.apache.flink.connectors.test.common.environment.FlinkContainersTestEnvironment;
+import 
org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalSystem;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithTestEnvironment;
+import org.apache.flink.connectors.test.common.testsuites.TestSuiteBase;
+import org.apache.flink.connectors.test.common.utils.ConnectorJarUtils;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+
+/**
+ * End-to-end test for {@link 
org.apache.flink.connector.kafka.source.KafkaSource} using testing
+ * framework, based on JUnit 5.
+ *
+ * <p>This class uses {@link Nested} classes to test functionality of 
KafkaSource under different
+ * Flink environments (MiniCluster and Containers). Each nested class extends 
{@link TestSuiteBase}
+ * for reusing test cases already defined by testing framework.
+ */
+@Disabled
+@Tag("TestingFramework")
+@DisplayName("Kafka Source E2E Test")
+public class KafkaSourceE2ECase {
+
+    @Nested
+    @DisplayName("On MiniCluster")
+    class OnMiniCluster extends TestSuiteBase<String> {

Review comment:
       I'd probably rather go for a parameterized method that supplies a 
collection of TestEnvironments. Then we could even use a different set of 
`TestEnvironments` depending on where it's executed. It's also significantly 
fewer LOC.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 splits in the external system, write test data to all splits,
+     * and consume back via a Flink job with 5 parallelism, so at least one parallelism / source
+     * reader will be idle (assigned with no splits). If the split enumerator of the source doesn't
+     * signal NoMoreSplitsEvent to the idle source reader, the Flink job will never switch to
+     * FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need to be identical to
+     * the test data written into the external system to pass this test. There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     *
+     * @param testEnv test environment that provides the Flink execution environment
+     * @param externalContext external context used to write test data and create the tested source
+     * @throws Exception if executing the job or checking the collected records fails
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        // The iterator was previously never closed in this case; try-with-resources closes it
+        // on both the success path and when the check throws.
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            checkMultipleSplitRecords(
+                    testRecordCollections.stream()
+                            .map(Collection::iterator)
+                            .collect(Collectors.toList()),
+                    resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test record set A into the
+     * split, restart task manager to trigger job failover, write test record set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager failover will be
+     * triggered in the middle of the test.
+     *
+     * @param testEnv test environment that provides the Flink execution environment; it is cast to
+     *     {@link ClusterControllable} to trigger the failover
+     * @param externalContext external context used to create the split, the test data and the
+     *     tested source
+     * @throws Exception if executing the job, triggering the failover or checking records fails
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(TestEnvironment testEnv, ExternalContext<T> externalContext)
+            throws Exception {
+
+        checkEnvironmentIsControllable(testEnv);
+
+        final Collection<T> testRecordsBeforeFailure = externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since the DataStream API doesn't expose the job client for executeAndCollect(), we have
+        // to replicate this part of the code to get both the job client and the result iterator.
+        // ------------------------------------ START ------------------------------------------
+        TypeSerializer<T> serializer = dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END ------------------------------------------
+
+        // The iterator is closed in finally so that it is released even if a check or the
+        // failover step throws; on the success path the order (close, terminate, wait) is the
+        // same as before.
+        try {
+            checkSingleSplitRecords(
+                    testRecordsBeforeFailure.iterator(), iterator, testRecordsBeforeFailure.size());
+
+            // ---------------------------- Trigger failover -------------------------------
+            final ClusterControllable controller = (ClusterControllable) testEnv;
+            controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+            JobStatusUtils.waitForJobStatus(
+                    jobClient, Collections.singletonList(JobStatus.RUNNING), Duration.ofSeconds(30));
+
+            final Collection<T> testRecordsAfterFailure = externalContext.generateTestData();
+            sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
+            checkSingleSplitRecords(
+                    testRecordsAfterFailure.iterator(), iterator, testRecordsAfterFailure.size());
+        } finally {
+            iterator.close();
+        }
+
+        JobStatusUtils.terminateJob(jobClient, Duration.ofSeconds(30));
+        JobStatusUtils.waitForJobStatus(
+                jobClient, Collections.singletonList(JobStatus.CANCELED), Duration.ofSeconds(30));
+    }
+
+    // ----------------------------- Helper Functions 
---------------------------------
+
+    /**
+     * Generate a set of test records and write them into a newly created source split of the
+     * external system.
+     *
+     * @param externalContext External context that generates the records and creates the split
+     * @return Collection of generated test records
+     */
+    protected Collection<T> generateAndWriteTestData(ExternalContext<T> externalContext) {
+        final Collection<T> records = externalContext.generateTestData();
+        LOG.debug("Writing {} records to external system", records.size());
+        final SourceSplitDataWriter<T> splitWriter = externalContext.createSourceSplit();
+        splitWriter.writeRecords(records);
+        return records;
+    }
+
+    /**
+     * Check if the given test environment is controllable (can trigger 
failover or network
+     * isolation).
+     *
+     * @param testEnvironment Test environment being checked
+     * @throws IllegalArgumentException if the test environment is not 
controllable
+     */
+    protected void checkEnvironmentIsControllable(TestEnvironment 
testEnvironment)

Review comment:
       Not necessary if you make the controller an explicit parameter.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/utils/JobStatusUtils.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Helper components for checking Flink job status. */
+public class JobStatusUtils {

Review comment:
       We can probably reuse some parts of 
`org.apache.flink.runtime.testutils.CommonTestUtils` and even move the 
functions to it.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(

Review comment:
       Consider renaming this test to `testIdleReader`.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 splits in the external system, write test data to all splits,
+     * and consume back via a Flink job with 5 parallelism, so at least one parallelism / source
+     * reader will be idle (assigned with no splits). If the split enumerator of the source doesn't
+     * signal NoMoreSplitsEvent to the idle source reader, the Flink job will never switch to
+     * FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need to be identical to
+     * the test data written into the external system to pass this test. There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     *
+     * @param testEnv test environment that provides the Flink execution environment
+     * @param externalContext external context used to write test data and create the tested source
+     * @throws Exception if executing the job or checking the collected records fails
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        // The iterator was previously never closed in this case; try-with-resources closes it
+        // on both the success path and when the check throws.
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            checkMultipleSplitRecords(
+                    testRecordCollections.stream()
+                            .map(Collection::iterator)
+                            .collect(Collectors.toList()),
+                    resultIterator);
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.

Review comment:
       We should probably add a way for users to explicitly disable tests for 
bounded/unbounded jobs via a simple annotation (e.g. when a source supports 
only one kind of boundedness).

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =

Review comment:
       Wrap `resultIterator` in a try block (e.g. try-with-resources) so it is always closed - this applies to all tests.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {

Review comment:
       Consider renaming this class to `SourceTestSuiteBase`.

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        final CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test");
+
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        checkEnvironmentIsControllable(testEnv);
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------
+
+        checkSingleSplitRecords(
+                testRecordsBeforeFailure.iterator(), iterator, 
testRecordsBeforeFailure.size());
+
+        // -------------------------------- Trigger failover 
---------------------------------------
+        final ClusterControllable controller = (ClusterControllable) testEnv;
+        controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+        JobStatusUtils.waitForJobStatus(
+                jobClient, Collections.singletonList(JobStatus.RUNNING), 
Duration.ofSeconds(30));
+
+        final Collection<T> testRecordsAfterFailure = 
externalContext.generateTestData();

Review comment:
       This could be the exact same data as before, right? Couldn't the test 
then pass by just re-reading the first data points?

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        final CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test");
+
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        checkEnvironmentIsControllable(testEnv);
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------
+
+        checkSingleSplitRecords(
+                testRecordsBeforeFailure.iterator(), iterator, 
testRecordsBeforeFailure.size());
+
+        // -------------------------------- Trigger failover 
---------------------------------------
+        final ClusterControllable controller = (ClusterControllable) testEnv;
+        controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+        JobStatusUtils.waitForJobStatus(
+                jobClient, Collections.singletonList(JobStatus.RUNNING), 
Duration.ofSeconds(30));
+
+        final Collection<T> testRecordsAfterFailure = 
externalContext.generateTestData();
+        sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
+        checkSingleSplitRecords(
+                testRecordsAfterFailure.iterator(), iterator, 
testRecordsAfterFailure.size());
+
+        iterator.close();
+        JobStatusUtils.terminateJob(jobClient, Duration.ofSeconds(30));
+        JobStatusUtils.waitForJobStatus(
+                jobClient, Collections.singletonList(JobStatus.CANCELED), 
Duration.ofSeconds(30));
+    }
+
+    // ----------------------------- Helper Functions 
---------------------------------
+
+    /**
+     * Generate a set of test records and write it to the given split writer.
+     *
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected Collection<T> generateAndWriteTestData(ExternalContext<T> 
externalContext) {
+        final Collection<T> testRecordCollection = 
externalContext.generateTestData();
+        LOG.debug("Writing {} records to external system", 
testRecordCollection.size());
+        externalContext.createSourceSplit().writeRecords(testRecordCollection);
+        return testRecordCollection;
+    }
+
+    /**
+     * Check if the given test environment is controllable (can trigger 
failover or network
+     * isolation).
+     *
+     * @param testEnvironment Test environment being checked
+     * @throws IllegalArgumentException if the test environment is not 
controllable
+     */
+    protected void checkEnvironmentIsControllable(TestEnvironment 
testEnvironment)
+            throws IllegalArgumentException {
+        assumeTrue(
+                
ClusterControllable.class.isAssignableFrom(testEnvironment.getClass()),
+                "Provided test environment should be controllable.");
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split within the limit of record number.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     * @param limit Number of records to check
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator, 
int limit) {
+        for (int i = 0; i < limit; i++) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+        }
+        LOG.debug("{} records are validated", limit);
+    }
+
+    /**
+     * Check if records consumed by Flink are identical to test records 
written into the single
+     * split.
+     *
+     * @param testRecordIterator Iterator of test records
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkSingleSplitRecords(
+            Iterator<T> testRecordIterator, Iterator<T> resultRecordIterator) {
+        int recordCounter = 0;
+
+        while (testRecordIterator.hasNext()) {
+            T testRecord = testRecordIterator.next();
+            T resultRecord = resultRecordIterator.next();
+            assertEquals(testRecord, resultRecord);
+            recordCounter++;
+        }
+
+        assertFalse(resultRecordIterator.hasNext());
+        LOG.debug("{} records are validated", recordCounter);
+    }
+
+    /**
+     * Check if records consumed by Flink is identical to test records written 
into multiple splits.
+     *
+     * <p>The order of records across different splits can be arbitrary, but 
should be identical
+     * within a split.
+     *
+     * @param testRecordIterators Collection of iterators for records in 
different splits
+     * @param resultRecordIterator Iterator of result records consumed by Flink
+     */
+    protected void checkMultipleSplitRecords(
+            Collection<Iterator<T>> testRecordIterators, Iterator<T> 
resultRecordIterator) {

Review comment:
       I'd probably materialize all results and get rid of iterators. It's just 
so much harder to read...

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/testsuites/TestSuiteBase.java
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.annotations.Case;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestingFrameworkExtension;
+import org.apache.flink.connectors.test.common.utils.JobStatusUtils;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Base class for all test suites.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({TestingFrameworkExtension.class, TestLoggerExtension.class})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestSuiteBase.class);
+
+    // ----------------------------- Basic test cases 
---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test 
data into it, and
+     * consume back via a Flink job with 1 parallelism.
+     *
+     * <p>The number and order of records consumed by Flink need to be 
identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = 
generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test");
+
+        // Check test result
+        checkSingleSplitRecords(testRecords.iterator(), resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system
+     *
+     * <p>This test will create 4 splits in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> 
externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
+        final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test");
+
+        // Check test result
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+        resultIterator.close();
+    }
+
+    /**
+     * Test connector source with a redundant parallelism.
+     *
+     * <p>This test will create 4 split in the external system, write test 
data to all splits, and
+     * consume back via a Flink job with 5 parallelism, so at least one 
parallelism / source reader
+     * will be idle (assigned with no splits). If the split enumerator of the 
source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never 
spin to FINISHED state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need 
to be identical to
+     * the test data written into the external system to pass this test. 
There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @Case
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testRedundantParallelism(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) 
throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            
testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        final CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                
externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test");
+
+        checkMultipleSplitRecords(
+                testRecordCollections.stream()
+                        .map(Collection::iterator)
+                        .collect(Collectors.toList()),
+                resultIterator);
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test 
record set A into the
+     * split, restart task manager to trigger job failover, write test record 
set B into the split,
+     * and terminate the Flink job finally.
+     *
+     * <p>The number and order of records consumed by Flink should be 
identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since TaskManager 
failover will be
+     * triggered in the middle of the test.
+     */
+    @Case
+    @Tag("failover")
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(TestEnvironment testEnv, 
ExternalContext<T> externalContext)
+            throws Exception {
+
+        checkEnvironmentIsControllable(testEnv);
+
+        final Collection<T> testRecordsBeforeFailure = 
externalContext.generateTestData();
+        final SourceSplitDataWriter<T> sourceSplitDataWriter = 
externalContext.createSourceSplit();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                
externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since DataStream API doesn't expose job client for 
executeAndCollect(), we have
+        // to reuse these part of code to get both job client and result 
iterator :-(
+        // ------------------------------------ START 
---------------------------------------------
+        TypeSerializer<T> serializer = 
dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END 
---------------------------------------------
+
+        checkSingleSplitRecords(
+                testRecordsBeforeFailure.iterator(), iterator, 
testRecordsBeforeFailure.size());
+
+        // -------------------------------- Trigger failover 
---------------------------------------
+        final ClusterControllable controller = (ClusterControllable) testEnv;

Review comment:
       Also make `ClusterControllable` injectable.

##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -262,10 +270,116 @@ under the License.
                                <configuration>
                                        <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
                                        <forkCount>1</forkCount>
-                                       <argLine>-Xms256m -Xmx2048m 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+                                       <argLine>-Xms256m -Xmx2048m 
-Dmvn.forkNumber=${surefire.forkNumber}
+                                               -XX:-UseGCOverheadLimit
+                                       </argLine>
                                </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>integration-tests</id>
+                                               <configuration>
+                                                       <excludes>
+                                                               
<exclude>${test.unit.pattern}</exclude>
+                                                               
<exclude>**/*E2ECase*</exclude>
+                                                       </excludes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
                        </plugin>
                </plugins>
        </build>
 
+       <!-- Profile for running testing framework cases -->
+       <profiles>
+               <profile>
+                       <id>run-testing-framework</id>

Review comment:
       I'm not a massive fan that this a) is optional and b) requires quite a bit of 
boilerplate that would need to be replicated across several connectors.
   Ideally, we should execute all connector tests as ITCases with a minicluster 
in the IDE and as E2E tests with containers on AZP. Then we would just need a way to 
re-execute the same tests with a different TestEnvironment. @zentol WDYT?

##########
File path: 
flink-test-utils-parent/flink-connector-testing-framework/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestingFrameworkExtension.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.extensions;
+
+import org.apache.flink.connectors.test.common.TestResource;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.ExternalSystem;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithExternalSystem;
+import 
org.apache.flink.connectors.test.common.junit.annotations.WithTestEnvironment;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+
+import java.lang.annotation.Annotation;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A JUnit 5 {@link Extension} for supporting running of testing framework.
+ *
+ * <p>This extension is responsible for searching test resources annotated by 
{@link
+ * WithTestEnvironment}, {@link WithExternalSystem} and {@link 
WithExternalContextFactory}, storing
+ * them into storage provided by JUnit, and manage lifecycle of these 
resources.
+ *
+ * <p>The extension uses {@link ExtensionContext.Store} for handing over test 
resources to {@link
+ * TestCaseInvocationContextProvider}, which will inject these resources into 
test cases.
+ */
+public class TestingFrameworkExtension implements BeforeAllCallback, 
AfterAllCallback {

Review comment:
       Are you expecting usage beyond connectors? If not, we could name it 
`ConnectorTestingExtension` (again dropping "framework"). If it's general, then 
we should find a new package/module name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to