This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98baea31bcd9d7d02302c7a135ed20048482b322 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Feb 22 12:20:50 2019 +0100 [FLINK-11718][rpc] Add onStart method to RpcEndpoint Add a dedicated onStart method to the RpcEndpoint which is called when the RpcEndpoint is started via the start() method. Due to this change, it is no longer necessary for the user to override the start() method, which is error-prone because it always requires calling super.start(). Now this contract is explicitly enforced. Moreover, it allows executing the setup logic in the RpcEndpoint's main thread. --- .../org/apache/flink/runtime/rpc/RpcEndpoint.java | 18 +++- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 105 +++++++++++++++------ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 68 +++++++++++++ 3 files changed, 160 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 13a4d65..97d56dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -120,16 +120,26 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready * to process remote procedure calls. * - * <p>IMPORTANT: Whenever you override this method, call the parent implementation to enable - * rpc processing. It is advised to make the parent call last. - * * @throws Exception indicating that something went wrong while starting the RPC endpoint */ - public void start() throws Exception { + public void start() { rpcServer.start(); } /** + * User overridable callback. + * + * <p>This method is called when the RpcEndpoint is being started. 
The method is guaranteed + * to be executed in the main thread context and can be used to start the rpc endpoint in the + * context of the rpc endpoint's main thread. + * + * <p>IMPORTANT: This method should never be called directly by the user. + * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs, + * then the rpc endpoint will automatically terminate. + */ + public void onStart() throws Exception {} + + /** * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is * no longer ready to process remote procedure calls. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 0ba2b5a..6dd9a22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.types.Either; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import akka.actor.ActorRef; @@ -53,6 +54,7 @@ import java.lang.reflect.Method; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise; @@ -93,12 +95,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { private final long maximumFramesize; + private final AtomicBoolean rpcEndpointStopped; + + private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult; + @Nonnull private State state; - @Nullable - private CompletableFuture<Void> 
rpcEndpointTerminationFuture; - AkkaRpcActor( final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, @@ -111,27 +114,21 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { this.terminationFuture = checkNotNull(terminationFuture); this.version = version; this.maximumFramesize = maximumFramesize; + this.rpcEndpointStopped = new AtomicBoolean(false); + this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure( + new AkkaRpcException( + String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId()))); this.state = StoppedState.INSTANCE; - this.rpcEndpointTerminationFuture = null; } @Override public void postStop() throws Exception { super.postStop(); - if (rpcEndpointTerminationFuture != null && rpcEndpointTerminationFuture.isDone()) { - rpcEndpointTerminationFuture.whenComplete( - (Void value, Throwable throwable) -> { - if (throwable != null) { - terminationFuture.completeExceptionally(throwable); - } else { - terminationFuture.complete(null); - } - }); + if (rpcEndpointTerminationResult.isSuccess()) { + terminationFuture.complete(null); } else { - terminationFuture.completeExceptionally( - new AkkaRpcException( - String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId()))); + terminationFuture.completeExceptionally(rpcEndpointTerminationResult.getFailureCause()); } state = state.finishTermination(); @@ -164,7 +161,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { private void handleControlMessage(ControlMessages controlMessage) { switch (controlMessage) { case START: - state = state.start(); + state = state.start(this); break; case STOP: state = state.stop(); @@ -439,8 +436,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { /** * Stop the actor immediately. 
*/ - private void stop() { - getContext().stop(getSelf()); + private void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) { + if (rpcEndpointStopped.compareAndSet(false, true)) { + this.rpcEndpointTerminationResult = rpcEndpointTerminationResult; + getContext().stop(getSelf()); + } } // --------------------------------------------------------------------------- @@ -448,7 +448,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { // --------------------------------------------------------------------------- interface State { - default State start() { + default State start(AkkaRpcActor<?> akkaRpcActor) { throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE)); } @@ -478,7 +478,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { INSTANCE; @Override - public State start() { + public State start(AkkaRpcActor<?> akkaRpcActor) { return INSTANCE; } @@ -491,10 +491,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { public State terminate(AkkaRpcActor<?> akkaRpcActor) { akkaRpcActor.mainThreadValidator.enterMainThread(); + CompletableFuture<Void> terminationFuture; try { - akkaRpcActor.rpcEndpointTerminationFuture = akkaRpcActor.rpcEndpoint.onStop(); + terminationFuture = akkaRpcActor.rpcEndpoint.onStop(); } catch (Throwable t) { - akkaRpcActor.rpcEndpointTerminationFuture = FutureUtils.completedExceptionally( + terminationFuture = FutureUtils.completedExceptionally( new AkkaRpcException( String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()), t)); @@ -507,7 +508,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { // future. // Complete the termination future so that others know that we've stopped. 
- akkaRpcActor.rpcEndpointTerminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.stop()); + terminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable))); return TerminatingState.INSTANCE; } @@ -523,7 +524,21 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { INSTANCE; @Override - public State start() { + public State start(AkkaRpcActor<?> akkaRpcActor) { + akkaRpcActor.mainThreadValidator.enterMainThread(); + + try { + akkaRpcActor.rpcEndpoint.onStart(); + } catch (Throwable throwable) { + akkaRpcActor.stop( + RpcEndpointTerminationResult.failure( + new AkkaRpcException( + String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()), + throwable))); + } finally { + akkaRpcActor.mainThreadValidator.exitMainThread(); + } + return StartedState.INSTANCE; } @@ -534,8 +549,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { @Override public State terminate(AkkaRpcActor<?> akkaRpcActor) { - akkaRpcActor.rpcEndpointTerminationFuture = CompletableFuture.completedFuture(null); - akkaRpcActor.stop(); + akkaRpcActor.stop(RpcEndpointTerminationResult.success()); return TerminatingState.INSTANCE; } @@ -554,4 +568,41 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { enum TerminatedState implements State { INSTANCE } + + private static final class RpcEndpointTerminationResult { + + private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null); + + @Nullable + private final Throwable failureCause; + + private RpcEndpointTerminationResult(@Nullable Throwable failureCause) { + this.failureCause = failureCause; + } + + public boolean isSuccess() { + return failureCause == null; + } + + public Throwable getFailureCause() { + Preconditions.checkState(failureCause != null); + return failureCause; + } + + private static RpcEndpointTerminationResult success() { + 
return SUCCESS; + } + + private static RpcEndpointTerminationResult failure(Throwable failureCause) { + return new RpcEndpointTerminationResult(failureCause); + } + + private static RpcEndpointTerminationResult of(@Nullable Throwable failureCause) { + if (failureCause == null) { + return success(); + } else { + return failure(failureCause); + } + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index cf05b83..c010810 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -41,7 +42,10 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -358,6 +362,41 @@ public class AkkaRpcActorTest extends TestLogger { } } + /** + * Tests that the {@link RpcEndpoint#onStart()} method is called when the {@link RpcEndpoint} + * is started. 
+ */ + @Test + public void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception { + final OnStartEndpoint onStartEndpoint = new OnStartEndpoint(akkaRpcService, null); + + try { + onStartEndpoint.start(); + onStartEndpoint.awaitUntilOnStartCalled(); + } finally { + RpcUtils.terminateRpcEndpoint(onStartEndpoint, timeout); + } + } + + /** + * Tests that if onStart fails, then the endpoint terminates. + */ + @Test + public void testOnStartFails() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + final OnStartEndpoint onStartEndpoint = new OnStartEndpoint(akkaRpcService, testException); + + onStartEndpoint.start(); + onStartEndpoint.awaitUntilOnStartCalled(); + + try { + onStartEndpoint.getTerminationFuture().get(); + fail("Expected that the rpc endpoint failed onStart and thus has terminated."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.findThrowable(ee, exception -> exception.equals(testException)).isPresent(), is(true)); + } + } + // ------------------------------------------------------------------------ // Test Actors and Interfaces // ------------------------------------------------------------------------ @@ -543,4 +582,33 @@ public class AkkaRpcActorTest extends TestLogger { return asyncOperationCounter.get(); } } + + // ------------------------------------------------------------------------ + + private static final class OnStartEndpoint extends RpcEndpoint { + + private final CountDownLatch countDownLatch; + + @Nullable + private final Exception exception; + + OnStartEndpoint(RpcService rpcService, @Nullable Exception exception) { + super(rpcService); + this.countDownLatch = new CountDownLatch(1); + this.exception = exception; + // remove this endpoint from the rpc service once it terminates (normally or exceptionally) + getTerminationFuture().whenComplete((aVoid, throwable) -> closeAsync()); + } + + @Override + public void onStart() throws Exception { + countDownLatch.countDown(); + 
+ ExceptionUtils.tryRethrowException(exception); + } + + public void awaitUntilOnStartCalled() throws InterruptedException { + countDownLatch.await(); + } + } }
