pnowojski commented on a change in pull request #11541:
URL: https://github.com/apache/flink/pull/11541#discussion_r413895991



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
##########
@@ -130,6 +134,56 @@ public void run() {
                }
        }
 
+       @Test
+       public void testNettyClientConnectRetry() throws Exception {
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(
+                               new NettyProtocol(null, null) {
+
+                                       @Override
+                                       public ChannelHandler[] 
getServerChannelHandlers () {
+                                               return new ChannelHandler[0];
+                                       }
+
+                                       @Override
+                                       public ChannelHandler[] 
getClientChannelHandlers () {
+                                               return new 
ChannelHandler[]{mock(NetworkClientHandler.class)};
+                                       }

Review comment:
       Could you reuse the existing 
`org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest.NoOpProtocol`
 here? Making it public would avoid the code duplication.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -81,16 +92,12 @@ NettyPartitionRequestClient 
createPartitionRequestClient(ConnectionID connection
                                Object old = clients.putIfAbsent(connectionId, 
connectingChannel);
 
                                if (old == null) {

Review comment:
       I think I agree with @zhijiangW. I don't get the heaviness argument. 
The difference between 256 * 256 lock acquisitions and the current 
ConcurrentHashMap accesses (which are not free either) probably wouldn't be 
measurable. (256 * 256 locks could be acquired in under 0.01s on a single 
machine, and you are assuming 256 task managers, so we should be fine even 
with 100 000s of connections.)
   
   But we could easily block other users of this class, and calls like 
`destroyPartitionRequestClient` or `closeOpenChannelConnections` do not sound 
like they should be blocked waiting for lock acquisition during 
connections/reconnections.
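
   A rough way to sanity-check the estimate above is to time 256 * 256 
acquisitions of an uncontended lock. This is only a sketch, not a proper 
benchmark (no warm-up, no contention), and `LockCostSketch` and 
`acquireRepeatedly` are hypothetical names that do not exist in Flink:

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockCostSketch {

    /** Acquires and releases one uncontended lock `times` times; returns how often it was held. */
    static long acquireRepeatedly(int times) {
        ReentrantLock lock = new ReentrantLock();
        long acquired = 0;
        for (int i = 0; i < times; i++) {
            lock.lock();
            try {
                acquired++;
            } finally {
                lock.unlock();
            }
        }
        return acquired;
    }

    public static void main(String[] args) {
        int times = 256 * 256; // assumption taken from the discussion: 256 x 256 connections
        long start = System.nanoTime();
        long acquired = acquireRepeatedly(times);
        long elapsedMicros = (System.nanoTime() - start) / 1_000;
        // The elapsed time depends on the machine and JIT state, so no figure is claimed here.
        System.out.println(acquired + " acquisitions took " + elapsedMicros + " us");
    }
}
```

   The measured time says nothing about the second concern, namely other 
callers being blocked while one thread holds the lock during a reconnect.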

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -131,6 +138,43 @@ void destroyPartitionRequestClient(ConnectionID 
connectionId, PartitionRequestCl
                clients.remove(connectionId, client);
        }
 
+       private void connectChannelWithRetry(ConnectingChannel 
connectingChannel,
+                                                                               
 ConnectionID connectionId, boolean needConnect)
+               throws IOException, InterruptedException {
+               int count = 0;
+               Exception exception = null;
+               do {
+                       try {
+                               if (needConnect) {
+                                       LOG.info("Connecting to {} at {} 
attempt", connectionId.getAddress(), count);
+                                       
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+                               }
+
+                               NettyPartitionRequestClient client = 
connectingChannel.waitForChannel();
+
+                               clients.replace(connectionId, 
connectingChannel, client);
+                               return;
+                       } catch (IOException | ChannelException e) {
+                               LOG.error("Failed {} times to connect to {}", 
count, connectionId.getAddress(), e);
+                               ConnectingChannel newConnectingChannel = new 
ConnectingChannel(connectionId, this);
+                               clients.replace(connectionId, 
connectingChannel, newConnectingChannel);
+                               Object old = clients.get(connectionId);
+                               if (old instanceof ConnectingChannel) {
+                                       connectingChannel = (ConnectingChannel) 
old;
+                               } else {
+                                       return;
+                               }
+
+                               exception = e;
+                       }
+                       count++;
+               } while (count <= retryNumber);

Review comment:
       The interaction here between multiple requesters is getting really 
complicated. The part in the catch block seems especially strange. I think it 
might work, but I'm not sure. It's quite hard to reason about and it's 
missing test coverage.
   
   On the other hand, following @zhijiangW's suggestion of using a plain 
`HashMap` with manual synchronisation, and just blocking subsequent threads 
from entering until the reconnection loop completes, sounds much simpler, but 
there is the issue of how to implement the other methods of this class.
   
   All in all, I think I would be +1 for going with it as it is, provided that 
we also add another test case, similar to the existing 
`testNettyClientConnectRetry`, but which spawns a couple of threads all trying 
to do the same thing:
   ```
   factory.createPartitionRequestClient(serverAddress);
   ```
   ?
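
   The shape of such a multi-threaded test could look roughly like the sketch 
below. It is self-contained and does not use the Flink classes: `StubFactory` 
is a hypothetical stand-in for the factory that fails its first few connect 
attempts, and it uses the simple "synchronise and retry" approach rather than 
the `ConcurrentHashMap` logic in this PR. A real test would call 
`factory.createPartitionRequestClient(serverAddress)` against the test server 
instead.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentCreateSketch {

    /** Hypothetical stand-in for the factory: the first few connect attempts fail. */
    static class StubFactory {
        private final Map<String, Object> clients = new HashMap<>();
        private final int retryNumber;
        private int remainingFailures;

        StubFactory(int failuresBeforeSuccess, int retryNumber) {
            this.remainingFailures = failuresBeforeSuccess;
            this.retryNumber = retryNumber;
        }

        /** Returns the cached client, or connects with up to retryNumber retries. */
        synchronized Object createClient(String address) throws IOException {
            Object existing = clients.get(address);
            if (existing != null) {
                return existing;
            }
            for (int attempt = 0; attempt <= retryNumber; attempt++) {
                if (remainingFailures > 0) {
                    remainingFailures--; // simulated connect failure, try again
                    continue;
                }
                Object client = new Object();
                clients.put(address, client);
                return client;
            }
            throw new IOException("Could not connect to " + address);
        }
    }

    /** Requests a client from `threads` threads at once; returns the distinct instances seen. */
    static Set<Object> createFromManyThreads(StubFactory factory, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            List<Future<Object>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    start.await(); // release all requesters at the same time
                    return factory.createClient("server");
                }));
            }
            start.countDown();
            Set<Object> distinct = Collections.newSetFromMap(new IdentityHashMap<>());
            for (Future<Object> future : futures) {
                distinct.add(future.get()); // rethrows if any requester failed
            }
            return distinct;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<Object> clients = createFromManyThreads(new StubFactory(2, 3), 8);
        System.out.println("distinct clients: " + clients.size());
    }
}
```

   The assertion worth making in the real test is the same one: every thread 
completes without an exception and all of them end up with the same client 
instance.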

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -81,16 +92,12 @@ NettyPartitionRequestClient 
createPartitionRequestClient(ConnectionID connection
                                Object old = clients.putIfAbsent(connectionId, 
connectingChannel);
 
                                if (old == null) {
-                                       
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-                                       client = 
connectingChannel.waitForChannel();
-
-                                       clients.replace(connectionId, 
connectingChannel, client);
+                                       
connectChannelWithRetry(connectingChannel, connectionId, true);
+                                       client = (NettyPartitionRequestClient) 
clients.get(connectionId);
                                }
                                else if (old instanceof ConnectingChannel) {
-                                       client = ((ConnectingChannel) 
old).waitForChannel();
-
-                                       clients.replace(connectionId, old, 
client);
+                                       
connectChannelWithRetry((ConnectingChannel) old, connectionId, false);
+                                       client = (NettyPartitionRequestClient) 
clients.get(connectionId);

Review comment:
       nit: couldn't `connectChannelWithRetry` return the new `client` value?
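
   As an illustration of that nit, a retry helper can hand the connected 
client straight back to the caller, so no second map lookup and cast is 
needed afterwards. The sketch below is generic and self-contained; 
`RetryReturnSketch`, `Connector` and `connectWithRetry` are hypothetical 
names, not the PR's code:

```java
import java.io.IOException;

public class RetryReturnSketch {

    /** Hypothetical connect action that may fail with an IOException. */
    interface Connector<T> {
        T connect() throws IOException;
    }

    /** Tries to connect up to retryNumber + 1 times and returns the client on success. */
    static <T> T connectWithRetry(Connector<T> connector, int retryNumber) throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= retryNumber; attempt++) {
            try {
                return connector.connect(); // the caller gets the client directly
            } catch (IOException e) {
                last = e; // remember the failure and retry
            }
        }
        throw new IOException("Connection failed after " + (retryNumber + 1) + " attempts", last);
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String client = connectWithRetry(() -> {
            if (calls[0]++ < 2) {
                throw new IOException("simulated failure");
            }
            return "client";
        }, 3);
        System.out.println(client + " after " + calls[0] + " attempts");
    }
}
```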



