This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a6f65cfc69dcc647f1c74c89900905e2426b811
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Sep 26 09:12:06 2018 +0200

    [hotfix] Make RestClient AutoCloseableAsync
---
 .../org/apache/flink/runtime/rest/RestClient.java  | 51 +++++++++++++++-------
 .../apache/flink/runtime/rest/RestClientTest.java  |  8 +---
 2 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a855749..3aa93bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -86,6 +87,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
@@ -93,7 +95,7 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRes
 /**
  * This client is the counter-part to the {@link RestServerEndpoint}.
  */
-public class RestClient {
+public class RestClient implements AutoCloseableAsync {
        private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
 
        private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
@@ -103,9 +105,14 @@ public class RestClient {
 
        private final Bootstrap bootstrap;
 
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
        public RestClient(RestClientConfiguration configuration, Executor executor) {
                Preconditions.checkNotNull(configuration);
                this.executor = Preconditions.checkNotNull(executor);
+               this.terminationFuture = new CompletableFuture<>();
 
                final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
                ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -135,30 +142,42 @@ public class RestClient {
                LOG.info("Rest client endpoint started.");
        }
 
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return shutdownInternally(Time.seconds(10L));
+       }
+
        public void shutdown(Time timeout) {
-               LOG.info("Shutting down rest endpoint.");
-               CompletableFuture<?> groupFuture = new CompletableFuture<>();
-               if (bootstrap != null) {
-                       if (bootstrap.group() != null) {
-                               bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
-                                       .addListener(finished -> {
-                                               if (finished.isSuccess()) {
-                                                       groupFuture.complete(null);
-                                               } else {
-                                                       groupFuture.completeExceptionally(finished.cause());
-                                               }
-                                       });
-                       }
-               }
+               final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
 
                try {
-                       groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                        LOG.info("Rest endpoint shutdown complete.");
                } catch (Exception e) {
                        LOG.warn("Rest endpoint shutdown failed.", e);
                }
        }
 
+       private CompletableFuture<Void> shutdownInternally(Time timeout) {
+               if (isRunning.compareAndSet(true, false)) {
+                       LOG.info("Shutting down rest endpoint.");
+
+                       if (bootstrap != null) {
+                               if (bootstrap.group() != null) {
+                                       bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+                                               .addListener(finished -> {
+                                                       if (finished.isSuccess()) {
+                                                               terminationFuture.complete(null);
+                                                       } else {
+                                                               terminationFuture.completeExceptionally(finished.cause());
+                                                       }
+                                               });
+                               }
+                       }
+               }
+               return terminationFuture;
+       }
+
        public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
                        String targetAddress,
                        int targetPort,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 22cd6f6..11434ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -55,8 +55,7 @@ public class RestClientTest extends TestLogger {
        public void testConnectionTimeout() throws Exception {
                final Configuration config = new Configuration();
                config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
-               final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-               try {
+               try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) {
                        restClient.sendRequest(
                                unroutableIp,
                                80,
@@ -73,9 +72,7 @@ public class RestClientTest extends TestLogger {
 
        @Test
        public void testInvalidVersionRejection() throws Exception {
-               final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
-
-               try {
+               try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor())) {
                        CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
                                unroutableIp,
                                80,
@@ -89,7 +86,6 @@ public class RestClientTest extends TestLogger {
                } catch (IllegalArgumentException e) {
                        // expected
                }
-
        }
 
        private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {

Reply via email to