Repository: flink
Updated Branches:
  refs/heads/master 5909b5bb7 -> 3d52f52e9


[FLINK-8675] Add non-blocking shut down method to RestServerEndpoint

Make the shut down method of RestServerEndpoint non-blocking.

This closes #5511.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fac5aff9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fac5aff9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fac5aff9

Branch: refs/heads/master
Commit: fac5aff979976b19b49a65243cfcc788dece8bcf
Parents: 62b6cea
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 16 18:01:58 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 19 16:04:17 2018 +0100

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     |   5 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  15 +++
 .../runtime/entrypoint/ClusterEntrypoint.java   |   2 +-
 .../flink/runtime/rest/RestServerEndpoint.java  | 133 ++++++++++++-------
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  44 +++---
 .../runtime/rest/RestServerEndpointITCase.java  |   7 +-
 6 files changed, 137 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f54d9d2..5b1a3f8 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -596,8 +596,11 @@ public class RestClusterClientTest extends TestLogger {
                }
 
                @Override
+               protected void startInternal() throws Exception {}
+
+               @Override
                public void close() throws Exception {
-                       shutdown(Time.seconds(5));
+                       shutDownAsync().get();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 181bc5d..4a253b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -291,6 +292,20 @@ public class FutureUtils {
         *
         * @param future to wait for its completion
         * @param runnable action which is triggered after the future's 
completion
+        * @return Future which is completed after the action has completed. 
This future can contain an exception,
+        * if an error occurred in the given future or action.
+        */
+       public static CompletableFuture<Void> 
runAfterwardsAsync(CompletableFuture<?> future, RunnableWithException runnable) 
{
+               return runAfterwardsAsync(future, runnable, 
ForkJoinPool.commonPool());
+       }
+
+       /**
+        * Run the given action after the completion of the given future. The 
given future can be
+        * completed normally or exceptionally. In case of an exceptional 
completion, the
+        * action's exception will be added to the initial exception.
+        *
+        * @param future to wait for its completion
+        * @param runnable action which is triggered after the future's 
completion
         * @param executor to run the given action
         * @return Future which is completed after the action has completed. 
This future can contain an exception,
         * if an error occurred in the given future or action.

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 584fcae..ccb3ae4 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -454,7 +454,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        Throwable exception = null;
 
                        if (webMonitorEndpoint != null) {
-                               webMonitorEndpoint.shutdown(Time.seconds(10L));
+                               webMonitorEndpoint.shutDownAsync().get();
                        }
 
                        if (dispatcherLeaderRetrievalService != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index dbb25a7..bade160 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -45,6 +46,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFact
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -52,6 +54,7 @@ import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -73,11 +76,13 @@ public abstract class RestServerEndpoint {
        private final SSLEngine sslEngine;
        private final Path uploadDir;
 
+       private final CompletableFuture<Void> terminationFuture;
+
        private ServerBootstrap bootstrap;
        private Channel serverChannel;
        private String restAddress;
 
-       private volatile boolean started;
+       private State state = State.CREATED;
 
        public RestServerEndpoint(RestServerEndpointConfiguration 
configuration) throws IOException {
                Preconditions.checkNotNull(configuration);
@@ -88,9 +93,9 @@ public abstract class RestServerEndpoint {
                this.uploadDir = configuration.getUploadDir();
                createUploadDir(uploadDir, log);
 
-               this.restAddress = null;
+               terminationFuture = new CompletableFuture<>();
 
-               this.started = false;
+               this.restAddress = null;
        }
 
        /**
@@ -107,12 +112,9 @@ public abstract class RestServerEndpoint {
         *
         * @throws Exception if we cannot start the RestServerEndpoint
         */
-       public void start() throws Exception {
+       public final void start() throws Exception {
                synchronized (lock) {
-                       if (started) {
-                               // RestServerEndpoint already started
-                               return;
-                       }
+                       Preconditions.checkState(state == State.CREATED, "The 
RestServerEndpoint cannot be restarted.");
 
                        log.info("Starting rest endpoint.");
 
@@ -192,28 +194,40 @@ public abstract class RestServerEndpoint {
 
                        restAddressFuture.complete(restAddress);
 
-                       started = true;
+                       state = State.RUNNING;
+
+                       startInternal();
                }
        }
 
        /**
+        * Hook to start sub class specific services.
+        *
+        * @throws Exception if an error occurred
+        */
+       protected abstract void startInternal() throws Exception;
+
+       /**
         * Returns the address on which this endpoint is accepting requests.
         *
-        * @return address on which this endpoint is accepting requests
+        * @return address on which this endpoint is accepting requests or null 
if none
         */
+       @Nullable
        public InetSocketAddress getServerAddress() {
-               Preconditions.checkState(started, "The RestServerEndpoint has 
not been started yet.");
-               Channel server = this.serverChannel;
-
-               if (server != null) {
-                       try {
-                               return ((InetSocketAddress) 
server.localAddress());
-                       } catch (Exception e) {
-                               log.error("Cannot access local server address", 
e);
+               synchronized (lock) {
+                       Preconditions.checkState(state != State.CREATED, "The 
RestServerEndpoint has not been started yet.");
+                       Channel server = this.serverChannel;
+
+                       if (server != null) {
+                               try {
+                                       return ((InetSocketAddress) 
server.localAddress());
+                               } catch (Exception e) {
+                                       log.error("Cannot access local server 
address", e);
+                               }
                        }
-               }
 
-               return null;
+                       return null;
+               }
        }
 
        /**
@@ -222,26 +236,49 @@ public abstract class RestServerEndpoint {
         * @return REST address of this endpoint
         */
        public String getRestAddress() {
-               Preconditions.checkState(started, "The RestServerEndpoint has 
not been started yet.");
-               return restAddress;
+               synchronized (lock) {
+                       Preconditions.checkState(state != State.CREATED, "The 
RestServerEndpoint has not been started yet.");
+                       return restAddress;
+               }
+       }
+
+       public final CompletableFuture<Void> shutDownAsync() {
+               synchronized (lock) {
+                       log.info("Shutting down rest endpoint.");
+
+                       if (state == State.RUNNING) {
+                               final CompletableFuture<Void> shutDownFuture = 
shutDownInternal();
+
+                               shutDownFuture.whenComplete(
+                                       (Void ignored, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       
terminationFuture.completeExceptionally(throwable);
+                                               } else {
+                                                       
terminationFuture.complete(null);
+                                               }
+                                       });
+                               state = State.SHUTDOWN;
+                       } else if (state == State.CREATED) {
+                               terminationFuture.complete(null);
+                               state = State.SHUTDOWN;
+                       }
+
+                       return terminationFuture;
+               }
        }
 
        /**
         * Stops this REST server endpoint.
+        *
+        * @return Future which is completed once the shut down has been 
finished.
         */
-       public void shutdown(Time timeout) {
+       protected CompletableFuture<Void> shutDownInternal() {
 
                synchronized (lock) {
-                       if (!started) {
-                               // RestServerEndpoint has not been started
-                               return;
-                       }
-
-                       log.info("Shutting down rest endpoint.");
 
                        CompletableFuture<?> channelFuture = new 
CompletableFuture<>();
-                       if (this.serverChannel != null) {
-                               this.serverChannel.close().addListener(finished 
-> {
+                       if (serverChannel != null) {
+                               serverChannel.close().addListener(finished -> {
                                        if (finished.isSuccess()) {
                                                channelFuture.complete(null);
                                        } else {
@@ -252,11 +289,12 @@ public abstract class RestServerEndpoint {
                        }
                        CompletableFuture<?> groupFuture = new 
CompletableFuture<>();
                        CompletableFuture<?> childGroupFuture = new 
CompletableFuture<>();
+                       final Time gracePeriod = Time.seconds(10L);
 
                        channelFuture.thenRun(() -> {
                                if (bootstrap != null) {
                                        if (bootstrap.group() != null) {
-                                               
bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS)
+                                               
bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), 
TimeUnit.MILLISECONDS)
                                                        .addListener(finished 
-> {
                                                                if 
(finished.isSuccess()) {
                                                                        
groupFuture.complete(null);
@@ -266,7 +304,7 @@ public abstract class RestServerEndpoint {
                                                        });
                                        }
                                        if (bootstrap.childGroup() != null) {
-                                               
bootstrap.childGroup().shutdownGracefully(0L, timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS)
+                                               
bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), 
TimeUnit.MILLISECONDS)
                                                        .addListener(finished 
-> {
                                                                if 
(finished.isSuccess()) {
                                                                        
childGroupFuture.complete(null);
@@ -283,22 +321,15 @@ public abstract class RestServerEndpoint {
                                }
                        });
 
-                       try {
-                               CompletableFuture.allOf(groupFuture, 
childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-                               log.info("Rest endpoint shutdown complete.");
-                       } catch (Exception e) {
-                               log.warn("Rest endpoint shutdown failed.", e);
-                       }
-
-                       restAddress = null;
-                       started = false;
+                       final CompletableFuture<Void> channelTerminationFuture 
= FutureUtils.completeAll(
+                               Arrays.asList(groupFuture, childGroupFuture));
 
-                       try {
-                               log.info("Cleaning upload directory {}", 
uploadDir);
-                               FileUtils.cleanDirectory(uploadDir.toFile());
-                       } catch (IOException e) {
-                               log.warn("Error while cleaning upload directory 
{}", uploadDir, e);
-                       }
+                       return FutureUtils.runAfterwards(
+                               channelTerminationFuture,
+                               () -> {
+                                       log.info("Cleaning upload directory 
{}", uploadDir);
+                                       
FileUtils.cleanDirectory(uploadDir.toFile());
+                               });
                }
        }
 
@@ -433,4 +464,10 @@ public abstract class RestServerEndpoint {
                        }
                }
        }
+
+       private enum State {
+               CREATED,
+               RUNNING,
+               SHUTDOWN
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 10ad344..427332f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -607,31 +609,39 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
        }
 
        @Override
-       public void start() throws Exception {
-               super.start();
+       public void startInternal() throws Exception {
                leaderElectionService.start(this);
        }
 
        @Override
-       public void shutdown(Time timeout) {
+       protected CompletableFuture<Void> shutDownInternal() {
                executionGraphCache.close();
 
-               final File tmpDir = restConfiguration.getTmpDir();
-
-               try {
-                       log.info("Removing cache directory {}", tmpDir);
-                       FileUtils.deleteDirectory(tmpDir);
-               } catch (Throwable t) {
-                       log.warn("Error while deleting cache directory {}", 
tmpDir, t);
-               }
+               final CompletableFuture<Void> shutdownFuture = 
super.shutDownInternal();
 
-               try {
-                       leaderElectionService.stop();
-               } catch (Exception e) {
-                       log.warn("Error while stopping leaderElectionService", 
e);
-               }
+               final File tmpDir = restConfiguration.getTmpDir();
 
-               super.shutdown(timeout);
+               return FutureUtils.runAfterwardsAsync(
+                       shutdownFuture,
+                       () -> {
+                               Exception exception = null;
+                               try {
+                                       log.info("Removing cache directory {}", 
tmpDir);
+                                       FileUtils.deleteDirectory(tmpDir);
+                               } catch (Exception e) {
+                                       exception = e;
+                               }
+
+                               try {
+                                       leaderElectionService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+
+                               if (exception != null) {
+                                       throw exception;
+                               }
+                       });
        }
 
        
//-------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 50b26e3..3ad7ee5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -136,14 +136,14 @@ public class RestServerEndpointITCase extends TestLogger {
        }
 
        @After
-       public void teardown() {
+       public void teardown() throws Exception {
                if (restClient != null) {
                        restClient.shutdown(timeout);
                        restClient = null;
                }
 
                if (serverEndpoint != null) {
-                       serverEndpoint.shutdown(timeout);
+                       serverEndpoint.shutDownAsync().get();
                        serverEndpoint = null;
                }
        }
@@ -316,6 +316,9 @@ public class RestServerEndpointITCase extends TestLogger {
                                Tuple2.of(new TestHeaders(), testHandler),
                                Tuple2.of(TestUploadHeaders.INSTANCE, 
testUploadHandler));
                }
+
+               @Override
+               protected void startInternal() throws Exception {}
        }
 
        private static class TestHandler extends 
AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {

Reply via email to