This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e94a4b9f309295ba086dbd2f8a0b9c5cfdebf069 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Mar 27 14:30:35 2020 +0100 [hotfix][rpc] Add proper error reporting to AkkaRpcActor#handleControlMessage --- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 59 ++++++++++++---------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 25fb38b..0397172 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -119,7 +119,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure( new AkkaRpcException( String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId()))); - this.state = StoppedState.INSTANCE; + this.state = StoppedState.STOPPED; } @Override @@ -164,18 +164,23 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { } private void handleControlMessage(ControlMessages controlMessage) { - switch (controlMessage) { - case START: - state = state.start(this); - break; - case STOP: - state = state.stop(); - break; - case TERMINATE: - state.terminate(this); - break; - default: - handleUnknownControlMessage(controlMessage); + try { + switch (controlMessage) { + case START: + state = state.start(this); + break; + case STOP: + state = state.stop(); + break; + case TERMINATE: + state = state.terminate(this); + break; + default: + handleUnknownControlMessage(controlMessage); + } + } catch (Exception e) { + this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e); + throw e; } } @@ -462,19 +467,19 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { interface State { default State start(AkkaRpcActor<?> akkaRpcActor) { - throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE)); + throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED)); } default State stop() { - throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.INSTANCE)); + throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED)); } default State terminate(AkkaRpcActor<?> akkaRpcActor) { - throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.INSTANCE)); + throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING)); } default State finishTermination() { - return TerminatedState.INSTANCE; + return TerminatedState.TERMINATED; } default boolean isRunning() { @@ -488,16 +493,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { @SuppressWarnings("Singleton") enum StartedState implements State { - INSTANCE; + STARTED; @Override public State start(AkkaRpcActor<?> akkaRpcActor) { - return INSTANCE; + return STARTED; } @Override public State stop() { - return StoppedState.INSTANCE; + return StoppedState.STOPPED; } @Override @@ -523,7 +528,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { terminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable))); - return TerminatingState.INSTANCE; + return TerminatingState.TERMINATING; } @Override @@ -534,7 +539,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { @SuppressWarnings("Singleton") enum StoppedState implements State { - INSTANCE; + STOPPED; @Override public State start(AkkaRpcActor<?> akkaRpcActor) { @@ -552,25 +557,25 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { akkaRpcActor.mainThreadValidator.exitMainThread(); } - return StartedState.INSTANCE; + return StartedState.STARTED; } @Override public State stop() { - return INSTANCE; + return STOPPED; } @Override public State terminate(AkkaRpcActor<?> akkaRpcActor) { akkaRpcActor.stop(RpcEndpointTerminationResult.success()); - return TerminatingState.INSTANCE; + return TerminatingState.TERMINATING; } } @SuppressWarnings("Singleton") enum TerminatingState implements State { - INSTANCE; + TERMINATING; @Override public boolean isRunning() { @@ -579,7 +584,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { } enum TerminatedState implements State { - INSTANCE + TERMINATED } private static final class RpcEndpointTerminationResult {
