This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9e1e4db20beca757e958a1f7a1797e59e5cc8cd
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Aug 12 17:56:55 2021 +0800

    [FLINK-19554][connector/testing-framework] Implementations of TestEnvironment
---
 .../flink-end-to-end-tests-common/pom.xml          |   5 +
 .../util/flink/FlinkContainerTestEnvironment.java  | 121 ++++++++++++++++++
 .../flink-streaming-kinesis-test/pom.xml           |  17 +++
 .../flink/runtime/testutils/CommonTestUtils.java   |  41 +++++++
 .../environment/MiniClusterTestEnvironment.java    | 135 +++++++++++++++++++++
 .../environment/RemoteClusterTestEnvironment.java  |  71 +++++++++++
 6 files changed, 390 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
index 702fe8d..3ed5d81 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
@@ -61,6 +61,11 @@ under the License.
                        <scope>compile</scope>
                </dependency>
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-testing_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
                        <groupId>org.testcontainers</groupId>
                        <artifactId>testcontainers</artifactId>
                </dependency>
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
new file mode 100644
index 0000000..fc1b5d9
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.time.Duration;
+import java.util.Arrays;
+
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static org.apache.flink.configuration.TaskManagerOptions.NUM_TASK_SLOTS;
+
+/**
+ * {@link TestEnvironment} that runs jobs on a Flink cluster hosted in a {@link FlinkContainer}.
+ *
+ * <p>Only TaskManager failover is implemented among the {@link ClusterControllable} operations:
+ * {@link #triggerJobManagerFailover} and {@link #isolateNetwork} have empty bodies and never run
+ * the supplied callback.
+ */
+public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
+
+    private final String[] jarPath;
+    private final FlinkContainer flinkContainer;
+
+    /**
+     * Builds the containerized cluster without starting it; see {@link #startUp()}.
+     *
+     * @param numTaskManagers number of TaskManagers requested from the container builder
+     * @param numSlotsPerTaskManager value written to {@code NUM_TASK_SLOTS} in the Flink config
+     * @param jarPath JARs passed on to the remote execution environment
+     */
+    public FlinkContainerTestEnvironment(
+            int numTaskManagers, int numSlotsPerTaskManager, String... jarPath) {
+        final Configuration clusterConfig = createFlinkConfiguration(numSlotsPerTaskManager);
+
+        this.jarPath = jarPath;
+        this.flinkContainer =
+                FlinkContainer.builder()
+                        .numTaskManagers(numTaskManagers)
+                        .withFlinkConfiguration(clusterConfig)
+                        .build();
+    }
+
+    /** Starts the container unless it reports that it is already running. */
+    @Override
+    public void startUp() {
+        if (flinkContainer.isRunning()) {
+            return;
+        }
+        flinkContainer.start();
+    }
+
+    /** Stops the container if it reports that it is running. */
+    @Override
+    public void tearDown() {
+        if (!flinkContainer.isRunning()) {
+            return;
+        }
+        flinkContainer.stop();
+    }
+
+    /**
+     * Creates a remote execution environment that targets the container's host and the mapped
+     * JobManager REST port, shipping the JARs given at construction time.
+     */
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        final String host = flinkContainer.getHost();
+        final int restPort = flinkContainer.getMappedPort(FlinkContainer.JOB_MANAGER_REST_PORT);
+        return StreamExecutionEnvironment.createRemoteEnvironment(host, restPort, jarPath);
+    }
+
+    /** Does nothing: no failover is triggered and {@code afterFailAction} is not run. */
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction) {}
+
+    /**
+     * Restarts the TaskManager through {@link FlinkContainer#restartTaskManager}. The action handed
+     * to the container waits up to 30 seconds for the job to report {@code FAILING}, {@code
+     * FAILED} or {@code RESTARTING} and then runs {@code afterFailAction}.
+     *
+     * @param jobClient client of the job expected to fail
+     * @param afterFailAction callback run once the job has been seen in a failure status
+     * @throws Exception if restarting the TaskManager fails
+     */
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws Exception {
+        flinkContainer.restartTaskManager(
+                () -> {
+                    awaitJobFailure(jobClient);
+                    afterFailAction.run();
+                });
+    }
+
+    /** Does nothing: the network is not touched and {@code afterFailAction} is not run. */
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {}
+
+    @Override
+    public String toString() {
+        return "FlinkContainer";
+    }
+
+    /**
+     * Exposes the underlying container so that tests can control the cluster directly.
+     *
+     * @return Flink cluster on Testcontainers
+     */
+    public FlinkContainer getFlinkContainer() {
+        return flinkContainer;
+    }
+
+    /** Assembles the fixed heartbeat / slot-request settings plus the requested slot count. */
+    private static Configuration createFlinkConfiguration(int numSlotsPerTaskManager) {
+        final Configuration config = new Configuration();
+        config.set(HEARTBEAT_INTERVAL, 1000L);
+        config.set(HEARTBEAT_TIMEOUT, 5000L);
+        config.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        config.set(NUM_TASK_SLOTS, numSlotsPerTaskManager);
+        return config;
+    }
+
+    /** Waits for a failure status, rethrowing any problem as an unchecked exception. */
+    private static void awaitJobFailure(JobClient jobClient) {
+        try {
+            CommonTestUtils.waitForJobStatus(
+                    jobClient,
+                    Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+        } catch (Exception e) {
+            throw new RuntimeException("Error waiting for job entering failure status", e);
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml 
b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index edd6f61..534879e 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -145,6 +145,23 @@ under the License.
                                        </systemPropertyVariables>
                                </configuration>
                        </plugin>
+
+                       <!-- Skip dependency convergence check because of guava 
version -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-enforcer-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>dependency-convergence</id>
+                                               <goals>
+                                                       <goal>enforce</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <skip>true</skip>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 7933dbe..ea1d55b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -39,6 +40,7 @@ import java.lang.management.RuntimeMXBean;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -200,6 +202,45 @@ public class CommonTestUtils {
         waitUntilCondition(() -> jobStatusSupplier.get() != 
JobStatus.INITIALIZING, timeout, 20L);
     }
 
+    /**
+     * Waits until the job behind {@code client} reports one of the expected statuses.
+     *
+     * <p>The check is handed to {@code waitUntilCondition} together with {@code deadline}. Each
+     * evaluation queries the current job status; a terminal status that is not expected aborts the
+     * wait with an {@link IllegalStateException}.
+     *
+     * @param client client of the job to observe
+     * @param expectedStatus statuses that end the wait successfully
+     * @param deadline bound for the wait, passed through to {@code waitUntilCondition}
+     * @throws IllegalStateException if the job enters a terminal status that is not expected; the
+     *     failure of fetching the job result, if any, is attached as the cause
+     * @throws Exception any other failure raised while querying the status or while waiting
+     */
+    public static void waitForJobStatus(
+            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+        waitUntilCondition(() -> hasReachedExpectedStatus(client, expectedStatus), deadline);
+    }
+
+    /** Single evaluation of the wait condition used by {@code waitForJobStatus}. */
+    private static boolean hasReachedExpectedStatus(
+            JobClient client, List<JobStatus> expectedStatus) throws Exception {
+        final JobStatus observed = client.getJobStatus().get();
+
+        if (expectedStatus.contains(observed)) {
+            return true;
+        }
+        if (!observed.isTerminalState()) {
+            // Neither expected nor terminal yet: keep waiting.
+            return false;
+        }
+
+        // Terminal but unexpected. Fetching the result surfaces the job's own failure, if any,
+        // so that it can be reported as the cause.
+        try {
+            client.getJobExecutionResult().get();
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Job has entered %s state, but expecting %s", observed, expectedStatus),
+                    e);
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Job has entered a terminal state %s, but expecting %s",
+                        observed, expectedStatus));
+    }
+
+    /**
+     * Requests cancellation of the job behind {@code client} and waits for the returned future.
+     *
+     * @param client client of the job to cancel
+     * @param timeout maximum time to wait on the future returned by {@link JobClient#cancel()}
+     * @throws Exception whatever the bounded wait on that future throws, e.g. a {@link
+     *     TimeoutException} once {@code timeout} has elapsed
+     */
+    public static void terminateJob(JobClient client, Duration timeout) throws Exception {
+        // The cancellation is issued before the timeout is read, as in a single chained call.
+        final java.util.concurrent.Future<?> cancellation = client.cancel();
+        cancellation.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
     /** Utility class to read the output of a process stream and forward it 
into a StringWriter. */
     public static class PipeForwarder extends Thread {
 
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
new file mode 100644
index 0000000..ab1c4fc
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test environment for running jobs on a Flink mini-cluster.
+ *
+ * <p>The cluster is configured with one TaskManager offering six slots, {@link
+ * RpcServiceSharing#DEDICATED} RPC services and HA leadership control, which {@link
+ * #triggerJobManagerFailover} relies on.
+ */
+@Experimental
+public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
+
+    private final MiniClusterWithClientResource miniCluster;
+
+    /** Index of the TaskManager to terminate next; advanced whenever a replacement is started. */
+    private int latestTMIndex = 0;
+
+    /** Guards {@link #startUp()} and {@link #tearDown()} against repeated invocation. */
+    private boolean isStarted = false;
+
+    public MiniClusterTestEnvironment() {
+        this.miniCluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(6)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+    }
+
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    /**
+     * Revokes the JobMaster leadership of the given job, runs {@code afterFailAction} and grants
+     * the leadership again.
+     *
+     * <p>Leadership is granted back even if {@code afterFailAction} throws, so a failing callback
+     * does not leave the job without a leader.
+     *
+     * @param jobClient client of the job whose JobMaster loses leadership
+     * @param afterFailAction callback run while leadership is revoked
+     * @throws UnsupportedOperationException if the mini-cluster offers no HA leadership control
+     * @throws ExecutionException if revoking or granting leadership fails
+     * @throws InterruptedException if interrupted while waiting for either operation
+     */
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws ExecutionException, InterruptedException {
+        final Optional<HaLeadershipControl> controlOptional =
+                miniCluster.getMiniCluster().getHaLeadershipControl();
+        final HaLeadershipControl haLeadershipControl =
+                controlOptional.orElseThrow(
+                        () ->
+                                new UnsupportedOperationException(
+                                        "This MiniCluster does not support JobManager HA"));
+
+        haLeadershipControl.revokeJobMasterLeadership(jobClient.getJobID()).get();
+        try {
+            afterFailAction.run();
+        } finally {
+            haLeadershipControl.grantJobMasterLeadership(jobClient.getJobID()).get();
+        }
+    }
+
+    /**
+     * Terminates the current TaskManager, waits up to 30 seconds for the job to report {@code
+     * FAILING}, {@code FAILED} or {@code RESTARTING}, runs {@code afterFailAction} and starts a
+     * replacement TaskManager.
+     *
+     * <p>The replacement is started even if waiting or the callback fails; otherwise the cluster
+     * would be left without any TaskManager.
+     *
+     * @param jobClient client of the job expected to fail
+     * @param afterFailAction callback run once the job has been seen in a failure status
+     * @throws Exception if terminating or starting a TaskManager fails, or if the job does not
+     *     reach a failure status
+     */
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws Exception {
+        terminateTaskManager();
+        try {
+            CommonTestUtils.waitForJobStatus(
+                    jobClient,
+                    Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+            afterFailAction.run();
+        } finally {
+            startTaskManager();
+        }
+    }
+
+    /**
+     * Not supported by this environment.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
+        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
+    }
+
+    /** Starts the mini-cluster; subsequent calls are ignored until {@link #tearDown()}. */
+    @Override
+    public void startUp() throws Exception {
+        if (isStarted) {
+            return;
+        }
+        this.miniCluster.before();
+        LOG.debug("MiniCluster is running");
+        isStarted = true;
+    }
+
+    /** Shuts the mini-cluster down; does nothing if it has not been started. */
+    @Override
+    public void tearDown() {
+        if (!isStarted) {
+            return;
+        }
+        isStarted = false;
+        this.miniCluster.after();
+        LOG.debug("MiniCluster has been tear down");
+    }
+
+    /** Terminates the TaskManager at {@link #latestTMIndex} and waits for completion. */
+    private void terminateTaskManager() throws Exception {
+        miniCluster.getMiniCluster().terminateTaskManager(latestTMIndex).get();
+        LOG.debug("TaskManager {} has been terminated.", latestTMIndex);
+    }
+
+    /** Starts one more TaskManager and advances {@link #latestTMIndex} to refer to it. */
+    private void startTaskManager() throws Exception {
+        miniCluster.getMiniCluster().startTaskManager();
+        latestTMIndex++;
+        LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
+    }
+
+    @Override
+    public String toString() {
+        return "MiniCluster";
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
new file mode 100644
index 0000000..337ff2d
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Test environment for running tests on a remote Flink cluster.
+ *
+ * <p>The cluster's lifecycle is not managed by this class: {@link #startUp()} and {@link
+ * #tearDown()} do nothing.
+ */
+@Experimental
+public class RemoteClusterTestEnvironment implements TestEnvironment {
+
+    private final String host;
+    private final int port;
+    private final String[] jarPath;
+    private final Configuration config;
+
+    /**
+     * Construct a test environment for a remote Flink cluster, using an empty client
+     * configuration.
+     *
+     * @param host Hostname of the remote JobManager
+     * @param port REST port of the remote JobManager
+     * @param jarPath Path of JARs to be shipped to Flink cluster
+     */
+    public RemoteClusterTestEnvironment(String host, int port, String... jarPath) {
+        this(host, port, new Configuration(), jarPath);
+    }
+
+    /**
+     * Construct a test environment for a remote Flink cluster with configurations.
+     *
+     * @param host Hostname of the remote JobManager
+     * @param port REST port of the remote JobManager
+     * @param config Configuration handed to every execution environment created by {@link
+     *     #createExecutionEnvironment()}
+     * @param jarPath Path of JARs to be shipped to Flink cluster
+     */
+    public RemoteClusterTestEnvironment(
+            String host, int port, Configuration config, String... jarPath) {
+        this.host = host;
+        this.port = port;
+        this.config = config;
+        this.jarPath = jarPath;
+    }
+
+    /**
+     * Creates a remote execution environment for the configured host and port.
+     *
+     * <p>The configuration supplied at construction time is passed along as the client
+     * configuration, so that it is not silently ignored.
+     */
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarPath);
+    }
+
+    /** Does nothing; the remote cluster is expected to be running already. */
+    @Override
+    public void startUp() {}
+
+    /** Does nothing; the remote cluster is left untouched. */
+    @Override
+    public void tearDown() {}
+}

Reply via email to