This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 895ecfb  [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails
895ecfb is described below

commit 895ecfb7fb21bc11b91e99df14596032667272b8
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Dec 30 18:02:04 2020 +0100

    [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails
    
    In order to maintain the invariant that a failed createPartitionRequestClient call won't leave
    a failed connection future in the clients field, this commit removes the client future
    if its creation fails.
    
    This closes #14528.
---
 .../netty/PartitionRequestClientFactory.java       | 61 ++++++++++++----------
 .../netty/PartitionRequestClientFactoryTest.java   | 25 +++++++++
 2 files changed, 57 insertions(+), 29 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 5e98894..591e972 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 
@@ -31,11 +32,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Factory for {@link NettyPartitionRequestClient} instances.
@@ -69,36 +68,34 @@ class PartitionRequestClientFactory {
     NettyPartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId)
             throws IOException, InterruptedException {
         while (true) {
-            AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-            CompletableFuture<NettyPartitionRequestClient> clientFuture =
-                    clients.computeIfAbsent(
-                            connectionId,
-                            unused -> {
-                                isTheFirstOne.set(true);
-                                return new CompletableFuture<>();
-                            });
-            if (isTheFirstOne.get()) {
+            final CompletableFuture<NettyPartitionRequestClient> 
newClientFuture =
+                    new CompletableFuture<>();
+
+            final CompletableFuture<NettyPartitionRequestClient> clientFuture =
+                    clients.putIfAbsent(connectionId, newClientFuture);
+
+            final NettyPartitionRequestClient client;
+
+            if (clientFuture == null) {
                 try {
-                    clientFuture.complete(connectWithRetries(connectionId));
-                } catch (InterruptedException e) {
-                    clientFuture.complete(null); // let others waiting know 
that they should retry
+                    client = connectWithRetries(connectionId);
+                } catch (Throwable e) {
+                    newClientFuture.completeExceptionally(
+                            new IOException("Could not create Netty client.", 
e));
+                    clients.remove(connectionId, newClientFuture);
                     throw e;
-                } catch (Exception e) {
-                    clientFuture.completeExceptionally(e);
                 }
-            }
 
-            final NettyPartitionRequestClient client;
-            try {
-                client = clientFuture.get();
-                if (client == null) {
-                    // computation failed in another thread - cleanup the map 
and restart the loop
-                    clients.remove(connectionId, clientFuture);
-                    continue;
+                newClientFuture.complete(client);
+            } else {
+                try {
+                    client = clientFuture.get();
+                } catch (ExecutionException e) {
+                    
ExceptionUtils.rethrowIOException(ExceptionUtils.stripExecutionException(e));
+                    return null;
                 }
-            } catch (ExecutionException e) {
-                throw new IOException(e);
             }
+
             // Make sure to increment the reference count before handing a 
client
             // out to ensure correct bookkeeping for channel closing.
             if (client.incrementReferenceCounter()) {
@@ -110,16 +107,22 @@ class PartitionRequestClientFactory {
     }
 
     private NettyPartitionRequestClient connectWithRetries(ConnectionID 
connectionId)
-            throws InterruptedException {
+            throws InterruptedException, RemoteTransportException {
         int tried = 0;
         while (true) {
             try {
                 return connect(connectionId);
             } catch (RemoteTransportException e) {
                 tried++;
-                LOG.error("Failed {} times to connect to {}", tried, 
connectionId.getAddress(), e);
                 if (tried > retryNumber) {
-                    throw new CompletionException(e);
+                    LOG.warn("Failed to connect to {}. Giving up.", 
connectionId.getAddress(), e);
+                    throw e;
+                } else {
+                    LOG.warn(
+                            "Failed {} times to connect to {}. Retrying.",
+                            tried,
+                            connectionId.getAddress(),
+                            e);
                 }
             }
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 3abbe2c..595b3e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -98,6 +99,30 @@ public class PartitionRequestClientFactoryTest {
     }
 
     @Test
+    public void testExceptionsAreNotCached() throws Exception {
+        NettyTestUtil.NettyServerAndClient nettyServerAndClient = 
createNettyServerAndClient();
+
+        try {
+            final PartitionRequestClientFactory factory =
+                    new PartitionRequestClientFactory(
+                            new 
UnstableNettyClient(nettyServerAndClient.client(), 1), 0);
+
+            final ConnectionID connectionID = 
nettyServerAndClient.getConnectionID(0);
+            try {
+                factory.createPartitionRequestClient(connectionID);
+                fail("Expected the first request to fail.");
+            } catch (RemoteTransportException expected) {
+                // expected
+            }
+
+            factory.createPartitionRequestClient(connectionID);
+        } finally {
+            nettyServerAndClient.client().shutdown();
+            nettyServerAndClient.server().shutdown();
+        }
+    }
+
+    @Test
     public void testNettyClientConnectRetry() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = 
createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =

Reply via email to