This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98baea31bcd9d7d02302c7a135ed20048482b322
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Feb 22 12:20:50 2019 +0100

    [FLINK-11718][rpc] Add onStart method to RpcEndpoint
    
    Add a dedicated onStart method to the RpcEndpoint which is called when the 
RpcEndpoint
    is started via the start() method. Due to this change it is no longer 
necessary for the
    user to override the start() method which is error prone because it always 
requires to
    call super.start(). Now this contract is explicitly enforced. Moreover, it 
allows to
    execute the setup logic in the RpcEndpoint's main thread.
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  |  18 +++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 105 +++++++++++++++------
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  68 +++++++++++++
 3 files changed, 160 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 13a4d65..97d56dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -120,16 +120,26 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
         * Starts the rpc endpoint. This tells the underlying rpc server that 
the rpc endpoint is ready
         * to process remote procedure calls.
         *
-        * <p>IMPORTANT: Whenever you override this method, call the parent 
implementation to enable
-        * rpc processing. It is advised to make the parent call last.
-        *
         * @throws Exception indicating that something went wrong while 
starting the RPC endpoint
         */
-       public void start() throws Exception {
+       public void start() {
                rpcServer.start();
        }
 
        /**
+        * User overridable callback.
+        *
+        * <p>This method is called when the RpcEndpoint is being started. The 
method is guaranteed
+        * to be executed in the main thread context and can be used to start 
the rpc endpoint in the
+        * context of the rpc endpoint's main thread.
+        *
+        * <p>IMPORTANT: This method should never be called directly by the 
user.
+        * @throws Exception indicating that the rpc endpoint could not be 
started. If an exception occurs,
+        * then the rpc endpoint will automatically terminate.
+        */
+       public void onStart() throws Exception {}
+
+       /**
         * Stops the rpc endpoint. This tells the underlying rpc server that 
the rpc endpoint is
         * no longer ready to process remote procedure calls.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 0ba2b5a..6dd9a22 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
@@ -53,6 +54,7 @@ import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
@@ -93,12 +95,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
 
        private final long maximumFramesize;
 
+       private final AtomicBoolean rpcEndpointStopped;
+
+       private volatile RpcEndpointTerminationResult 
rpcEndpointTerminationResult;
+
        @Nonnull
        private State state;
 
-       @Nullable
-       private CompletableFuture<Void> rpcEndpointTerminationFuture;
-
        AkkaRpcActor(
                        final T rpcEndpoint,
                        final CompletableFuture<Boolean> terminationFuture,
@@ -111,27 +114,21 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                this.terminationFuture = checkNotNull(terminationFuture);
                this.version = version;
                this.maximumFramesize = maximumFramesize;
+               this.rpcEndpointStopped = new AtomicBoolean(false);
+               this.rpcEndpointTerminationResult = 
RpcEndpointTerminationResult.failure(
+                       new AkkaRpcException(
+                               String.format("RpcEndpoint %s has not been 
properly stopped.", rpcEndpoint.getEndpointId())));
                this.state = StoppedState.INSTANCE;
-               this.rpcEndpointTerminationFuture = null;
        }
 
        @Override
        public void postStop() throws Exception {
                super.postStop();
 
-               if (rpcEndpointTerminationFuture != null && 
rpcEndpointTerminationFuture.isDone()) {
-                       rpcEndpointTerminationFuture.whenComplete(
-                               (Void value, Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               
terminationFuture.completeExceptionally(throwable);
-                                       } else {
-                                               
terminationFuture.complete(null);
-                                       }
-                               });
+               if (rpcEndpointTerminationResult.isSuccess()) {
+                       terminationFuture.complete(null);
                } else {
-                       terminationFuture.completeExceptionally(
-                               new AkkaRpcException(
-                                       String.format("RpcEndpoint %s has not 
been properly stopped.", rpcEndpoint.getEndpointId())));
+                       
terminationFuture.completeExceptionally(rpcEndpointTerminationResult.getFailureCause());
                }
 
                state = state.finishTermination();
@@ -164,7 +161,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
        private void handleControlMessage(ControlMessages controlMessage) {
                switch (controlMessage) {
                        case START:
-                               state = state.start();
+                               state = state.start(this);
                                break;
                        case STOP:
                                state = state.stop();
@@ -439,8 +436,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
        /**
         * Stop the actor immediately.
         */
-       private void stop() {
-               getContext().stop(getSelf());
+       private void stop(RpcEndpointTerminationResult 
rpcEndpointTerminationResult) {
+               if (rpcEndpointStopped.compareAndSet(false, true)) {
+                       this.rpcEndpointTerminationResult = 
rpcEndpointTerminationResult;
+                       getContext().stop(getSelf());
+               }
        }
 
        // 
---------------------------------------------------------------------------
@@ -448,7 +448,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
        // 
---------------------------------------------------------------------------
 
        interface State {
-               default State start() {
+               default State start(AkkaRpcActor<?> akkaRpcActor) {
                        throw new 
AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE));
                }
 
@@ -478,7 +478,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                INSTANCE;
 
                @Override
-               public State start() {
+               public State start(AkkaRpcActor<?> akkaRpcActor) {
                        return INSTANCE;
                }
 
@@ -491,10 +491,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                public State terminate(AkkaRpcActor<?> akkaRpcActor) {
                        akkaRpcActor.mainThreadValidator.enterMainThread();
 
+                       CompletableFuture<Void> terminationFuture;
                        try {
-                               akkaRpcActor.rpcEndpointTerminationFuture = 
akkaRpcActor.rpcEndpoint.onStop();
+                               terminationFuture = 
akkaRpcActor.rpcEndpoint.onStop();
                        } catch (Throwable t) {
-                               akkaRpcActor.rpcEndpointTerminationFuture = 
FutureUtils.completedExceptionally(
+                               terminationFuture = 
FutureUtils.completedExceptionally(
                                        new AkkaRpcException(
                                                String.format("Failure while 
stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
                                                t));
@@ -507,7 +508,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                        // future.
                        // Complete the termination future so that others know 
that we've stopped.
 
-                       
akkaRpcActor.rpcEndpointTerminationFuture.whenComplete((ignored, throwable) -> 
akkaRpcActor.stop());
+                       terminationFuture.whenComplete((ignored, throwable) -> 
akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable)));
 
                        return TerminatingState.INSTANCE;
                }
@@ -523,7 +524,21 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                INSTANCE;
 
                @Override
-               public State start() {
+               public State start(AkkaRpcActor<?> akkaRpcActor) {
+                       akkaRpcActor.mainThreadValidator.enterMainThread();
+
+                       try {
+                               akkaRpcActor.rpcEndpoint.onStart();
+                       } catch (Throwable throwable) {
+                               akkaRpcActor.stop(
+                                       RpcEndpointTerminationResult.failure(
+                                               new AkkaRpcException(
+                                                       String.format("Could 
not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
+                                                       throwable)));
+                       } finally {
+                               
akkaRpcActor.mainThreadValidator.exitMainThread();
+                       }
+
                        return StartedState.INSTANCE;
                }
 
@@ -534,8 +549,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
 
                @Override
                public State terminate(AkkaRpcActor<?> akkaRpcActor) {
-                       akkaRpcActor.rpcEndpointTerminationFuture = 
CompletableFuture.completedFuture(null);
-                       akkaRpcActor.stop();
+                       
akkaRpcActor.stop(RpcEndpointTerminationResult.success());
 
                        return TerminatingState.INSTANCE;
                }
@@ -554,4 +568,41 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
        enum TerminatedState implements State {
                INSTANCE
        }
+
+       private static final class RpcEndpointTerminationResult {
+
+               private static final RpcEndpointTerminationResult SUCCESS = new 
RpcEndpointTerminationResult(null);
+
+               @Nullable
+               private final Throwable failureCause;
+
+               private RpcEndpointTerminationResult(@Nullable Throwable 
failureCause) {
+                       this.failureCause = failureCause;
+               }
+
+               public boolean isSuccess() {
+                       return failureCause == null;
+               }
+
+               public Throwable getFailureCause() {
+                       Preconditions.checkState(failureCause != null);
+                       return failureCause;
+               }
+
+               private static RpcEndpointTerminationResult success() {
+                       return SUCCESS;
+               }
+
+               private static RpcEndpointTerminationResult failure(Throwable 
failureCause) {
+                       return new RpcEndpointTerminationResult(failureCause);
+               }
+
+               private static RpcEndpointTerminationResult of(@Nullable 
Throwable failureCause) {
+                       if (failureCause == null) {
+                               return success();
+                       } else {
+                               return failure(failureCause);
+                       }
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index cf05b83..c010810 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -41,7 +42,10 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -358,6 +362,41 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that the {@link RpcEndpoint#onStart()} method is called when 
the {@link RpcEndpoint}
+        * is started.
+        */
+       @Test
+       public void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception 
{
+               final OnStartEndpoint onStartEndpoint = new 
OnStartEndpoint(akkaRpcService, null);
+
+               try {
+                       onStartEndpoint.start();
+                       onStartEndpoint.awaitUntilOnStartCalled();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(onStartEndpoint, timeout);
+               }
+       }
+
+       /**
+        * Tests that if onStart fails, then the endpoint terminates.
+        */
+       @Test
+       public void testOnStartFails() throws Exception {
+               final FlinkException testException = new FlinkException("Test 
exception");
+               final OnStartEndpoint onStartEndpoint = new 
OnStartEndpoint(akkaRpcService, testException);
+
+               onStartEndpoint.start();
+               onStartEndpoint.awaitUntilOnStartCalled();
+
+               try {
+                       onStartEndpoint.getTerminationFuture().get();
+                       fail("Expected that the rpc endpoint failed onStart and 
thus has terminated.");
+               } catch (ExecutionException ee) {
+                       assertThat(ExceptionUtils.findThrowable(ee, exception 
-> exception.equals(testException)).isPresent(), is(true));
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Actors and Interfaces
        // 
------------------------------------------------------------------------
@@ -543,4 +582,33 @@ public class AkkaRpcActorTest extends TestLogger {
                        return asyncOperationCounter.get();
                }
        }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class OnStartEndpoint extends RpcEndpoint {
+
+               private final CountDownLatch countDownLatch;
+
+               @Nullable
+               private final Exception exception;
+
+               OnStartEndpoint(RpcService rpcService, @Nullable Exception 
exception) {
+                       super(rpcService);
+                       this.countDownLatch = new CountDownLatch(1);
+                       this.exception = exception;
+                       // remove this endpoint from the rpc service once it 
terminates (normally or exceptionally)
+                       getTerminationFuture().whenComplete((aVoid, throwable) 
-> closeAsync());
+               }
+
+               @Override
+               public void onStart() throws Exception {
+                       countDownLatch.countDown();
+
+                       ExceptionUtils.tryRethrowException(exception);
+               }
+
+               public void awaitUntilOnStartCalled() throws 
InterruptedException {
+                       countDownLatch.await();
+               }
+       }
 }

Reply via email to