[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions. Allowing RpcEndpoint.start/shutDown to throw exceptions lets RPC endpoints fail fast without having to report errors through a callback such as the FatalErrorHandler.
This closes #2651. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1934255 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1934255 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1934255 Branch: refs/heads/master Commit: a1934255421b97eefd579183e9c7199c43ad1a2c Parents: 3aafa16 Author: Till Rohrmann <[email protected]> Authored: Mon Oct 17 16:22:16 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:24 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobManagerRunner.java | 6 +++++- .../org/apache/flink/runtime/jobmaster/JobMaster.java | 14 ++++++++++---- .../org/apache/flink/runtime/rpc/RpcEndpoint.java | 8 ++++++-- .../flink/runtime/taskexecutor/TaskExecutor.java | 2 +- .../flink/runtime/taskexecutor/TaskManagerRunner.java | 2 +- .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 7 +------ .../runtime/rpc/akka/MainThreadValidationTest.java | 2 +- .../runtime/rpc/akka/MessageSerializationTest.java | 2 +- .../flink/runtime/taskexecutor/TaskExecutorTest.java | 4 ++-- 9 files changed, 28 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 3313d8a..9d8e004 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -374,7 +374,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F 
// This will eventually be noticed, but can not be ruled out from the beginning. if (leaderElectionService.hasLeadership()) { if (jobRunning) { - jobManager.start(leaderSessionID); + try { + jobManager.start(leaderSessionID); + } catch (Exception e) { + onFatalError(new Exception("Could not start the job manager.", e)); + } } else { log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID()); jobFinishedByOther(); http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 204cd80..c80cc51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -267,7 +267,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * * @param leaderSessionID The necessary leader id for running the job. */ - public void start(final UUID leaderSessionID) { + public void start(final UUID leaderSessionID) throws Exception { if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { super.start(); @@ -283,7 +283,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * Suspend the job and shutdown all other services including rpc. 
*/ @Override - public void shutDown() { + public void shutDown() throws Exception { // make sure there is a graceful exit getSelf().suspendExecution(new Exception("JobManager is shutting down.")); super.shutDown(); @@ -382,7 +382,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { try { resourceManagerLeaderRetriever.stop(); } catch (Exception e) { - log.warn("Failed to stop resource manager leader retriever when suspending."); + log.warn("Failed to stop resource manager leader retriever when suspending.", e); } closeResourceManagerConnection(); @@ -761,7 +761,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override public void run() { log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); - shutDown(); + + try { + shutDown(); + } catch (Exception e) { + cause.addSuppressed(e); + } + errorHandler.onFatalError(cause); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index f93a2e2..b971b96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -110,8 +110,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * * IMPORTANT: Whenever you override this method, call the parent implementation to enable * rpc processing. It is advised to make the parent call last. 
+ * + * @throws Exception indicating that something went wrong while starting the RPC endpoint */ - public void start() { + public void start() throws Exception { ((StartStoppable) self).start(); } @@ -123,8 +125,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * * <p>This method can be overridden to add RPC endpoint specific shut down code. * The overridden method should always call the parent shut down method. + * + * @throws Exception indicating that something went wrong while shutting the RPC endpoint down */ - public void shutDown() { + public void shutDown() throws Exception { rpcService.stopServer(self); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 679324b..8187fde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -196,7 +196,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // ------------------------------------------------------------------------ @Override - public void start() { + public void start() throws Exception { super.start(); // start by connecting to the ResourceManager http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index f56d17c..7d9ee55 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -107,7 +107,7 @@ public class TaskManagerRunner implements FatalErrorHandler { // Lifecycle management // -------------------------------------------------------------------------------------------- - public void start() { + public void start() throws Exception { taskManager.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index ba8eb11..d2dbab7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -22,7 +22,6 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -36,10 +35,6 @@ import org.junit.Test; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -163,7 +158,7 @@ public class AkkaRpcActorTest extends TestLogger { * @throws 
InterruptedException */ @Test(timeout=1000) - public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException { + public void testRpcEndpointTerminationFuture() throws Exception { final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); rpcEndpoint.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 9ec1f7e..9f134d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -34,7 +34,7 @@ import static org.junit.Assert.assertTrue; public class MainThreadValidationTest extends TestLogger { @Test - public void failIfNotInMainThread() { + public void failIfNotInMainThread() throws Exception { // test if assertions are activated. The test only works if assertions are loaded. 
try { assert false; http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 0d5dc28..d640a97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -80,7 +80,7 @@ public class MessageSerializationTest extends TestLogger { * Tests that a local rpc call with a non serializable argument can be executed. */ @Test - public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException { + public void testNonSerializableLocalMessageTransfer() throws Exception { LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); testEndpoint.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 4d73a4b..638ec56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -354,7 +354,7 @@ public class TaskExecutorTest extends TestLogger { * the job leader, it will offer all reserved slots to the JobManager. 
*/ @Test - public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException { + public void testJobLeaderDetection() throws Exception { final JobID jobId = new JobID(); final TestingSerialRpcService rpc = new TestingSerialRpcService(); @@ -621,7 +621,7 @@ public class TaskExecutorTest extends TestLogger { */ @Ignore @Test - public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException { + public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception { final ResourceID resourceID = ResourceID.generate(); final String address1 = "/resource/manager/address/one";
