Repository: flink
Updated Branches:
  refs/heads/master af3ea8103 -> 16ec3d7ea


[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

This closes #5496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c131546e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c131546e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c131546e

Branch: refs/heads/master
Commit: c131546eaadd07baf950bd6a44d07ee42d109e4c
Parents: c27e2a7
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 15 18:43:39 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:06 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java   | 2 +-
 .../org/apache/flink/runtime/jobmaster/JobManagerRunner.java | 4 ++--
 .../java/org/apache/flink/runtime/jobmaster/JobMaster.java   | 2 +-
 .../main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java  | 2 +-
 .../main/java/org/apache/flink/runtime/rpc/RpcServer.java    | 2 +-
 .../apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java | 6 +++---
 .../org/apache/flink/runtime/rpc/akka/AkkaRpcService.java    | 2 +-
 .../flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java  | 2 +-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 2 +-
 .../apache/flink/runtime/dispatcher/MiniDispatcherTest.java  | 2 +-
 .../org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java  | 8 ++++----
 11 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f347d05..8e4f936 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -213,7 +213,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
                        // TODO: Make shutDownAndTerminate non blocking to not use the global executor
                        dispatcher.getTerminationFuture().whenCompleteAsync(
-                               (Boolean success, Throwable throwable) -> {
+                               (Void value, Throwable throwable) -> {
                                        if (throwable != null) {
                                                LOG.info("Could not properly terminate the Dispatcher.", throwable);
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 5740bd7..4269243 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -225,10 +225,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 
                                jobManager.shutDown();
 
-                               final CompletableFuture<Boolean> jobManagerTerminationFuture = jobManager.getTerminationFuture();
+                               final CompletableFuture<Void> jobManagerTerminationFuture = jobManager.getTerminationFuture();
 
                                jobManagerTerminationFuture.whenComplete(
-                                       (Boolean ignored, Throwable throwable) -> {
+                                       (Void ignored, Throwable throwable) -> {
                                                try {
                                                        leaderElectionService.stop();
                                                } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index dd2a7ea..015751b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -407,7 +407,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
                // shut down will internally release all registered slots
                slotPool.shutDown();
-               CompletableFuture<Boolean> terminationFuture = slotPool.getTerminationFuture();
+               CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture();
 
                Exception exception = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 9c27c95..9c2ed83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -228,7 +228,7 @@ public abstract class RpcEndpoint implements RpcGateway {
         *
         * @return Future which is completed when the rpc endpoint has been terminated.
         */
-       public CompletableFuture<Boolean> getTerminationFuture() {
+       public CompletableFuture<Void> getTerminationFuture() {
                return rpcServer.getTerminationFuture();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
index ac2f7eb..14d0cc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -30,5 +30,5 @@ public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGate
         *
         * @return Future indicating when the rpc endpoint has been terminated
         */
-       CompletableFuture<Boolean> getTerminationFuture();
+       CompletableFuture<Void> getTerminationFuture();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 863b780..cc54f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -84,7 +84,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 
        // null if gateway; otherwise non-null
        @Nullable
-       private final CompletableFuture<Boolean> terminationFuture;
+       private final CompletableFuture<Void> terminationFuture;
 
        AkkaInvocationHandler(
                        String address,
@@ -92,7 +92,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
                        ActorRef rpcEndpoint,
                        Time timeout,
                        long maximumFramesize,
-                       @Nullable CompletableFuture<Boolean> terminationFuture) {
+                       @Nullable CompletableFuture<Void> terminationFuture) {
 
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
@@ -341,7 +341,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
        }
 
        @Override
-       public CompletableFuture<Boolean> getTerminationFuture() {
+       public CompletableFuture<Void> getTerminationFuture() {
                return terminationFuture;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index a65fe46..8e96492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -195,7 +195,7 @@ public class AkkaRpcService implements RpcService {
        public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
                checkNotNull(rpcEndpoint, "rpc endpoint");
 
-               CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();
+               CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
                final Props akkaRpcActorProps;
 
                if (rpcEndpoint instanceof FencedRpcEndpoint) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 3ca75e2..564b1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -60,7 +60,7 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
                        ActorRef rpcEndpoint,
                        Time timeout,
                        long maximumFramesize,
-                       @Nullable CompletableFuture<Boolean> terminationFuture,
+                       @Nullable CompletableFuture<Void> terminationFuture,
                        Supplier<F> fencingTokenSupplier) {
                super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 4620585..2de1be8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -198,7 +198,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
        }
 
        // export the termination future for caller to know it is terminated
-       public CompletableFuture<Boolean> getTerminationFuture() {
+       public CompletableFuture<Void> getTerminationFuture() {
                return taskManager.getTerminationFuture();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 4291ef2..2b98939 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -222,7 +222,7 @@ public class MiniDispatcherTest extends TestLogger {
 
                        resultFuture.complete(archivedExecutionGraph);
 
-                       final CompletableFuture<Boolean> terminationFuture = miniDispatcher.getTerminationFuture();
+                       final CompletableFuture<Void> terminationFuture = miniDispatcher.getTerminationFuture();
 
                        assertThat(terminationFuture.isDone(), is(false));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c131546e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1b45006..2a65cac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -185,7 +185,7 @@ public class AkkaRpcActorTest extends TestLogger {
                final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
                rpcEndpoint.start();
 
-               CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
+               CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
                assertFalse(terminationFuture.isDone());
 
@@ -246,7 +246,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                rpcEndpoint.shutDown();
 
-               CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
+               CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
                try {
                        terminationFuture.get();
@@ -265,7 +265,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                simpleRpcEndpoint.shutDown();
 
-               CompletableFuture<Boolean> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
+               CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
 
                // check that we executed the postStop method in the main thread, otherwise an exception
                // would be thrown here.
@@ -285,7 +285,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                        rpcEndpoint.start();
 
-                       CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
+                       CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
                        rpcService.stopService();
 

Reply via email to