[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method now returns a CompletableFuture<Void> which is completed once all post-stop actions have completed. The termination future of the respective RpcEndpoint is completed only afterwards.
This closes #5498. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b28e81 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b28e81 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b28e81 Branch: refs/heads/master Commit: d9b28e817351eb2eb6b4cdd9597061713d9160e8 Parents: bb306b9 Author: Till Rohrmann <[email protected]> Authored: Thu Feb 15 19:19:48 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:08 2018 +0100 ---------------------------------------------------------------------- .../clusterframework/MesosResourceManager.java | 22 +---- .../flink/runtime/dispatcher/Dispatcher.java | 24 ++--- .../flink/runtime/jobmaster/JobMaster.java | 28 +----- .../runtime/jobmaster/slotpool/SlotPool.java | 4 +- .../resourcemanager/ResourceManager.java | 13 +-- .../flink/runtime/rpc/FencedRpcEndpoint.java | 2 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 5 +- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++-- .../runtime/taskexecutor/TaskExecutor.java | 14 +-- .../flink/runtime/rpc/AsyncCallsTest.java | 10 ++ .../runtime/rpc/FencedRpcEndpointTest.java | 5 + .../flink/runtime/rpc/RpcEndpointTest.java | 5 + .../runtime/rpc/akka/AkkaRpcActorTest.java | 98 ++++++++++++++++---- .../rpc/akka/MainThreadValidationTest.java | 7 ++ .../rpc/akka/MessageSerializationTest.java | 5 + .../retriever/impl/RpcGatewayRetrieverTest.java | 5 + .../apache/flink/yarn/YarnResourceManager.java | 15 +-- 17 files changed, 170 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index d42d8d1..1f58b11 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -333,8 +333,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN } @Override - public void postStop() throws Exception { - Exception exception = null; + public CompletableFuture<Void> postStop() { FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS); CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout); @@ -355,22 +354,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN stopLaunchCoordinatorFuture, stopReconciliationCoordinatorFuture); - // wait for the future to complete or to time out - try { - stopFuture.get(); - } catch (Exception e) { - exception = e; - } - - try { - super.postStop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + final CompletableFuture<Void> terminationFuture = super.postStop(); - if (exception != null) { - throw new ResourceManagerException("Could not properly shut down the ResourceManager.", exception); - } + return stopFuture.thenCombine( + terminationFuture, + (Void voidA, Void voidB) -> null); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index b2d2b6a..e212752 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -156,40 +156,40 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme //------------------------------------------------------ @Override - public void postStop() throws Exception { + public CompletableFuture<Void> postStop() { log.info("Stopping dispatcher {}.", getAddress()); - Throwable exception = null; - - clearState(); + Exception exception = null; try { - jobManagerSharedServices.shutdown(); - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); + clearState(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } try { - submittedJobGraphStore.stop(); + jobManagerSharedServices.shutdown(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } try { - leaderElectionService.stop(); + submittedJobGraphStore.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } try { - super.postStop(); + leaderElectionService.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } if (exception != null) { - throw new FlinkException("Could not properly terminate the Dispatcher.", exception); + return FutureUtils.completedExceptionally( + new FlinkException("Could not properly terminate the Dispatcher.", exception)); + } else { + return CompletableFuture.completedFuture(null); } - log.info("Stopped dispatcher {}.", getAddress()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 015751b..425f241 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -99,7 +99,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -123,7 +122,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -387,8 +385,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast * Suspend the job and shutdown all other services including rpc. 
*/ @Override - public void postStop() throws Exception { - log.info("Stopping the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ")."); + public CompletableFuture<Void> postStop() { + log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID()); // disconnect from all registered TaskExecutors final Set<ResourceID> taskManagerResourceIds = new HashSet<>(registeredTaskManagers.keySet()); @@ -407,28 +405,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // shut down will internally release all registered slots slotPool.shutDown(); - CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture(); - Exception exception = null; - - // wait for the slot pool shut down - try { - terminationFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - exception = e; - } - - try { - super.postStop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - throw exception; - } - - log.info("Stopped the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ")."); + return slotPool.getTerminationFuture(); } //---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index a94a107..6ba9e8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -199,7 +199,7 @@ public class SlotPool extends 
RpcEndpoint implements SlotPoolGateway, AllocatedS } @Override - public void postStop() throws Exception { + public CompletableFuture<Void> postStop() { // cancel all pending allocations Set<AllocationID> allocationIds = pendingRequests.keySetB(); @@ -214,7 +214,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS clear(); - super.postStop(); + return CompletableFuture.completedFuture(null); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 2d633f1..77e4362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -213,7 +213,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } @Override - public void postStop() throws Exception { + public CompletableFuture<Void> postStop() { Exception exception = null; taskManagerHeartbeatManager.stop(); @@ -240,14 +240,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> clearState(); - try { - super.postStop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - if (exception != null) { - ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down."); + return FutureUtils.completedExceptionally( + new FlinkException("Could not properly shut down the ResourceManager.", exception)); + } else { + return CompletableFuture.completedFuture(null); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index ff74f47..d078d58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -35,7 +35,7 @@ import java.util.concurrent.CompletableFuture; * * @param <F> type of the fencing token */ -public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint { +public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint { private volatile F fencingToken; private volatile MainThreadExecutor fencedMainThreadExecutor; http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index d269e84..549e5c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -141,9 +141,10 @@ public abstract class RpcEndpoint implements RpcGateway { * * <p>IMPORTANT: This method should never be called directly by the user. * - * @throws Exception if an error occurs. The exception is returned as result of the termination future. + * @return Future which is completed once all post stop actions are completed. 
If an error + * occurs this future is completed exceptionally */ - public void postStop() throws Exception {} + public abstract CompletableFuture<Void> postStop(); /** * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously. http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index da7ce35..a7d15d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc.akka; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -90,12 +91,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { mainThreadValidator.enterMainThread(); try { - Throwable shutdownThrowable = null; - + CompletableFuture<Void> postStopFuture; try { - rpcEndpoint.postStop(); + postStopFuture = rpcEndpoint.postStop(); } catch (Throwable throwable) { - shutdownThrowable = throwable; + postStopFuture = FutureUtils.completedExceptionally(throwable); } super.postStop(); @@ -105,11 +105,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { // future. // Complete the termination future so that others know that we've stopped. 
- if (shutdownThrowable != null) { - terminationFuture.completeExceptionally(shutdownThrowable); - } else { - terminationFuture.complete(null); - } + postStopFuture.whenComplete( + (Void value, Throwable throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(null); + } + }); } finally { mainThreadValidator.exitMainThread(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d880407..f4c953d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -254,7 +254,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { * Called to shut down the TaskManager. The method closes all TaskManager services. 
*/ @Override - public void postStop() throws Exception { + public CompletableFuture<Void> postStop() { log.info("Stopping TaskManager {}.", getAddress()); Throwable throwable = null; @@ -281,17 +281,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } - try { - super.postStop(); - } catch (Throwable e) { - throwable = ExceptionUtils.firstOrSuppressed(e, throwable); - } - if (throwable != null) { - ExceptionUtils.rethrowException(throwable, "Error while shutting the TaskExecutor down."); + return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable)); + } else { + return CompletableFuture.completedFuture(null); } - - log.info("Stopped TaskManager {}.", getAddress()); } // ====================================================================== http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 8ba0ccd..66f8d9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -340,6 +340,11 @@ public class AsyncCallsTest extends TestLogger { public boolean hasConcurrentAccess() { return concurrentAccess; } + + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } } public interface FencedTestGateway extends FencedRpcGateway<UUID> { @@ -384,5 +389,10 @@ public class AsyncCallsTest extends TestLogger { return CompletableFuture.completedFuture(Acknowledge.get()); } + + @Override + public CompletableFuture<Void> postStop() { + return 
CompletableFuture.completedFuture(null); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index 3d99c3f..f488308 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -300,6 +300,11 @@ public class FencedRpcEndpointTest extends TestLogger { this(rpcService, value, null); } + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } + protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) { super(rpcService); http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java index d52aadb..b5add60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java @@ -163,6 +163,11 @@ public class RpcEndpointTest extends TestLogger { public CompletableFuture<Integer> foobar() { return CompletableFuture.completedFuture(foobarValue); } + + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } } public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway { 
http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 2a65cac..2530bce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -18,26 +18,28 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import akka.actor.Terminated; +import akka.actor.ActorSystem; import org.hamcrest.core.Is; import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -56,21 +58,19 @@ public class AkkaRpcActorTest extends TestLogger { // shared test members // ------------------------------------------------------------------------ - private static 
ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - private static Time timeout = Time.milliseconds(10000L); - private static AkkaRpcService akkaRpcService = - new AkkaRpcService(actorSystem, timeout); + private static AkkaRpcService akkaRpcService; + + + @BeforeClass + public static void setup() { + akkaRpcService = new TestingRpcService(); + } @AfterClass public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { - final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService(); - final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); - - FutureUtils - .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) - .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + RpcUtils.terminateRpcService(akkaRpcService, timeout); } /** @@ -191,7 +191,7 @@ public class AkkaRpcActorTest extends TestLogger { CompletableFuture.runAsync( () -> rpcEndpoint.shutDown(), - actorSystem.dispatcher()); + akkaRpcService.getExecutor()); // wait until the rpc endpoint has terminated terminationFuture.get(); @@ -296,6 +296,33 @@ public class AkkaRpcActorTest extends TestLogger { } } + /** + * Tests that the {@link AkkaRpcActor} only completes after the asynchronous + * post stop action has completed. 
+ */ + @Test + public void testActorTerminationWithAsynchronousPostStopAction() throws Exception { + final CompletableFuture<Void> postStopFuture = new CompletableFuture<>(); + final AsynchronousPostStopEndpoint endpoint = new AsynchronousPostStopEndpoint(akkaRpcService, postStopFuture); + + try { + endpoint.start(); + + final CompletableFuture<Void> terminationFuture = endpoint.getTerminationFuture(); + + endpoint.shutDown(); + + assertFalse(terminationFuture.isDone()); + + postStopFuture.complete(null); + + // the postStopFuture completion should allow the endpoint to terminate + terminationFuture.get(); + } finally { + RpcUtils.terminateRpcEndpoint(endpoint, timeout); + } + } + // ------------------------------------------------------------------------ // Test Actors and Interfaces // ------------------------------------------------------------------------ @@ -309,7 +336,19 @@ public class AkkaRpcActorTest extends TestLogger { void tell(String message); } - private static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway { + private static class TestRpcEndpoint extends RpcEndpoint { + + protected TestRpcEndpoint(RpcService rpcService) { + super(rpcService); + } + + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } + } + + private static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway { private volatile int _foobar = 42; @@ -333,7 +372,7 @@ public class AkkaRpcActorTest extends TestLogger { CompletableFuture<Integer> doStuff(); } - private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway { + private static class ExceptionalEndpoint extends TestRpcEndpoint implements ExceptionalGateway { protected ExceptionalEndpoint(RpcService rpcService) { super(rpcService); @@ -345,7 +384,7 @@ public class AkkaRpcActorTest extends TestLogger { } } - private static class ExceptionalFutureEndpoint extends RpcEndpoint implements 
ExceptionalGateway { + private static class ExceptionalFutureEndpoint extends TestRpcEndpoint implements ExceptionalGateway { protected ExceptionalFutureEndpoint(RpcService rpcService) { super(rpcService); @@ -379,8 +418,9 @@ public class AkkaRpcActorTest extends TestLogger { } @Override - public void postStop() { + public CompletableFuture<Void> postStop() { validateRunsInMainThread(); + return CompletableFuture.completedFuture(null); } } @@ -393,8 +433,8 @@ public class AkkaRpcActorTest extends TestLogger { } @Override - public void postStop() throws Exception { - throw new PostStopException("Test exception."); + public CompletableFuture<Void> postStop() { + return FutureUtils.completedExceptionally(new PostStopException("Test exception.")); } private static class PostStopException extends FlinkException { @@ -406,4 +446,22 @@ public class AkkaRpcActorTest extends TestLogger { } } } + + // ------------------------------------------------------------------------ + + private static class AsynchronousPostStopEndpoint extends RpcEndpoint { + + private final CompletableFuture<Void> postStopFuture; + + protected AsynchronousPostStopEndpoint(RpcService rpcService, CompletableFuture<Void> postStopFuture) { + super(rpcService); + + this.postStopFuture = Preconditions.checkNotNull(postStopFuture); + } + + @Override + public CompletableFuture<Void> postStop() { + return postStopFuture; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index a69bd84..6dacdfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.concurrent.CompletableFuture; + import static org.junit.Assert.assertTrue; @Category(Flip6.class) @@ -91,6 +93,11 @@ public class MainThreadValidationTest extends TestLogger { } @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } + + @Override public void someConcurrencyCriticalFunction() { validateRunsInMainThread(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 061145c..6006850 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -193,6 +193,11 @@ public class MessageSerializationTest extends TestLogger { public void foobar(Object object) throws InterruptedException { queue.put(object); } + + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } } private static class NonSerializableObject { http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java index ae530f7..5f59d59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java @@ -138,5 +138,10 @@ public class RpcGatewayRetrieverTest extends TestLogger { public UUID getFencingToken() { return HighAvailabilityServices.DEFAULT_LEADER_ID; } + + @Override + public CompletableFuture<Void> postStop() { + return CompletableFuture.completedFuture(null); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 87324cb..5380356 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -36,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; 
+import org.apache.flink.util.FlinkException; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -56,6 +58,7 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -218,7 +221,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme } @Override - public void postStop() throws Exception { + public CompletableFuture<Void> postStop() { // shut down all components Throwable firstException = null; @@ -238,14 +241,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme } } - try { - super.postStop(); - } catch (Throwable t) { - firstException = ExceptionUtils.firstOrSuppressed(t, firstException); - } + final CompletableFuture<Void> terminationFuture = super.postStop(); if (firstException != null) { - ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager"); + return FutureUtils.completedExceptionally(new FlinkException("Error while shutting down YARN resource manager", firstException)); + } else { + return terminationFuture; } }
