This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e94a4b9f309295ba086dbd2f8a0b9c5cfdebf069
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Mar 27 14:30:35 2020 +0100

    [hotfix][rpc] Add proper error reporting to 
AkkaRpcActor#handleControlMessage
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 59 ++++++++++++----------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 25fb38b..0397172 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -119,7 +119,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
                this.rpcEndpointTerminationResult = 
RpcEndpointTerminationResult.failure(
                        new AkkaRpcException(
                                String.format("RpcEndpoint %s has not been 
properly stopped.", rpcEndpoint.getEndpointId())));
-               this.state = StoppedState.INSTANCE;
+               this.state = StoppedState.STOPPED;
        }
 
        @Override
@@ -164,18 +164,23 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
        }
 
        private void handleControlMessage(ControlMessages controlMessage) {
-               switch (controlMessage) {
-                       case START:
-                               state = state.start(this);
-                               break;
-                       case STOP:
-                               state = state.stop();
-                               break;
-                       case TERMINATE:
-                               state.terminate(this);
-                               break;
-                       default:
-                               handleUnknownControlMessage(controlMessage);
+               try {
+                       switch (controlMessage) {
+                               case START:
+                                       state = state.start(this);
+                                       break;
+                               case STOP:
+                                       state = state.stop();
+                                       break;
+                               case TERMINATE:
+                                       state = state.terminate(this);
+                                       break;
+                               default:
+                                       
handleUnknownControlMessage(controlMessage);
+                       }
+               } catch (Exception e) {
+                       this.rpcEndpointTerminationResult = 
RpcEndpointTerminationResult.failure(e);
+                       throw e;
                }
        }
 
@@ -462,19 +467,19 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
 
        interface State {
                default State start(AkkaRpcActor<?> akkaRpcActor) {
-                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE));
+                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED));
                }
 
                default State stop() {
-                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.INSTANCE));
+                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED));
                }
 
                default State terminate(AkkaRpcActor<?> akkaRpcActor) {
-                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.INSTANCE));
+                       throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING));
                }
 
                default State finishTermination() {
-                       return TerminatedState.INSTANCE;
+                       return TerminatedState.TERMINATED;
                }
 
                default boolean isRunning() {
@@ -488,16 +493,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
 
        @SuppressWarnings("Singleton")
        enum StartedState implements State {
-               INSTANCE;
+               STARTED;
 
                @Override
                public State start(AkkaRpcActor<?> akkaRpcActor) {
-                       return INSTANCE;
+                       return STARTED;
                }
 
                @Override
                public State stop() {
-                       return StoppedState.INSTANCE;
+                       return StoppedState.STOPPED;
                }
 
                @Override
@@ -523,7 +528,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
 
                        terminationFuture.whenComplete((ignored, throwable) -> 
akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable)));
 
-                       return TerminatingState.INSTANCE;
+                       return TerminatingState.TERMINATING;
                }
 
                @Override
@@ -534,7 +539,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
 
        @SuppressWarnings("Singleton")
        enum StoppedState implements State {
-               INSTANCE;
+               STOPPED;
 
                @Override
                public State start(AkkaRpcActor<?> akkaRpcActor) {
@@ -552,25 +557,25 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
                                
akkaRpcActor.mainThreadValidator.exitMainThread();
                        }
 
-                       return StartedState.INSTANCE;
+                       return StartedState.STARTED;
                }
 
                @Override
                public State stop() {
-                       return INSTANCE;
+                       return STOPPED;
                }
 
                @Override
                public State terminate(AkkaRpcActor<?> akkaRpcActor) {
                        
akkaRpcActor.stop(RpcEndpointTerminationResult.success());
 
-                       return TerminatingState.INSTANCE;
+                       return TerminatingState.TERMINATING;
                }
        }
 
        @SuppressWarnings("Singleton")
        enum TerminatingState implements State {
-               INSTANCE;
+               TERMINATING;
 
                @Override
                public boolean isRunning() {
@@ -579,7 +584,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
        }
 
        enum TerminatedState implements State {
-               INSTANCE
+               TERMINATED
        }
 
        private static final class RpcEndpointTerminationResult {

Reply via email to