rkhachatryan commented on a change in pull request #12746:
URL: https://github.com/apache/flink/pull/12746#discussion_r444198242



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -40,83 +45,91 @@
  * instances.
  */
 class PartitionRequestClientFactory {
+       private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRequestClientFactory.class);
 
        private final NettyClient nettyClient;
 
-       private final ConcurrentMap<ConnectionID, Object> clients = new 
ConcurrentHashMap<ConnectionID, Object>();
+       private final int retryNumber;
+
+       private final ConcurrentMap<ConnectionID, 
CompletableFuture<NettyPartitionRequestClient>> clients = new 
ConcurrentHashMap<>();
 
        PartitionRequestClientFactory(NettyClient nettyClient) {
+               this(nettyClient, 0);
+       }
+
+       PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) 
{
                this.nettyClient = nettyClient;
+               this.retryNumber = retryNumber;
        }
 
        /**
         * Atomically establishes a TCP connection to the given remote address 
and
         * creates a {@link NettyPartitionRequestClient} instance for this 
connection.
         */
        NettyPartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) throws IOException, InterruptedException {
-               Object entry;
-               NettyPartitionRequestClient client = null;
-
-               while (client == null) {
-                       entry = clients.get(connectionId);
-
-                       if (entry != null) {
-                               // Existing channel or connecting channel
-                               if (entry instanceof 
NettyPartitionRequestClient) {
-                                       client = (NettyPartitionRequestClient) 
entry;
-                               }
-                               else {
-                                       ConnectingChannel future = 
(ConnectingChannel) entry;
-                                       client = future.waitForChannel();
-
-                                       clients.replace(connectionId, future, 
client);
-                               }
-                       }
-                       else {
-                               // No channel yet. Create one, but watch out 
for a race.
-                               // We create a "connecting future" and 
atomically add it to the map.
-                               // Only the thread that really added it 
establishes the channel.
-                               // The others need to wait on that original 
establisher's future.
-                               ConnectingChannel connectingChannel = new 
ConnectingChannel(connectionId, this);
-                               Object old = clients.putIfAbsent(connectionId, 
connectingChannel);
-
-                               if (old == null) {
-                                       
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-                                       client = 
connectingChannel.waitForChannel();
-
-                                       clients.replace(connectionId, 
connectingChannel, client);
-                               }
-                               else if (old instanceof ConnectingChannel) {
-                                       client = ((ConnectingChannel) 
old).waitForChannel();
-
-                                       clients.replace(connectionId, old, 
client);
-                               }
-                               else {
-                                       client = (NettyPartitionRequestClient) 
old;
-                               }
+               while (true) {
+                       
AtomicReference<Optional<CompletableFuture<NettyPartitionRequestClient>>> 
computeFuture = new AtomicReference<>(Optional.empty());
+                       CompletableFuture<NettyPartitionRequestClient> 
waitFuture = clients.computeIfAbsent(connectionId, unused -> {
+                               CompletableFuture<NettyPartitionRequestClient> 
future = new CompletableFuture<>();
+                               computeFuture.set(Optional.of(future));
+                               return future;
+                       });
+                       computeFuture.get().ifPresent(future -> 
future.complete(connectWithRetries(connectionId)));

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to