[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c1fcfa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c1fcfa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c1fcfa Branch: refs/heads/flip-6 Commit: 82c1fcfa1f34b963f45146830d51b1490b0dc1e3 Parents: c0086b5 Author: Stephan Ewen <[email protected]> Authored: Wed Nov 30 17:35:47 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../leaderelection/LeaderAddressAndId.java | 73 +++++++++++++++++ .../flink/runtime/minicluster/MiniCluster.java | 58 ++++++++++++- .../minicluster/MiniClusterJobDispatcher.java | 2 +- .../OneTimeLeaderListenerFuture.java | 60 ++++++++++++++ .../resourcemanager/ResourceManager.java | 11 +++ .../resourcemanager/ResourceManagerGateway.java | 8 ++ .../runtime/minicluster/MiniClusterITCase.java | 8 ++ .../Flip6LocalStreamEnvironment.java | 23 +++--- .../LocalStreamEnvironmentITCase.java | 81 +++++++++++++++++++ .../flink/core/testutils/CheckedThread.java | 85 ++++++++++++++++++++ 10 files changed, 392 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java new file mode 100644 index 0000000..23cd34b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A combination of a leader address and leader id. + */ +public class LeaderAddressAndId { + + private final String leaderAddress; + private final UUID leaderId; + + public LeaderAddressAndId(String leaderAddress, UUID leaderId) { + this.leaderAddress = checkNotNull(leaderAddress); + this.leaderId = checkNotNull(leaderId); + } + + // ------------------------------------------------------------------------ + + public String leaderAddress() { + return leaderAddress; + } + + public UUID leaderId() { + return leaderId; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return 31 * leaderAddress.hashCode()+ leaderId.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + else if (o != null && o.getClass() == LeaderAddressAndId.class) { + final LeaderAddressAndId that = (LeaderAddressAndId) o; + return this.leaderAddress.equals(that.leaderAddress) && this.leaderId.equals(that.leaderId); + } + else { + return false; + } + } + + @Override + public String toString() { + return "LeaderAddressAndId (" + 
leaderAddress + " / " + leaderId + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 3ede5b5..1b9f265 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -27,11 +27,15 @@ import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderelection.LeaderAddressAndId; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; @@ -169,6 +173,7 @@ public class MiniCluster { final boolean singleRpc = config.getUseSingleRpcSystem(); try { + LOG.info("Starting Metrics Registry"); metricRegistry = createMetricRegistry(configuration); RpcService[] jobManagerRpcServices = new RpcService[numJobManagers]; @@ -176,10 +181,12 @@ public class 
MiniCluster { RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers]; // bring up all the RPC services - if (singleRpc) { - // one common RPC for all - commonRpcService = createRpcService(configuration, rpcTimeout, false, null); + LOG.info("Starting RPC Service(s)"); + + // we always need the 'commonRpcService' for auxiliary calls + commonRpcService = createRpcService(configuration, rpcTimeout, false, null); + if (singleRpc) { // set that same RPC service for all JobManagers and TaskManagers for (int i = 0; i < numJobManagers; i++) { jobManagerRpcServices[i] = commonRpcService; @@ -236,7 +243,7 @@ public class MiniCluster { configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices); // bring up the dispatcher that launches JobManagers when jobs submitted - LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers); + LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers); jobDispatcher = new MiniClusterJobDispatcher( configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices); } @@ -357,6 +364,49 @@ public class MiniCluster { } } + public void waitUntilTaskManagerRegistrationsComplete() throws Exception { + LeaderRetrievalService rmMasterListener = null; + Future<LeaderAddressAndId> addressAndIdFuture; + + try { + synchronized (lock) { + checkState(running, "FlinkMiniCluster is not running"); + + OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture(); + rmMasterListener = haServices.getResourceManagerLeaderRetriever(); + rmMasterListener.start(listenerFuture); + addressAndIdFuture = listenerFuture.future(); + } + + final LeaderAddressAndId addressAndId = addressAndIdFuture.get(); + + final ResourceManagerGateway resourceManager = + commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get(); + + final int numTaskManagersToWaitFor = taskManagerRunners.length; + + // poll and wait until enough TaskManagers are 
available + while (true) { + int numTaskManagersAvailable = + resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get(); + + if (numTaskManagersAvailable >= numTaskManagersToWaitFor) { + break; + } + Thread.sleep(2); + } + } + finally { + try { + if (rmMasterListener != null) { + rmMasterListener.stop(); + } + } catch (Exception e) { + LOG.warn("Error shutting down leader listener for ResourceManager"); + } + } + } + // ------------------------------------------------------------------------ // running jobs // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 8ac8eba..7fffaee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher { if (!shutdown) { shutdown = true; - LOG.info("Shutting down the dispatcher"); + LOG.info("Shutting down the job dispatcher"); // in this shutdown code we copy the references to the stack first, // to avoid concurrent modification http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java new file mode 100644 index 0000000..b0157d8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.minicluster; + +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.leaderelection.LeaderAddressAndId; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; + +import java.util.UUID; + +/** + * A leader listener that exposes a future for the first leader notification. + * + * <p>The future can be obtained via the {@link #future()} method. + */ +public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener { + + private final FlinkCompletableFuture<LeaderAddressAndId> future; + + public OneTimeLeaderListenerFuture() { + this.future = new FlinkCompletableFuture<>(); + } + + /** + * Gets the future that is completed with the leader address and ID. + * @return The future. 
+ */ + public FlinkFuture<LeaderAddressAndId> future() { + return future; + } + + // ------------------------------------------------------------------------ + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + future.complete(new LeaderAddressAndId(leaderAddress, leaderSessionID)); + } + + @Override + public void handleError(Exception exception) { + future.completeExceptionally(exception); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 145cc40..76b4a86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends Serializable> shutDownApplication(finalStatus, optionalDiagnostics); } + @RpcMethod + public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException { + if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) { + return 
taskExecutors.size(); + } + else { + throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId); + } + } + // ------------------------------------------------------------------------ // Testing methods // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 0a37bb9..8235ea7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway { * @param optionalDiagnostics */ void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics); + + /** + * Gets the currently registered number of TaskManagers. + * + * @param leaderSessionId The leader session ID with which to address the ResourceManager. + * @return The future to the number of registered TaskManagers. 
+ */ + Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId); } http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index 2cf2d4d..d9a1896 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -33,6 +33,10 @@ import org.junit.Test; */ public class MiniClusterITCase extends TestLogger { + // ------------------------------------------------------------------------ + // Simple Job Running Tests + // ------------------------------------------------------------------------ + @Test public void runJobWithSingleRpcService() throws Exception { MiniClusterConfiguration cfg = new MiniClusterConfiguration(); @@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger { executeJob(miniCluster); } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + private static void executeJob(MiniCluster miniCluster) throws Exception { miniCluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index a0c128e..2007d35 100644 
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.environment; import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { "The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " + "or running in a TestEnvironment context."); } - + this.conf = config == null ? new Configuration() : config; + setParallelism(1); } /** @@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); - JobGraph jobGraph = streamGraph.getJobGraph(); + // TODO - temp fix to enforce restarts due to a bug in the allocation protocol + streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5)); + JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true); - // As jira FLINK-5140 described, - // we have to set restart strategy to handle NoResourceAvailableException. 
- ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setRestartStrategy( - RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - jobGraph.setExecutionConfig(executionConfig); - Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); @@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration); - // Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up. + // Currently we do not reuse slot anymore, + // so we need to sum up the parallelism of all vertices int slotsCount = 0; for (JobVertex jobVertex : jobGraph.getVertices()) { slotsCount += jobVertex.getParallelism(); @@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { MiniCluster miniCluster = new MiniCluster(cfg); try { miniCluster.start(); + miniCluster.waitUntilTaskManagerRegistrationsComplete(); return miniCluster.runJobBlocking(jobGraph); - } finally { + } + finally { transformations.clear(); miniCluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java new file mode 100644 index 0000000..a360d0e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.streaming.api.datastream.DataStream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +@SuppressWarnings("serial") +public class LocalStreamEnvironmentITCase { + + /** + * This test verifies that the execution environment can be used to execute a + * single job with multiple slots. + */ + @Test + public void testRunIsolatedJob() throws Exception { + Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment(); + assertEquals(1, env.getParallelism()); + + addSmallBoundedJob(env, 3); + env.execute(); + } + + /** + * This test verifies that the execution environment can be used to execute multiple + * bounded streaming jobs after one another. 
+ */ + @Test + public void testMultipleJobsAfterAnother() throws Exception { + Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment(); + + addSmallBoundedJob(env, 3); + env.execute(); + + addSmallBoundedJob(env, 5); + env.execute(); + } + + // ------------------------------------------------------------------------ + + private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) { + DataStream<Long> stream = env + .generateSequence(1, 100) + .setParallelism(parallelism) + .slotSharingGroup("group_1"); + + stream + .filter(new FilterFunction<Long>() { + @Override + public boolean filter(Long value) { + return false; + } + }) + .setParallelism(parallelism) + .startNewChain() + .slotSharingGroup("group_2") + + .print() + .setParallelism(parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java new file mode 100644 index 0000000..aedbb5c --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +/** + * A thread that additionally catches exceptions and offers a joining method that + * re-throws the exceptions. + * + * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one + * needs to extend this class and implement the {@link #go()} method. That method may + * throw exceptions. + * + * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this + * thread via the {@link #sync()} method. + */ +public abstract class CheckedThread extends Thread { + + /** The error thrown from the main work method */ + private volatile Throwable error; + + // ------------------------------------------------------------------------ + + /** + * This method needs to be overridden to contain the main work logic. + * It takes the role of {@link Thread#run()}, but should propagate exceptions. + * + * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method. + */ + public abstract void go() throws Exception; + + // ------------------------------------------------------------------------ + + /** + * This method is final - thread work should go into the {@link #go()} method instead. + */ + @Override + public final void run() { + try { + go(); + } + catch (Throwable t) { + error = t; + } + } + + /** + * Waits until the thread is completed and checks whether any error occurred during + * the execution. 
+ * + * <p>This method blocks like {@link #join()}, but performs an additional check for + * exceptions thrown from the {@link #go()} method. + */ + public void sync() throws Exception { + super.join(); + + // propagate the error + if (error != null) { + if (error instanceof Error) { + throw (Error) error; + } + else if (error instanceof Exception) { + throw (Exception) error; + } + else { + throw new Exception(error.getMessage(), error); + } + } + } +}
