pnowojski commented on a change in pull request #11541:
URL: https://github.com/apache/flink/pull/11541#discussion_r413895991
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
##########
@@ -130,6 +134,56 @@ public void run() {
}
}
+	@Test
+	public void testNettyClientConnectRetry() throws Exception {
+		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(
+			new NettyProtocol(null, null) {
+
+				@Override
+				public ChannelHandler[] getServerChannelHandlers() {
+					return new ChannelHandler[0];
+				}
+
+				@Override
+				public ChannelHandler[] getClientChannelHandlers() {
+					return new ChannelHandler[]{mock(NetworkClientHandler.class)};
+				}
Review comment:
Could you just reuse the existing
`org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest.NoOpProtocol`?
Make it public and use it here to avoid code duplication?
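For reference, the shared helper could look roughly like this. This is a self-contained sketch with stand-in types (`ChannelHandler` and `NettyProtocol` below are minimal stubs, not the real Netty/Flink classes), just to show the shape of a public `NoOpProtocol` that both tests could import:

```java
// Minimal stand-ins so the sketch compiles on its own; the real code would
// use io.netty.channel.ChannelHandler and Flink's NettyProtocol instead.
interface ChannelHandler {}

abstract class NettyProtocol {
	protected NettyProtocol(Object partitionProvider, Object taskEventDispatcher) {}

	public abstract ChannelHandler[] getServerChannelHandlers();

	public abstract ChannelHandler[] getClientChannelHandlers();
}

// The reusable no-op protocol: no handlers installed on either side.
class NoOpProtocol extends NettyProtocol {
	NoOpProtocol() {
		super(null, null);
	}

	@Override
	public ChannelHandler[] getServerChannelHandlers() {
		return new ChannelHandler[0];
	}

	@Override
	public ChannelHandler[] getClientChannelHandlers() {
		return new ChannelHandler[0];
	}
}
```

Since the test above also mocks a `NetworkClientHandler` on the client side, it might still need to override `getClientChannelHandlers()`; the shared class would at least cover the server side and the constructor boilerplate.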
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
		Object old = clients.putIfAbsent(connectionId, connectingChannel);
		if (old == null) {
Review comment:
I think I agree with @zhijiangW. I don't get the heaviness argument.
The difference between 256 * 256 lock acquisitions and the current
`ConcurrentHashMap` accesses (which are not free either) would probably not be
measurable. (256 * 256 locks can be acquired in under 0.01s on a single
machine, and you are assuming 256 task managers, so we should be fine even
with 100 000s of connections.)
But we could easily block other users of this class, and calls like
`destroyPartitionRequestClient` or `closeOpenChannelConnections` do not sound
like they should be blocked waiting for lock acquisition during
connections/reconnections
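To illustrate the blocking concern, here is a self-contained sketch of the coarse-lock alternative (stand-in code, not the actual `PartitionRequestClientFactory`): a thread stuck in a connect/retry loop under one `synchronized` block makes an unrelated `closeOpenChannelConnections()` caller wait out the whole retry.

```java
// Stand-in factory guarded by a single coarse lock (the HashMap-plus-lock
// design discussed above), not the real Flink class.
class CoarseLockFactory {
	private final Object lock = new Object();

	// Models a slow connect/retry loop that holds the lock while retrying.
	void createPartitionRequestClient(long retryMillis) throws InterruptedException {
		synchronized (lock) {
			Thread.sleep(retryMillis); // pretend we are reconnecting
		}
	}

	// With a coarse lock, this blocks behind any in-flight retry loop.
	void closeOpenChannelConnections() {
		synchronized (lock) {
			// close logic would run here
		}
	}
}
```

A caller can observe the wait directly: start a thread that "retries" for a few hundred milliseconds, then time how long `closeOpenChannelConnections()` blocks before it gets the lock.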
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -131,6 +138,43 @@ void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestCl
		clients.remove(connectionId, client);
	}
+
+	private void connectChannelWithRetry(
+			ConnectingChannel connectingChannel,
+			ConnectionID connectionId,
+			boolean needConnect) throws IOException, InterruptedException {
+		int count = 0;
+		Exception exception = null;
+		do {
+			try {
+				if (needConnect) {
+					LOG.info("Connecting to {} at {} attempt", connectionId.getAddress(), count);
+					nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+				}
+
+				NettyPartitionRequestClient client = connectingChannel.waitForChannel();
+
+				clients.replace(connectionId, connectingChannel, client);
+				return;
+			} catch (IOException | ChannelException e) {
+				LOG.error("Failed {} times to connect to {}", count, connectionId.getAddress(), e);
+				ConnectingChannel newConnectingChannel = new ConnectingChannel(connectionId, this);
+				clients.replace(connectionId, connectingChannel, newConnectingChannel);
+				Object old = clients.get(connectionId);
+				if (old instanceof ConnectingChannel) {
+					connectingChannel = (ConnectingChannel) old;
+				} else {
+					return;
+				}
+
+				exception = e;
+			}
+			count++;
+		} while (count <= retryNumber);
Review comment:
The interaction here between multiple requesters is getting really
complicated. Especially the part in the catch block seems strange. I think it
might be working, but I'm not sure; it's quite hard to reason about and it's
missing test coverage.
On the other hand, following @zhijiangW's suggestion of using a plain
`HashMap` with manual synchronisation and simply blocking subsequent threads
from entering until the reconnection loop completes sounds much simpler, but
then there is the issue of how to implement the other methods of this class.
All in all, I think I would be +1 for going with the current approach, given
that we would also have another test case, similar to the existing
`testNettyClientConnectRetry`, but one which spawns a couple of threads all
trying to do the same thing:
```
factory.createPartitionRequestClient(serverAddress);
```
?
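A rough, self-contained sketch of such a test (a stand-in factory, not the real `PartitionRequestClientFactory` or `NettyTestUtil`; it also collapses the connect into `computeIfAbsent`, sidestepping the `ConnectingChannel` dance, since the point here is only the shape of the multi-threaded assertion): several threads request a client for the same connection id and we assert they all end up with one shared instance and exactly one actual connect.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the factory: only the first requester per connection id
// should pay for a connect; everyone else must reuse the result.
class StubClientFactory {
	private final ConcurrentHashMap<String, Object> clients = new ConcurrentHashMap<>();
	private final AtomicInteger connectAttempts = new AtomicInteger();

	Object createPartitionRequestClient(String connectionId) {
		return clients.computeIfAbsent(connectionId, id -> {
			connectAttempts.incrementAndGet();
			return new Object(); // stand-in for NettyPartitionRequestClient
		});
	}

	int connectAttempts() {
		return connectAttempts.get();
	}
}
```

The real test would submit `factory.createPartitionRequestClient(serverAddress)` from several threads of an `ExecutorService`, collect the futures, and assert that every thread got the same client instance while only one connection attempt was made.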
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
		Object old = clients.putIfAbsent(connectionId, connectingChannel);
		if (old == null) {
-			nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-			client = connectingChannel.waitForChannel();
-
-			clients.replace(connectionId, connectingChannel, client);
+			connectChannelWithRetry(connectingChannel, connectionId, true);
+			client = (NettyPartitionRequestClient) clients.get(connectionId);
		}
		else if (old instanceof ConnectingChannel) {
-			client = ((ConnectingChannel) old).waitForChannel();
-
-			clients.replace(connectionId, old, client);
+			connectChannelWithRetry((ConnectingChannel) old, connectionId, false);
+			client = (NettyPartitionRequestClient) clients.get(connectionId);
Review comment:
nit: could `connectChannelWithRetry` not return the new `client` value?
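A sketch of the nit, again with stand-in types (`Client` stands in for `NettyPartitionRequestClient`; not the real factory): if the retry method returns the client it installed, both call sites drop the follow-up `clients.get` and the unchecked cast.

```java
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the connected client type.
class Client {}

class ReturningFactory {
	private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap<>();

	// Installs the client under the connection id and hands it straight back;
	// the real method would wait for the channel (with retries) first.
	private Client connectChannelWithRetry(String connectionId) {
		Client client = new Client();
		clients.put(connectionId, client);
		return client;
	}

	Client createPartitionRequestClient(String connectionId) {
		// No clients.get(connectionId) plus cast needed at the call site.
		Client existing = clients.get(connectionId);
		return existing != null ? existing : connectChannelWithRetry(connectionId);
	}
}
```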
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]