[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions lets rpc endpoints
fail fast without having to use a callback like the FatalErrorHandler.

This closes #2651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1934255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1934255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1934255

Branch: refs/heads/master
Commit: a1934255421b97eefd579183e9c7199c43ad1a2c
Parents: 3aafa16
Author: Till Rohrmann <[email protected]>
Authored: Mon Oct 17 16:22:16 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobManagerRunner.java     |  6 +++++-
 .../org/apache/flink/runtime/jobmaster/JobMaster.java | 14 ++++++++++----
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java     |  8 ++++++--
 .../flink/runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../flink/runtime/taskexecutor/TaskManagerRunner.java |  2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java      |  7 +------
 .../runtime/rpc/akka/MainThreadValidationTest.java    |  2 +-
 .../runtime/rpc/akka/MessageSerializationTest.java    |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutorTest.java  |  4 ++--
 9 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 3313d8a..9d8e004 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -374,7 +374,11 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        // This will eventually be noticed, but can not be 
ruled out from the beginning.
                        if (leaderElectionService.hasLeadership()) {
                                if (jobRunning) {
-                                       jobManager.start(leaderSessionID);
+                                       try {
+                                               
jobManager.start(leaderSessionID);
+                                       } catch (Exception e) {
+                                               onFatalError(new 
Exception("Could not start the job manager.", e));
+                                       }
                                } else {
                                        log.info("Job {} ({}) already finished 
by others.", jobGraph.getName(), jobGraph.getJobID());
                                        jobFinishedByOther();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 204cd80..c80cc51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -267,7 +267,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         *
         * @param leaderSessionID The necessary leader id for running the job.
         */
-       public void start(final UUID leaderSessionID) {
+       public void start(final UUID leaderSessionID) throws Exception {
                if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
                        super.start();
 
@@ -283,7 +283,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * Suspend the job and shutdown all other services including rpc.
         */
        @Override
-       public void shutDown() {
+       public void shutDown() throws Exception {
                // make sure there is a graceful exit
                getSelf().suspendExecution(new Exception("JobManager is 
shutting down."));
                super.shutDown();
@@ -382,7 +382,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                try {
                        resourceManagerLeaderRetriever.stop();
                } catch (Exception e) {
-                       log.warn("Failed to stop resource manager leader 
retriever when suspending.");
+                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", e);
                }
                closeResourceManagerConnection();
 
@@ -761,7 +761,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        @Override
                        public void run() {
                                log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
-                               shutDown();
+
+                               try {
+                                       shutDown();
+                               } catch (Exception e) {
+                                       cause.addSuppressed(e);
+                               }
+
                                errorHandler.onFatalError(cause);
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f93a2e2..b971b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -110,8 +110,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         *
         * IMPORTANT: Whenever you override this method, call the parent 
implementation to enable
         * rpc processing. It is advised to make the parent call last.
+        *
+        * @throws Exception indicating that something went wrong while 
starting the RPC endpoint
         */
-       public void start() {
+       public void start() throws Exception {
                ((StartStoppable) self).start();
        }
 
@@ -123,8 +125,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * 
         * <p>This method can be overridden to add RPC endpoint specific shut 
down code.
         * The overridden method should always call the parent shut down method.
+        *
+        * @throws Exception indicating that something went wrong while 
shutting the RPC endpoint down
         */
-       public void shutDown() {
+       public void shutDown() throws Exception {
                rpcService.stopServer(self);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 679324b..8187fde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -196,7 +196,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void start() {
+       public void start() throws Exception {
                super.start();
 
                // start by connecting to the ResourceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f56d17c..7d9ee55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -107,7 +107,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
        //  Lifecycle management
        // 
--------------------------------------------------------------------------------------------
 
-       public void start() {
+       public void start() throws Exception {
                taskManager.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index ba8eb11..d2dbab7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -36,10 +35,6 @@ import org.junit.Test;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -163,7 +158,7 @@ public class AkkaRpcActorTest extends TestLogger {
         * @throws InterruptedException
         */
        @Test(timeout=1000)
-       public void testRpcEndpointTerminationFuture() throws 
ExecutionException, InterruptedException {
+       public void testRpcEndpointTerminationFuture() throws Exception {
                final DummyRpcEndpoint rpcEndpoint = new 
DummyRpcEndpoint(akkaRpcService);
                rpcEndpoint.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ec1f7e..9f134d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -34,7 +34,7 @@ import static org.junit.Assert.assertTrue;
 public class MainThreadValidationTest extends TestLogger {
 
        @Test
-       public void failIfNotInMainThread() {
+       public void failIfNotInMainThread() throws Exception {
                // test if assertions are activated. The test only works if 
assertions are loaded.
                try {
                        assert false;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 0d5dc28..d640a97 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -80,7 +80,7 @@ public class MessageSerializationTest extends TestLogger {
         * Tests that a local rpc call with a non serializable argument can be 
executed.
         */
        @Test
-       public void testNonSerializableLocalMessageTransfer() throws 
InterruptedException, IOException {
+       public void testNonSerializableLocalMessageTransfer() throws Exception {
                LinkedBlockingQueue<Object> linkedBlockingQueue = new 
LinkedBlockingQueue<>();
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
                testEndpoint.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4d73a4b..638ec56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -354,7 +354,7 @@ public class TaskExecutorTest extends TestLogger {
         * the job leader, it will offer all reserved slots to the JobManager.
         */
        @Test
-       public void testJobLeaderDetection() throws 
TestingFatalErrorHandler.TestingException, SlotAllocationException {
+       public void testJobLeaderDetection() throws Exception {
                final JobID jobId = new JobID();
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
@@ -621,7 +621,7 @@ public class TaskExecutorTest extends TestLogger {
         */
        @Ignore
        @Test
-       public void testRejectAllocationRequestsForOutOfSyncSlots() throws 
SlotAllocationException {
+       public void testRejectAllocationRequestsForOutOfSyncSlots() throws 
Exception {
                final ResourceID resourceID = ResourceID.generate();
 
                final String address1 = "/resource/manager/address/one";

Reply via email to