This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 91639923e2a8f4dde271b90d3917b1b2e8d6878a Author: Till Rohrmann <[email protected]> AuthorDate: Wed Sep 26 09:33:05 2018 +0200 [FLINK-10415] Fail response future if connection closes in RestClient If the RestClient detects that a connection was closed (channel became inactive), then it now fails the json response future with a ConnectionClosedException. --- .../runtime/rest/ConnectionClosedException.java | 41 ++++++++ .../org/apache/flink/runtime/rest/RestClient.java | 6 ++ .../apache/flink/runtime/rest/RestClientTest.java | 107 ++++++++++++++++++++- 3 files changed, 153 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java new file mode 100644 index 0000000..b294f49 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import java.io.IOException; + +/** + * Exception which is thrown if the {@link RestClient} detects that a connection + * was closed. + */ +public class ConnectionClosedException extends IOException { + private static final long serialVersionUID = 3802002501688542472L; + + public ConnectionClosedException(String message) { + super(message); + } + + public ConnectionClosedException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectionClosedException(Throwable cause) { + super(cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index c6ebd35..16e4c98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -418,6 +418,12 @@ public class RestClient implements AutoCloseableAsync { } @Override + public void channelInactive(ChannelHandlerContext ctx) { + jsonFuture.completeExceptionally(new ConnectionClosedException("Channel became inactive.")); + ctx.close(); + } + + @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { if (cause instanceof TooLongFrameException) { jsonFuture.completeExceptionally(new TooLongFrameException(String.format( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index d3d895a..8650929 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -25,16 +25,24 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.junit.Test; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -45,11 +53,14 @@ import static org.junit.Assert.assertThat; */ public class RestClientTest extends TestLogger { + private static final String unroutableIp = "10.255.255.1"; + + private static final long TIMEOUT = 10L; + @Test public void testConnectionTimeout() throws Exception { final Configuration config = new Configuration(); config.setLong(RestOptions.CONNECTION_TIMEOUT, 1); - final String unroutableIp = "10.255.255.1"; try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) { restClient.sendRequest( unroutableIp, @@ -65,6 +76,100 @@ public class RestClientTest extends TestLogger { } } + /** + * Tests that we fail the operation if the remote connection closes. + */ + @Test + public void testConnectionClosedHandling() throws Exception { + try (final ServerSocket serverSocket = new ServerSocket(0); + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) { + + final String targetAddress = "localhost"; + final int targetPort = serverSocket.getLocalPort(); + + // start server + final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept)); + + final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest( + targetAddress, + targetPort, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList()); + + Socket connectionSocket = null; + + try { + connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + } catch (TimeoutException ignored) { + // could not establish a server connection --> see that the response failed + socketCompletableFuture.cancel(true); + } + + if (connectionSocket != null) { + // close connection + connectionSocket.close(); + } + + try { + responseFuture.get(); + } catch (ExecutionException ee) { + if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) { + throw ee; + } + } + } + } + + /** + * Tests that we fail the operation if the client closes. + */ + @Test + public void testRestClientClosedHandling() throws Exception { + + Socket connectionSocket = null; + + try (final ServerSocket serverSocket = new ServerSocket(0); + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) { + + final String targetAddress = "localhost"; + final int targetPort = serverSocket.getLocalPort(); + + // start server + final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept)); + + final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest( + targetAddress, + targetPort, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList()); + + try { + connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + } catch (TimeoutException ignored) { + // could not establish a server connection --> see that the response failed + socketCompletableFuture.cancel(true); + } + + restClient.close(); + + try { + responseFuture.get(); + } catch (ExecutionException ee) { + if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) { + throw ee; + } + } + } finally { + if (connectionSocket != null) { + connectionSocket.close(); + } + } + } + private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { @Override
