This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 38127dcd7 [CELEBORN-1673] Support retry create client
38127dcd7 is described below

commit 38127dcd7d9789462c5a6918d84dee8f4bf9af3f
Author: Shuang <[email protected]>
AuthorDate: Thu Oct 31 14:45:48 2024 +0800

    [CELEBORN-1673] Support retry create client
    
    ### What changes were proposed in this pull request?
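    Move the client-creation retry loop out of `FlinkTransportClientFactory`
    and into `TransportClientFactory` as `retryCreateClient`, so that
    `createClient` retries for every engine, not only Flink. The number of
    attempts comes from `celeborn.<module>.io.maxRetries` (`maxIORetries`).
    The wait between attempts comes from `celeborn.<module>.io.retryWait`
    (`ioRetryWaitTimeMs`). `FlinkTransportClientFactory` no longer takes a
    `fetchMaxRetries` constructor argument.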
    
    ### Why are the changes needed?
    Currently, only the Flink client retries creating a transport client
    when a connection problem occurs. All other engines would benefit from
    the same retry behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
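    Unit tests: added `TransportClientFactorySuiteJ#testRetryCreateClient`.
    `RegistrationSuiteJ` now sets `celeborn.shuffle.io.maxRetries` to 1, so
    its expected-failure cases fail on the first attempt instead of retrying.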
    
    Closes #2855 from RexXiong/CELEBORN-1673.
    
    Lead-authored-by: Shuang <[email protected]>
    Co-authored-by: lvshuang.xjs <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 14baec8388d894c591d07edaa6e62fd9dbd993fd)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../flink/network/FlinkTransportClientFactory.java | 42 ++++------------------
 .../flink/readclient/FlinkShuffleClientImpl.java   |  4 +--
 .../network/client/TransportClientFactory.java     | 41 ++++++++++++++++++++-
 .../org/apache/celeborn/common/CelebornConf.scala  |  6 ++++
 .../network/TransportClientFactorySuiteJ.java      | 20 +++++++++++
 .../common/network/sasl/RegistrationSuiteJ.java    | 14 ++++++--
 docs/configuration/network.md                      |  4 +--
 7 files changed, 86 insertions(+), 45 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index aef4891ca..3cb180b3f 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -32,57 +32,27 @@ import 
org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.client.TransportClientBootstrap;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
 import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.plugin.flink.utils.Utils;
 
 public class FlinkTransportClientFactory extends TransportClientFactory {
 
   public static final Logger logger = 
LoggerFactory.getLogger(FlinkTransportClientFactory.class);
 
   private ConcurrentHashMap<Long, Supplier<ByteBuf>> bufferSuppliers;
-  private final int fetchMaxRetries;
 
   public FlinkTransportClientFactory(
-      TransportContext context, int fetchMaxRetries, 
List<TransportClientBootstrap> bootstraps) {
+      TransportContext context, List<TransportClientBootstrap> bootstraps) {
     super(context, bootstraps);
     bufferSuppliers = JavaUtils.newConcurrentHashMap();
-    this.fetchMaxRetries = fetchMaxRetries;
     this.pooledAllocator = new UnpooledByteBufAllocator(true);
   }
 
   public TransportClient createClientWithRetry(String remoteHost, int 
remotePort)
       throws IOException, InterruptedException {
-    int retryCount = fetchMaxRetries;
-
-    while (retryCount > 0) {
-      try {
-        return createClient(remoteHost, remotePort);
-      } catch (Exception e) {
-        retryCount--;
-        logger.warn(
-            "Retrying ({}/{}) times create client to {}:{}",
-            retryCount,
-            fetchMaxRetries,
-            remoteHost,
-            remotePort,
-            e);
-        if (retryCount == 0) {
-          if (e instanceof InterruptedException || e instanceof IOException) {
-            throw e;
-          } else {
-            Utils.rethrowAsRuntimeException(e);
-          }
-        }
-      }
-    }
-
-    return null;
-  }
-
-  @Override
-  public TransportClient createClient(String remoteHost, int remotePort)
-      throws IOException, InterruptedException {
-    return createClient(
-        remoteHost, remotePort, -1, new 
TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
+    return retryCreateClient(
+        remoteHost,
+        remotePort,
+        -1,
+        () -> new TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
   }
 
   public void registerSupplier(long streamId, Supplier<ByteBuf> supplier) {
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index d9b261edb..c7b7971a7 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -145,9 +145,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
 
   private void initializeTransportClientFactory() {
     if (null == flinkTransportClientFactory) {
-      flinkTransportClientFactory =
-          new FlinkTransportClientFactory(
-              context, conf.clientFetchMaxRetriesForEachReplica(), 
createBootstraps());
+      flinkTransportClientFactory = new FlinkTransportClientFactory(context, 
createBootstraps());
     }
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 8dd2380fe..7b87be40b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -26,10 +26,12 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.*;
@@ -92,6 +94,8 @@ public class TransportClientFactory implements Closeable {
   private final Class<? extends Channel> socketChannelClass;
   private EventLoopGroup workerGroup;
   protected ByteBufAllocator pooledAllocator;
+  private final int maxClientConnectRetries;
+  private final int maxClientConnectRetryWaitTimeMs;
 
   public TransportClientFactory(
       TransportContext context, List<TransportClientBootstrap> 
clientBootstraps) {
@@ -114,6 +118,8 @@ public class TransportClientFactory implements Closeable {
     this.pooledAllocator =
         NettyUtils.getPooledByteBufAllocator(
             conf, context.getSource(), false, conf.clientThreads());
+    this.maxClientConnectRetries = conf.maxIORetries();
+    this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
   }
 
   /**
@@ -130,7 +136,40 @@ public class TransportClientFactory implements Closeable {
    */
   public TransportClient createClient(String remoteHost, int remotePort, int 
partitionId)
       throws IOException, InterruptedException {
-    return createClient(remoteHost, remotePort, partitionId, new 
TransportFrameDecoder());
+    return retryCreateClient(remoteHost, remotePort, partitionId, 
TransportFrameDecoder::new);
+  }
+
+  public TransportClient retryCreateClient(
+      String remoteHost,
+      int remotePort,
+      int partitionId,
+      Supplier<ChannelInboundHandlerAdapter> supplier)
+      throws IOException, InterruptedException {
+    int numTries = 0;
+    while (numTries < maxClientConnectRetries) {
+      try {
+        return createClient(remoteHost, remotePort, partitionId, 
supplier.get());
+      } catch (Exception e) {
+        numTries++;
+        logger.warn(
+            "Retry create client, times {}/{} with error: {}",
+            numTries,
+            maxClientConnectRetries,
+            e.getMessage(),
+            e);
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (numTries == maxClientConnectRetries) {
+          throw e;
+        }
+
+        Uninterruptibles.sleepUninterruptibly(
+            maxClientConnectRetryWaitTimeMs, TimeUnit.MILLISECONDS);
+      }
+    }
+
+    return null;
   }
 
   public TransportClient createClient(
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 4fd5965a5..9fe6ebb0e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1930,6 +1930,10 @@ object CelebornConf extends Logging {
       .doc(
         "Max number of times we will try IO exceptions (such as connection 
timeouts) per request. " +
           "If set to 0, we will not do any retries. " +
+          s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, 
" +
+          s"it works for shuffle client push and fetch data. " +
+          s"If setting <module> to 
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
+          s"it works for replicate client of worker replicating data to peer 
worker." +
           s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, 
" +
           s"it works for Flink shuffle client push data.")
       .intConf
@@ -1942,6 +1946,8 @@ object CelebornConf extends Logging {
         "Only relevant if maxIORetries > 0. " +
         s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
         s"it works for shuffle client push and fetch data. " +
+        s"If setting <module> to 
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
+        s"it works for replicate client of worker replicating data to peer 
worker." +
         s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
         s"it works for Flink shuffle client push data.")
       .version("0.2.0")
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
index fb87d1af3..7ec7190aa 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
@@ -19,14 +19,20 @@ package org.apache.celeborn.common.network;
 
 import static org.apache.celeborn.common.util.JavaUtils.getLocalHost;
 import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.network.client.TransportClient;
@@ -34,6 +40,7 @@ import 
org.apache.celeborn.common.network.client.TransportClientFactory;
 import org.apache.celeborn.common.network.server.BaseMessageHandler;
 import org.apache.celeborn.common.network.server.TransportServer;
 import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.network.util.TransportFrameDecoder;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.ThreadUtils;
 
@@ -240,4 +247,17 @@ public class TransportClientFactorySuiteJ {
       assertNotEquals(exception.getCause(), null);
     }
   }
+
+  @Test
+  public void testRetryCreateClient() throws IOException, InterruptedException 
{
+    TransportClientFactory factory = 
Mockito.spy(context.createClientFactory());
+    TransportClient client = mock(TransportClient.class);
+    Mockito.doThrow(new IOException("xx"))
+        .doReturn(client)
+        .when(factory)
+        .createClient(anyString(), anyInt(), anyInt(), any());
+    TransportClient transportClient =
+        factory.retryCreateClient("xxx", 10, 1, TransportFrameDecoder::new);
+    Assert.assertEquals(transportClient, client);
+  }
 }
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
index f41ded84d..5febcde5a 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.base.Throwables;
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -38,10 +39,17 @@ import 
org.apache.celeborn.common.network.util.TransportConf;
  * black boxes.
  */
 public class RegistrationSuiteJ extends SaslTestBase {
+  private CelebornConf celebornConf;
+
+  @Before
+  public void before() throws Exception {
+    celebornConf = new CelebornConf();
+    celebornConf.set("celeborn.shuffle.io.maxRetries", "1");
+  }
 
   @Test
   public void testRegistration() throws Throwable {
-    TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+    TransportConf conf = new TransportConf("shuffle", celebornConf);
     RegistrationServerBootstrap serverBootstrap =
         new RegistrationServerBootstrap(conf, new TestSecretRegistry());
     RegistrationClientBootstrap clientBootstrap =
@@ -52,7 +60,7 @@ public class RegistrationSuiteJ extends SaslTestBase {
 
   @Test(expected = IOException.class)
   public void testReRegisterationFails() throws Throwable {
-    TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+    TransportConf conf = new TransportConf("shuffle", celebornConf);
     // The SecretRegistryImpl already has the entry for TEST_USER so 
re-registering the app should
     // fail.
     RegistrationServerBootstrap serverBootstrap =
@@ -71,7 +79,7 @@ public class RegistrationSuiteJ extends SaslTestBase {
 
   @Test(expected = IOException.class)
   public void testConnectionAuthWithoutRegistrationShouldFail() throws 
Throwable {
-    TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+    TransportConf conf = new TransportConf("shuffle", celebornConf);
     RegistrationServerBootstrap serverBootstrap =
         new RegistrationServerBootstrap(conf, new TestSecretRegistry());
     SaslClientBootstrap clientBootstrap =
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 0045c6e3c..5b86ae554 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -28,12 +28,12 @@ license: |
 | celeborn.&lt;module&gt;.io.connectionTimeout | &lt;value of 
celeborn.network.timeout&gt; | false | Connection active timeout. If setting 
<module> to `rpc_app`, works for shuffle client. If setting <module> to 
`rpc_service`, works for master or worker. If setting <module> to `data`, it 
works for shuffle client push and fetch data. If setting <module> to `push`, it 
works for worker receiving push data. If setting <module> to `replicate`, it 
works for replicate server or client of worker  [...]
 | celeborn.&lt;module&gt;.io.enableVerboseMetrics | false | false | Whether to 
track Netty memory detailed metrics. If true, the detailed metrics of Netty 
PoolByteBufAllocator will be gotten, otherwise only general memory usage will 
be tracked. |  |  | 
 | celeborn.&lt;module&gt;.io.lazyFD | true | false | Whether to initialize 
FileDescriptor lazily or not. If true, file descriptors are created only when 
data is going to be transferred. This can reduce the number of open files. If 
setting <module> to `fetch`, it works for worker fetch server. |  |  | 
-| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we 
will try IO exceptions (such as connection timeouts) per request. If set to 0, 
we will not do any retries. If setting <module> to `push`, it works for Flink 
shuffle client push data. |  |  | 
+| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we 
will try IO exceptions (such as connection timeouts) per request. If set to 0, 
we will not do any retries. If setting <module> to `data`, it works for shuffle 
client push and fetch data. If setting <module> to `replicate`, it works for 
replicate client of worker replicating data to peer worker.If setting <module> 
to `push`, it works for Flink shuffle client push data. |  |  | 
 | celeborn.&lt;module&gt;.io.mode | NIO | false | Netty EventLoopGroup 
backend, available options: NIO, EPOLL. |  |  | 
 | celeborn.&lt;module&gt;.io.numConnectionsPerPeer | 1 | false | Number of 
concurrent connections between two nodes. If setting <module> to `rpc_app`, 
works for shuffle client. If setting <module> to `rpc_service`, works for 
master or worker. If setting <module> to `data`, it works for shuffle client 
push and fetch data. If setting <module> to `replicate`, it works for replicate 
client of worker replicating data to peer worker. |  |  | 
 | celeborn.&lt;module&gt;.io.preferDirectBufs | true | false | If true, we 
will prefer allocating off-heap byte buffers within Netty. If setting <module> 
to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, 
works for master or worker. If setting <module> to `data`, it works for shuffle 
client push and fetch data. If setting <module> to `push`, it works for worker 
receiving push data. If setting <module> to `replicate`, it works for replicate 
server or client of w [...]
 | celeborn.&lt;module&gt;.io.receiveBuffer | 0b | false | Receive buffer size 
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should 
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 
10Gbps buffer size should be ~ 1.25MB. If setting <module> to `rpc_app`, works 
for shuffle client. If setting <module> to `rpc_service`, works for master or 
worker. If setting <module> to `data`, it works for shuffle client push and 
fetch data. If setting <mod [...]
-| celeborn.&lt;module&gt;.io.retryWait | 5s | false | Time that we will wait 
in order to perform a retry after an IOException. Only relevant if maxIORetries 
> 0. If setting <module> to `data`, it works for shuffle client push and fetch 
data. If setting <module> to `push`, it works for Flink shuffle client push 
data. | 0.2.0 |  | 
+| celeborn.&lt;module&gt;.io.retryWait | 5s | false | Time that we will wait 
in order to perform a retry after an IOException. Only relevant if maxIORetries 
> 0. If setting <module> to `data`, it works for shuffle client push and fetch 
data. If setting <module> to `replicate`, it works for replicate client of 
worker replicating data to peer worker.If setting <module> to `push`, it works 
for Flink shuffle client push data. | 0.2.0 |  | 
 | celeborn.&lt;module&gt;.io.saslTimeout | 30s | false | Timeout for a single 
round trip of auth message exchange, in milliseconds. | 0.5.0 |  | 
 | celeborn.&lt;module&gt;.io.sendBuffer | 0b | false | Send buffer size 
(SO_SNDBUF). If setting <module> to `rpc_app`, works for shuffle client. If 
setting <module> to `rpc_service`, works for master or worker. If setting 
<module> to `data`, it works for shuffle client push and fetch data. If setting 
<module> to `push`, it works for worker receiving push data. If setting 
<module> to `replicate`, it works for replicate server or client of worker 
replicating data to peer worker. If setting [...]
 | celeborn.&lt;module&gt;.io.serverThreads | 0 | false | Number of threads 
used in the server thread pool. Default to 0, which is 2x#cores. If setting 
<module> to `rpc_app`, works for shuffle client. If setting <module> to 
`rpc_service`, works for master or worker. If setting <module> to `push`, it 
works for worker receiving push data. If setting <module> to `replicate`, it 
works for replicate server of worker replicating data to peer worker. If 
setting <module> to `fetch`, it works for  [...]

Reply via email to