This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 895ecfb [FLINK-20615] Clean PartitionRequestClientFactory up if
createPartitionRequestClient fails
895ecfb is described below
commit 895ecfb7fb21bc11b91e99df14596032667272b8
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Dec 30 18:02:04 2020 +0100
[FLINK-20615] Clean PartitionRequestClientFactory up if
createPartitionRequestClient fails
In order to maintain the invariant that a failed createPartitionRequestClient
call won't leave a failed connection future in the clients field, this commit
removes the client future from the map if its creation fails.
This closes #14528.
---
.../netty/PartitionRequestClientFactory.java | 61 ++++++++++++----------
.../netty/PartitionRequestClientFactoryTest.java | 25 +++++++++
2 files changed, 57 insertions(+), 29 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 5e98894..591e972 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -23,6 +23,7 @@ import
org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -31,11 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Factory for {@link NettyPartitionRequestClient} instances.
@@ -69,36 +68,34 @@ class PartitionRequestClientFactory {
NettyPartitionRequestClient createPartitionRequestClient(ConnectionID
connectionId)
throws IOException, InterruptedException {
while (true) {
- AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
- CompletableFuture<NettyPartitionRequestClient> clientFuture =
- clients.computeIfAbsent(
- connectionId,
- unused -> {
- isTheFirstOne.set(true);
- return new CompletableFuture<>();
- });
- if (isTheFirstOne.get()) {
+ final CompletableFuture<NettyPartitionRequestClient>
newClientFuture =
+ new CompletableFuture<>();
+
+ final CompletableFuture<NettyPartitionRequestClient> clientFuture =
+ clients.putIfAbsent(connectionId, newClientFuture);
+
+ final NettyPartitionRequestClient client;
+
+ if (clientFuture == null) {
try {
- clientFuture.complete(connectWithRetries(connectionId));
- } catch (InterruptedException e) {
- clientFuture.complete(null); // let others waiting know
that they should retry
+ client = connectWithRetries(connectionId);
+ } catch (Throwable e) {
+ newClientFuture.completeExceptionally(
+ new IOException("Could not create Netty client.",
e));
+ clients.remove(connectionId, newClientFuture);
throw e;
- } catch (Exception e) {
- clientFuture.completeExceptionally(e);
}
- }
- final NettyPartitionRequestClient client;
- try {
- client = clientFuture.get();
- if (client == null) {
- // computation failed in another thread - cleanup the map
and restart the loop
- clients.remove(connectionId, clientFuture);
- continue;
+ newClientFuture.complete(client);
+ } else {
+ try {
+ client = clientFuture.get();
+ } catch (ExecutionException e) {
+
ExceptionUtils.rethrowIOException(ExceptionUtils.stripExecutionException(e));
+ return null;
}
- } catch (ExecutionException e) {
- throw new IOException(e);
}
+
// Make sure to increment the reference count before handing a
client
// out to ensure correct bookkeeping for channel closing.
if (client.incrementReferenceCounter()) {
@@ -110,16 +107,22 @@ class PartitionRequestClientFactory {
}
private NettyPartitionRequestClient connectWithRetries(ConnectionID
connectionId)
- throws InterruptedException {
+ throws InterruptedException, RemoteTransportException {
int tried = 0;
while (true) {
try {
return connect(connectionId);
} catch (RemoteTransportException e) {
tried++;
- LOG.error("Failed {} times to connect to {}", tried,
connectionId.getAddress(), e);
if (tried > retryNumber) {
- throw new CompletionException(e);
+ LOG.warn("Failed to connect to {}. Giving up.",
connectionId.getAddress(), e);
+ throw e;
+ } else {
+ LOG.warn(
+ "Failed {} times to connect to {}. Retrying.",
+ tried,
+ connectionId.getAddress(),
+ e);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 3abbe2c..595b3e5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -98,6 +99,30 @@ public class PartitionRequestClientFactoryTest {
}
@Test
+ public void testExceptionsAreNotCached() throws Exception {
+ NettyTestUtil.NettyServerAndClient nettyServerAndClient =
createNettyServerAndClient();
+
+ try {
+ final PartitionRequestClientFactory factory =
+ new PartitionRequestClientFactory(
+ new
UnstableNettyClient(nettyServerAndClient.client(), 1), 0);
+
+ final ConnectionID connectionID =
nettyServerAndClient.getConnectionID(0);
+ try {
+ factory.createPartitionRequestClient(connectionID);
+ fail("Expected the first request to fail.");
+ } catch (RemoteTransportException expected) {
+ // expected
+ }
+
+ factory.createPartitionRequestClient(connectionID);
+ } finally {
+ nettyServerAndClient.client().shutdown();
+ nettyServerAndClient.server().shutdown();
+ }
+ }
+
+ @Test
public void testNettyClientConnectRetry() throws Exception {
NettyTestUtil.NettyServerAndClient serverAndClient =
createNettyServerAndClient();
UnstableNettyClient unstableNettyClient =