This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5a6f65cfc69dcc647f1c74c89900905e2426b811 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Sep 26 09:12:06 2018 +0200 [hotfix] Make RestClient AutoCloseableAsync --- .../org/apache/flink/runtime/rest/RestClient.java | 51 +++++++++++++++------- .../apache/flink/runtime/rest/RestClientTest.java | 8 +--- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index a855749..3aa93bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -86,6 +87,7 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE; @@ -93,7 +95,7 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRes /** * This client is the counter-part to the {@link RestServerEndpoint}. */ -public class RestClient { +public class RestClient implements AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); @@ -103,9 +105,14 @@ public class RestClient { private final Bootstrap bootstrap; + private final CompletableFuture<Void> terminationFuture; + + private final AtomicBoolean isRunning = new AtomicBoolean(true); + public RestClient(RestClientConfiguration configuration, Executor executor) { Preconditions.checkNotNull(configuration); this.executor = Preconditions.checkNotNull(executor); + this.terminationFuture = new CompletableFuture<>(); final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory(); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @@ -135,30 +142,42 @@ public class RestClient { LOG.info("Rest client endpoint started."); } + @Override + public CompletableFuture<Void> closeAsync() { + return shutdownInternally(Time.seconds(10L)); + } + public void shutdown(Time timeout) { - LOG.info("Shutting down rest endpoint."); - CompletableFuture<?> groupFuture = new CompletableFuture<>(); - if (bootstrap != null) { - if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) - .addListener(finished -> { - if (finished.isSuccess()) { - groupFuture.complete(null); - } else { - groupFuture.completeExceptionally(finished.cause()); - } - }); - } - } + final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout); try { - groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); LOG.info("Rest endpoint shutdown complete."); } catch (Exception e) { LOG.warn("Rest endpoint shutdown failed.", e); } } + private CompletableFuture<Void> shutdownInternally(Time timeout) { + if (isRunning.compareAndSet(true, false)) { + LOG.info("Shutting down rest endpoint."); + + if (bootstrap != null) { + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + terminationFuture.complete(null); + } else { + terminationFuture.completeExceptionally(finished.cause()); + } + }); + } + } + } + return terminationFuture; + } + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( String targetAddress, int targetPort, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index 22cd6f6..11434ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -55,8 +55,7 @@ public class RestClientTest extends TestLogger { public void testConnectionTimeout() throws Exception { final Configuration config = new Configuration(); config.setLong(RestOptions.CONNECTION_TIMEOUT, 1); - final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor()); - try { + try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) { restClient.sendRequest( unroutableIp, 80, @@ -73,9 +72,7 @@ public class RestClientTest extends TestLogger { @Test public void testInvalidVersionRejection() throws Exception { - final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor()); - - try { + try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor())) { CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest( unroutableIp, 80, @@ -89,7 +86,6 @@ public class RestClientTest extends TestLogger { } catch (IllegalArgumentException e) { // expected } - } private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
