This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b9e1e4db20beca757e958a1f7a1797e59e5cc8cd Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Aug 12 17:56:55 2021 +0800 [FLINK-19554][connector/testing-framework] Implementations of TestEnvironment --- .../flink-end-to-end-tests-common/pom.xml | 5 + .../util/flink/FlinkContainerTestEnvironment.java | 121 ++++++++++++++++++ .../flink-streaming-kinesis-test/pom.xml | 17 +++ .../flink/runtime/testutils/CommonTestUtils.java | 41 +++++++ .../environment/MiniClusterTestEnvironment.java | 135 +++++++++++++++++++++ .../environment/RemoteClusterTestEnvironment.java | 71 +++++++++++ 6 files changed, 390 insertions(+) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml index 702fe8d..3ed5d81 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml @@ -61,6 +61,11 @@ under the License. <scope>compile</scope> </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-testing_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> </dependency> diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java new file mode 100644 index 0000000..fc1b5d9 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.test.common.environment.ClusterControllable; +import org.apache.flink.connectors.test.common.environment.TestEnvironment; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.time.Duration; +import java.util.Arrays; + +import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT; +import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT; +import static org.apache.flink.configuration.TaskManagerOptions.NUM_TASK_SLOTS; + +/** Test environment running job on {@link FlinkContainer}. 
*/
+public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
+
+    private final FlinkContainer flinkContainer;
+    private final String[] jarPath;
+
+    /**
+     * Builds (but does not start) a {@link FlinkContainer} cluster for running test jobs.
+     *
+     * @param numTaskManagers number of TaskManagers passed to the container builder
+     * @param numSlotsPerTaskManager value configured for {@code NUM_TASK_SLOTS}
+     * @param jarPath JARs handed to the remote execution environment created by {@link
+     *     #createExecutionEnvironment()}
+     */
+    public FlinkContainerTestEnvironment(
+            int numTaskManagers, int numSlotsPerTaskManager, String... jarPath) {
+
+        // Explicit heartbeat and slot-request settings; presumably chosen so that a lost
+        // TaskManager is detected quickly during failover tests -- confirm with the author.
+        Configuration flinkConfiguration = new Configuration();
+        flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
+        flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
+        flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        flinkConfiguration.set(NUM_TASK_SLOTS, numSlotsPerTaskManager);
+
+        this.flinkContainer =
+                FlinkContainer.builder()
+                        .numTaskManagers(numTaskManagers)
+                        .withFlinkConfiguration(flinkConfiguration)
+                        .build();
+        this.jarPath = jarPath;
+    }
+
+    /** Starts the Flink container unless it is already running. */
+    @Override
+    public void startUp() {
+        if (!flinkContainer.isRunning()) {
+            this.flinkContainer.start();
+        }
+    }
+
+    /** Stops the Flink container if it is currently running. */
+    @Override
+    public void tearDown() {
+        if (flinkContainer.isRunning()) {
+            this.flinkContainer.stop();
+        }
+    }
+
+    /**
+     * Creates a remote execution environment that targets the container's host and the mapped
+     * JobManager REST port, shipping the JARs given at construction time.
+     */
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.createRemoteEnvironment(
+                this.flinkContainer.getHost(),
+                this.flinkContainer.getMappedPort(FlinkContainer.JOB_MANAGER_REST_PORT),
+                this.jarPath);
+    }
+
+    /**
+     * JobManager failover is not implemented for this environment.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction) {
+        // Fail fast: silently returning would skip both the failover and afterFailAction,
+        // letting a failover test pass without exercising anything.
+        throw new UnsupportedOperationException(
+                "Cannot trigger JobManager failover in a FlinkContainer");
+    }
+
+    /**
+     * Restarts the TaskManager of the container. While it is being restarted, waits (up to 30
+     * seconds) until the job reports FAILING, FAILED or RESTARTING and then runs {@code
+     * afterFailAction}.
+     *
+     * @param jobClient client of the job expected to fail
+     * @param afterFailAction action to run once the job has entered a failure status
+     * @throws Exception if restarting the TaskManager fails; a failed wait is rethrown as a
+     *     {@link RuntimeException} from the callback
+     */
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws Exception {
+        flinkContainer.restartTaskManager(
+                () -> {
+                    try {
+                        CommonTestUtils.waitForJobStatus(
+                                jobClient,
+                                Arrays.asList(
+                                        JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
+                                Deadline.fromNow(Duration.ofSeconds(30)));
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Error waiting for job entering failure status", e);
+                    }
+                    afterFailAction.run();
+                });
+    }
+
+    /**
+     * Network isolation is not implemented for this environment.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
+        // Same reasoning as triggerJobManagerFailover: never pretend the fault was injected.
+        throw new UnsupportedOperationException("Cannot isolate network in a FlinkContainer");
+    }
+
+    @Override
+    public String toString() {
+        return "FlinkContainer";
+    }
+
+    /**
+     * Get instance of Flink containers for cluster controlling.
+     *
+     * @return Flink cluster on Testcontainers
+     */
+    public FlinkContainer getFlinkContainer() {
+        return this.flinkContainer;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index edd6f61..534879e 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -145,6 +145,23 @@ under the License.
 					</systemPropertyVariables>
 				</configuration>
 			</plugin>
+
+			<!-- Skip dependency convergence check because of guava version -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dependency-convergence</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 7933dbe..ea1d55b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -39,6 +40,7 @@ import java.lang.management.RuntimeMXBean;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import
java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -200,6 +202,45 @@ public class CommonTestUtils {
         waitUntilCondition(() -> jobStatusSupplier.get() != JobStatus.INITIALIZING, timeout, 20L);
     }
 
+    /**
+     * Polls the job status until it is one of {@code expectedStatus}.
+     *
+     * @param client client of the job to observe
+     * @param expectedStatus statuses that end the wait successfully
+     * @param deadline deadline handed to {@code waitUntilCondition}; it also bounds every single
+     *     status query
+     * @throws IllegalStateException if the job reaches a terminal status that is not expected; the
+     *     failure of the job, if any, is attached as the cause
+     * @throws Exception if querying the status fails or times out, or if {@code
+     *     waitUntilCondition} gives up
+     */
+    public static void waitForJobStatus(
+            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+        waitUntilCondition(
+                () -> {
+                    // Bound the query by the remaining time: an unbounded get() could block
+                    // past the deadline if the cluster stops answering (e.g. during failover).
+                    final JobStatus currentStatus =
+                            client.getJobStatus()
+                                    .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+                    // Entered an expected status
+                    if (expectedStatus.contains(currentStatus)) {
+                        return true;
+                    }
+
+                    // Entered a terminal status but not expected
+                    if (currentStatus.isTerminalState()) {
+                        try {
+                            // Exception will be exposed here if job failed
+                            client.getJobExecutionResult().get();
+                        } catch (Exception e) {
+                            throw new IllegalStateException(
+                                    String.format(
+                                            "Job has entered %s state, but expecting %s",
+                                            currentStatus, expectedStatus),
+                                    e);
+                        }
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Job has entered a terminal state %s, but expecting %s",
+                                        currentStatus, expectedStatus));
+                    }
+
+                    // Continue waiting for expected status
+                    return false;
+                },
+                deadline);
+    }
+
+    /**
+     * Cancels the job and waits for the cancellation request to complete.
+     *
+     * @param client client of the job to cancel
+     * @param timeout maximum time to wait for the cancellation future
+     * @throws Exception if the cancellation fails or does not complete within {@code timeout}
+     */
+    public static void terminateJob(JobClient client, Duration timeout) throws Exception {
+        client.cancel().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
     /** Utility class to read the output of a process stream and forward it into a StringWriter.
*/ public static class PipeForwarder extends Thread { diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java new file mode 100644 index 0000000..ab1c4fc --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/** Test environment for running jobs on Flink mini-cluster. */
+@Experimental
+public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
+
+    private final MiniClusterWithClientResource miniCluster;
+
+    // Index of the most recently started TaskManager; this is the one killed on failover.
+    // NOTE(review): assumes the MiniCluster numbers TaskManagers sequentially from 0 -- confirm.
+    private int latestTMIndex = 0;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
+
+    private boolean isStarted = false;
+
+    /**
+     * Configures (but does not start) a mini-cluster with one TaskManager, six slots, dedicated
+     * RPC services and HA leadership control enabled.
+     */
+    public MiniClusterTestEnvironment() {
+        this.miniCluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(6)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+    }
+
+    /**
+     * Returns {@link StreamExecutionEnvironment#getExecutionEnvironment()}.
+     *
+     * <p>NOTE(review): presumably this resolves to the mini-cluster once {@link #startUp()} has
+     * run -- confirm against {@code MiniClusterWithClientResource}.
+     */
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    /**
+     * Revokes the JobMaster leadership of the job, runs {@code afterFailAction} and grants the
+     * leadership again.
+     *
+     * @param jobClient client of the job whose JobMaster loses leadership
+     * @param afterFailAction action to run while leadership is revoked
+     * @throws UnsupportedOperationException if the mini-cluster exposes no HA leadership control
+     * @throws ExecutionException if revoking or granting leadership fails
+     * @throws InterruptedException if interrupted while waiting for either operation
+     */
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws ExecutionException, InterruptedException {
+        final Optional<HaLeadershipControl> controlOptional =
+                miniCluster.getMiniCluster().getHaLeadershipControl();
+        final HaLeadershipControl haLeadershipControl =
+                controlOptional.orElseThrow(
+                        () ->
+                                new UnsupportedOperationException(
+                                        "This MiniCluster does not support JobManager HA"));
+        haLeadershipControl.revokeJobMasterLeadership(jobClient.getJobID()).get();
+        try {
+            afterFailAction.run();
+        } finally {
+            // Always hand leadership back, even if the action throws, so a reused cluster
+            // is not left without a leading JobMaster for this job.
+            haLeadershipControl.grantJobMasterLeadership(jobClient.getJobID()).get();
+        }
+    }
+
+    /**
+     * Terminates the latest TaskManager, waits (up to 30 seconds) until the job reports FAILING,
+     * FAILED or RESTARTING, runs {@code afterFailAction} and starts a replacement TaskManager.
+     *
+     * @param jobClient client of the job expected to fail
+     * @param afterFailAction action to run once the job has entered a failure status
+     * @throws Exception if terminating or starting a TaskManager, or waiting for the job status,
+     *     fails
+     */
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws Exception {
+        terminateTaskManager();
+        try {
+            CommonTestUtils.waitForJobStatus(
+                    jobClient,
+                    Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+            afterFailAction.run();
+        } finally {
+            // Always replace the killed TaskManager. Otherwise a failed wait or action leaves
+            // the cluster without TaskManagers and latestTMIndex pointing at a dead one.
+            startTaskManager();
+        }
+    }
+
+    /**
+     * Not supported by this environment.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
+        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
+    }
+
+    /** Starts the mini-cluster; does nothing if it has already been started. */
+    @Override
+    public void startUp() throws Exception {
+        if (isStarted) {
+            return;
+        }
+        this.miniCluster.before();
+        LOG.debug("MiniCluster is running");
+        isStarted = true;
+    }
+
+    /** Shuts the mini-cluster down; does nothing if it is not started. */
+    @Override
+    public void tearDown() {
+        if (!isStarted) {
+            return;
+        }
+        isStarted = false;
+        this.miniCluster.after();
+        LOG.debug("MiniCluster has been tear down");
+    }
+
+    /** Terminates the TaskManager at {@code latestTMIndex} and waits for completion. */
+    private void terminateTaskManager() throws Exception {
+        miniCluster.getMiniCluster().terminateTaskManager(latestTMIndex).get();
+        LOG.debug("TaskManager {} has been terminated.", latestTMIndex);
+    }
+
+    /** Starts one more TaskManager and advances {@code latestTMIndex} to refer to it. */
+    private void startTaskManager() throws Exception {
+        miniCluster.getMiniCluster().startTaskManager();
+        latestTMIndex++;
+        LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
+    }
+
+    @Override
+    public String toString() {
+        return "MiniCluster";
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
new file mode 100644
index 0000000..337ff2d
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/RemoteClusterTestEnvironment.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.environment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Test environment for running test on a remote Flink cluster. */
+@Experimental
+public class RemoteClusterTestEnvironment implements TestEnvironment {
+
+    private final String host;
+    private final int port;
+    private final String[] jarPath;
+    private final Configuration config;
+
+    /**
+     * Construct a test environment for a remote Flink cluster.
+     *
+     * @param host Hostname of the remote JobManager
+     * @param port REST port of the remote JobManager
+     * @param jarPath Path of JARs to be shipped to Flink cluster
+     */
+    public RemoteClusterTestEnvironment(String host, int port, String... jarPath) {
+        this(host, port, new Configuration(), jarPath);
+    }
+
+    /**
+     * Construct a test environment for a remote Flink cluster with configurations.
+     *
+     * @param host Hostname of the remote JobManager
+     * @param port REST port of the remote JobManager
+     * @param config Configurations of the test environment
+     * @param jarPath Path of JARs to be shipped to Flink cluster
+     */
+    public RemoteClusterTestEnvironment(
+            String host, int port, Configuration config, String... jarPath) {
+        this.host = host;
+        this.port = port;
+        this.config = config;
+        this.jarPath = jarPath;
+    }
+
+    /**
+     * Creates a remote execution environment for the configured host and port.
+     *
+     * <p>The configuration given at construction time is passed along as well; previously it
+     * was stored but never applied.
+     *
+     * @return execution environment that submits jobs to the remote cluster
+     */
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment() {
+        return StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarPath);
+    }
+
+    /** No-op: this environment does not start the remote cluster. */
+    @Override
+    public void startUp() {}
+
+    /** No-op: this environment does not stop the remote cluster. */
+    @Override
+    public void tearDown() {}
+}
