Repository: flink Updated Branches: refs/heads/master af3ea8103 -> 16ec3d7ea
[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void This closes #5496. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c131546e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c131546e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c131546e Branch: refs/heads/master Commit: c131546eaadd07baf950bd6a44d07ee42d109e4c Parents: c27e2a7 Author: Till Rohrmann <[email protected]> Authored: Thu Feb 15 18:43:39 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:06 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 2 +- .../org/apache/flink/runtime/jobmaster/JobManagerRunner.java | 4 ++-- .../java/org/apache/flink/runtime/jobmaster/JobMaster.java | 2 +- .../main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java | 2 +- .../main/java/org/apache/flink/runtime/rpc/RpcServer.java | 2 +- .../apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java | 6 +++--- .../org/apache/flink/runtime/rpc/akka/AkkaRpcService.java | 2 +- .../flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java | 2 +- .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 2 +- .../apache/flink/runtime/dispatcher/MiniDispatcherTest.java | 2 +- .../org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java | 8 ++++---- 11 files changed, 17 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f347d05..8e4f936 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -213,7 +213,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { // TODO: Make shutDownAndTerminate non blocking to not use the global executor dispatcher.getTerminationFuture().whenCompleteAsync( - (Boolean success, Throwable throwable) -> { + (Void value, Throwable throwable) -> { if (throwable != null) { LOG.info("Could not properly terminate the Dispatcher.", throwable); } http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 5740bd7..4269243 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -225,10 +225,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F jobManager.shutDown(); - final CompletableFuture<Boolean> jobManagerTerminationFuture = jobManager.getTerminationFuture(); + final CompletableFuture<Void> jobManagerTerminationFuture = jobManager.getTerminationFuture(); jobManagerTerminationFuture.whenComplete( - (Boolean ignored, Throwable throwable) -> { + (Void ignored, Throwable throwable) -> { try { leaderElectionService.stop(); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index dd2a7ea..015751b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -407,7 +407,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // shut down will internally release all registered slots slotPool.shutDown(); - CompletableFuture<Boolean> terminationFuture = slotPool.getTerminationFuture(); + CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture(); Exception exception = null; http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 9c27c95..9c2ed83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -228,7 +228,7 @@ public abstract class RpcEndpoint implements RpcGateway { * * @return Future which is completed when the rpc endpoint has been terminated. */ - public CompletableFuture<Boolean> getTerminationFuture() { + public CompletableFuture<Void> getTerminationFuture() { return rpcServer.getTerminationFuture(); } http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java index ac2f7eb..14d0cc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java @@ -30,5 +30,5 @@ public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGate * * @return Future indicating when the rpc endpoint has been terminated */ - CompletableFuture<Boolean> getTerminationFuture(); + CompletableFuture<Void> getTerminationFuture(); } http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 863b780..cc54f2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -84,7 +84,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc // null if gateway; otherwise non-null @Nullable - private final CompletableFuture<Boolean> terminationFuture; + private final CompletableFuture<Void> terminationFuture; AkkaInvocationHandler( String address, @@ -92,7 +92,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc ActorRef rpcEndpoint, Time timeout, long maximumFramesize, - @Nullable CompletableFuture<Boolean> terminationFuture) { + @Nullable CompletableFuture<Void> terminationFuture) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); @@ -341,7 +341,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc } @Override - public CompletableFuture<Boolean> getTerminationFuture() { + public CompletableFuture<Void> getTerminationFuture() { return terminationFuture; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index a65fe46..8e96492 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -195,7 +195,7 @@ public class AkkaRpcService implements RpcService { public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); - CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>(); + CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final Props akkaRpcActorProps; if (rpcEndpoint instanceof FencedRpcEndpoint) { http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java index 3ca75e2..564b1ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java @@ -60,7 +60,7 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv ActorRef rpcEndpoint, Time timeout, long maximumFramesize, - @Nullable CompletableFuture<Boolean> terminationFuture, + @Nullable CompletableFuture<Void> terminationFuture, Supplier<F> fencingTokenSupplier) { super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture); http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 4620585..2de1be8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -198,7 +198,7 @@ public class TaskManagerRunner implements FatalErrorHandler { } // export the termination future for caller to know it is terminated - public CompletableFuture<Boolean> getTerminationFuture() { + public CompletableFuture<Void> getTerminationFuture() { return taskManager.getTerminationFuture(); } http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 4291ef2..2b98939 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -222,7 +222,7 @@ public class MiniDispatcherTest extends TestLogger { resultFuture.complete(archivedExecutionGraph); - final CompletableFuture<Boolean> terminationFuture = miniDispatcher.getTerminationFuture(); + final CompletableFuture<Void> terminationFuture = miniDispatcher.getTerminationFuture(); assertThat(terminationFuture.isDone(), is(false)); http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 1b45006..2a65cac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -185,7 +185,7 @@ public class AkkaRpcActorTest extends TestLogger { final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); rpcEndpoint.start(); - CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture(); + CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture(); assertFalse(terminationFuture.isDone()); @@ -246,7 +246,7 @@ public class AkkaRpcActorTest extends TestLogger { rpcEndpoint.shutDown(); - CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture(); + CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture(); try { terminationFuture.get(); @@ -265,7 +265,7 @@ public class AkkaRpcActorTest extends TestLogger { simpleRpcEndpoint.shutDown(); - CompletableFuture<Boolean> terminationFuture = simpleRpcEndpoint.getTerminationFuture(); + CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture(); // check that we executed the postStop method in the main thread, otherwise an exception // would be thrown here. @@ -285,7 +285,7 @@ public class AkkaRpcActorTest extends TestLogger { rpcEndpoint.start(); - CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture(); + CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture(); rpcService.stopService();
