This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 14baec838 [CELEBORN-1673] Support retry create client
14baec838 is described below
commit 14baec8388d894c591d07edaa6e62fd9dbd993fd
Author: Shuang <[email protected]>
AuthorDate: Thu Oct 31 14:45:48 2024 +0800
[CELEBORN-1673] Support retry create client
### What changes were proposed in this pull request?
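Move the create-client retry logic out of `FlinkTransportClientFactory` and
into the common `TransportClientFactory` (new `retryCreateClient` method), so
that `createClient` retries for every engine. The retry count and the wait
between attempts come from `celeborn.<module>.io.maxRetries` and
`celeborn.<module>.io.retryWait`.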
### Why are the changes needed?
Currently, only the Flink client retries establishing a client when a
connection problem occurs. The other engines would benefit from the same
retry behavior.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
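Unit tests: added `TransportClientFactorySuiteJ#testRetryCreateClient` and
adjusted `RegistrationSuiteJ` to set `celeborn.shuffle.io.maxRetries` to 1.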
Closes #2855 from RexXiong/CELEBORN-1673.
Lead-authored-by: Shuang <[email protected]>
Co-authored-by: lvshuang.xjs <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../flink/network/FlinkTransportClientFactory.java | 42 ++++------------------
.../flink/readclient/FlinkShuffleClientImpl.java | 4 +--
.../network/client/TransportClientFactory.java | 41 ++++++++++++++++++++-
.../org/apache/celeborn/common/CelebornConf.scala | 6 ++++
.../network/TransportClientFactorySuiteJ.java | 20 +++++++++++
.../common/network/sasl/RegistrationSuiteJ.java | 14 ++++++--
docs/configuration/network.md | 4 +--
7 files changed, 86 insertions(+), 45 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index aef4891ca..3cb180b3f 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -32,57 +32,27 @@ import
org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientBootstrap;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.plugin.flink.utils.Utils;
public class FlinkTransportClientFactory extends TransportClientFactory {
public static final Logger logger =
LoggerFactory.getLogger(FlinkTransportClientFactory.class);
private ConcurrentHashMap<Long, Supplier<ByteBuf>> bufferSuppliers;
- private final int fetchMaxRetries;
public FlinkTransportClientFactory(
- TransportContext context, int fetchMaxRetries,
List<TransportClientBootstrap> bootstraps) {
+ TransportContext context, List<TransportClientBootstrap> bootstraps) {
super(context, bootstraps);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
- this.fetchMaxRetries = fetchMaxRetries;
this.pooledAllocator = new UnpooledByteBufAllocator(true);
}
public TransportClient createClientWithRetry(String remoteHost, int
remotePort)
throws IOException, InterruptedException {
- int retryCount = fetchMaxRetries;
-
- while (retryCount > 0) {
- try {
- return createClient(remoteHost, remotePort);
- } catch (Exception e) {
- retryCount--;
- logger.warn(
- "Retrying ({}/{}) times create client to {}:{}",
- retryCount,
- fetchMaxRetries,
- remoteHost,
- remotePort,
- e);
- if (retryCount == 0) {
- if (e instanceof InterruptedException || e instanceof IOException) {
- throw e;
- } else {
- Utils.rethrowAsRuntimeException(e);
- }
- }
- }
- }
-
- return null;
- }
-
- @Override
- public TransportClient createClient(String remoteHost, int remotePort)
- throws IOException, InterruptedException {
- return createClient(
- remoteHost, remotePort, -1, new
TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
+ return retryCreateClient(
+ remoteHost,
+ remotePort,
+ -1,
+ () -> new TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
}
public void registerSupplier(long streamId, Supplier<ByteBuf> supplier) {
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index a3f8a1a9b..650ed1a3a 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -147,9 +147,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
private void initializeTransportClientFactory() {
if (null == flinkTransportClientFactory) {
- flinkTransportClientFactory =
- new FlinkTransportClientFactory(
- context, conf.clientFetchMaxRetriesForEachReplica(),
createBootstraps());
+ flinkTransportClientFactory = new FlinkTransportClientFactory(context,
createBootstraps());
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 8dd2380fe..7b87be40b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -26,10 +26,12 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
@@ -92,6 +94,8 @@ public class TransportClientFactory implements Closeable {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
protected ByteBufAllocator pooledAllocator;
+ private final int maxClientConnectRetries;
+ private final int maxClientConnectRetryWaitTimeMs;
public TransportClientFactory(
TransportContext context, List<TransportClientBootstrap>
clientBootstraps) {
@@ -114,6 +118,8 @@ public class TransportClientFactory implements Closeable {
this.pooledAllocator =
NettyUtils.getPooledByteBufAllocator(
conf, context.getSource(), false, conf.clientThreads());
+ this.maxClientConnectRetries = conf.maxIORetries();
+ this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
}
/**
@@ -130,7 +136,40 @@ public class TransportClientFactory implements Closeable {
*/
public TransportClient createClient(String remoteHost, int remotePort, int
partitionId)
throws IOException, InterruptedException {
- return createClient(remoteHost, remotePort, partitionId, new
TransportFrameDecoder());
+ return retryCreateClient(remoteHost, remotePort, partitionId,
TransportFrameDecoder::new);
+ }
+
+ public TransportClient retryCreateClient(
+ String remoteHost,
+ int remotePort,
+ int partitionId,
+ Supplier<ChannelInboundHandlerAdapter> supplier)
+ throws IOException, InterruptedException {
+ int numTries = 0;
+ while (numTries < maxClientConnectRetries) {
+ try {
+ return createClient(remoteHost, remotePort, partitionId,
supplier.get());
+ } catch (Exception e) {
+ numTries++;
+ logger.warn(
+ "Retry create client, times {}/{} with error: {}",
+ numTries,
+ maxClientConnectRetries,
+ e.getMessage(),
+ e);
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ if (numTries == maxClientConnectRetries) {
+ throw e;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(
+ maxClientConnectRetryWaitTimeMs, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ return null;
}
public TransportClient createClient(
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8253ec298..046c9c95a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2038,6 +2038,10 @@ object CelebornConf extends Logging {
.doc(
"Max number of times we will try IO exceptions (such as connection
timeouts) per request. " +
"If set to 0, we will not do any retries. " +
+ s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`,
" +
+ s"it works for shuffle client push and fetch data. " +
+ s"If setting <module> to
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
+ s"it works for replicate client of worker replicating data to peer
worker." +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`,
" +
s"it works for Flink shuffle client push data.")
.intConf
@@ -2050,6 +2054,8 @@ object CelebornConf extends Logging {
"Only relevant if maxIORetries > 0. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
+ s"If setting <module> to
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
+ s"it works for replicate client of worker replicating data to peer
worker." +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data.")
.version("0.2.0")
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
index fb87d1af3..7ec7190aa 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
@@ -19,14 +19,20 @@ package org.apache.celeborn.common.network;
import static org.apache.celeborn.common.util.JavaUtils.getLocalHost;
import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.client.TransportClient;
@@ -34,6 +40,7 @@ import
org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.server.BaseMessageHandler;
import org.apache.celeborn.common.network.server.TransportServer;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.network.util.TransportFrameDecoder;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
@@ -240,4 +247,17 @@ public class TransportClientFactorySuiteJ {
assertNotEquals(exception.getCause(), null);
}
}
+
+ @Test
+ public void testRetryCreateClient() throws IOException, InterruptedException
{
+ TransportClientFactory factory =
Mockito.spy(context.createClientFactory());
+ TransportClient client = mock(TransportClient.class);
+ Mockito.doThrow(new IOException("xx"))
+ .doReturn(client)
+ .when(factory)
+ .createClient(anyString(), anyInt(), anyInt(), any());
+ TransportClient transportClient =
+ factory.retryCreateClient("xxx", 10, 1, TransportFrameDecoder::new);
+ Assert.assertEquals(transportClient, client);
+ }
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
index f41ded84d..5febcde5a 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/sasl/RegistrationSuiteJ.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import com.google.common.base.Throwables;
+import org.junit.Before;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
@@ -38,10 +39,17 @@ import
org.apache.celeborn.common.network.util.TransportConf;
* black boxes.
*/
public class RegistrationSuiteJ extends SaslTestBase {
+ private CelebornConf celebornConf;
+
+ @Before
+ public void before() throws Exception {
+ celebornConf = new CelebornConf();
+ celebornConf.set("celeborn.shuffle.io.maxRetries", "1");
+ }
@Test
public void testRegistration() throws Throwable {
- TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+ TransportConf conf = new TransportConf("shuffle", celebornConf);
RegistrationServerBootstrap serverBootstrap =
new RegistrationServerBootstrap(conf, new TestSecretRegistry());
RegistrationClientBootstrap clientBootstrap =
@@ -52,7 +60,7 @@ public class RegistrationSuiteJ extends SaslTestBase {
@Test(expected = IOException.class)
public void testReRegisterationFails() throws Throwable {
- TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+ TransportConf conf = new TransportConf("shuffle", celebornConf);
// The SecretRegistryImpl already has the entry for TEST_USER so
re-registering the app should
// fail.
RegistrationServerBootstrap serverBootstrap =
@@ -71,7 +79,7 @@ public class RegistrationSuiteJ extends SaslTestBase {
@Test(expected = IOException.class)
public void testConnectionAuthWithoutRegistrationShouldFail() throws
Throwable {
- TransportConf conf = new TransportConf("shuffle", new CelebornConf());
+ TransportConf conf = new TransportConf("shuffle", celebornConf);
RegistrationServerBootstrap serverBootstrap =
new RegistrationServerBootstrap(conf, new TestSecretRegistry());
SaslClientBootstrap clientBootstrap =
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 45b9f72dc..59057daa0 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -28,12 +28,12 @@ license: |
| celeborn.<module>.io.connectionTimeout | <value of
celeborn.network.timeout> | false | Connection active timeout. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server or client of worker [...]
| celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to
track Netty memory detailed metrics. If true, the detailed metrics of Netty
PoolByteBufAllocator will be gotten, otherwise only general memory usage will
be tracked. | | |
| celeborn.<module>.io.lazyFD | true | false | Whether to initialize
FileDescriptor lazily or not. If true, file descriptors are created only when
data is going to be transferred. This can reduce the number of open files. If
setting <module> to `fetch`, it works for worker fetch server. | | |
-| celeborn.<module>.io.maxRetries | 3 | false | Max number of times we
will try IO exceptions (such as connection timeouts) per request. If set to 0,
we will not do any retries. If setting <module> to `push`, it works for Flink
shuffle client push data. | | |
+| celeborn.<module>.io.maxRetries | 3 | false | Max number of times we
will try IO exceptions (such as connection timeouts) per request. If set to 0,
we will not do any retries. If setting <module> to `data`, it works for shuffle
client push and fetch data. If setting <module> to `replicate`, it works for
replicate client of worker replicating data to peer worker. If setting <module>
to `push`, it works for Flink shuffle client push data. | | |
| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup
backend, available options: NIO, EPOLL. | | |
| celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of
concurrent connections between two nodes. If setting <module> to `rpc_app`,
works for shuffle client. If setting <module> to `rpc_service`, works for
master or worker. If setting <module> to `data`, it works for shuffle client
push and fetch data. If setting <module> to `replicate`, it works for replicate
client of worker replicating data to peer worker. | | |
| celeborn.<module>.io.preferDirectBufs | true | false | If true, we
will prefer allocating off-heap byte buffers within Netty. If setting <module>
to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`,
works for master or worker. If setting <module> to `data`, it works for shuffle
client push and fetch data. If setting <module> to `push`, it works for worker
receiving push data. If setting <module> to `replicate`, it works for replicate
server or client of w [...]
| celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps buffer size should be ~ 1.25MB. If setting <module> to `rpc_app`, works
for shuffle client. If setting <module> to `rpc_service`, works for master or
worker. If setting <module> to `data`, it works for shuffle client push and
fetch data. If setting <mod [...]
-| celeborn.<module>.io.retryWait | 5s | false | Time that we will wait
in order to perform a retry after an IOException. Only relevant if maxIORetries
> 0. If setting <module> to `data`, it works for shuffle client push and fetch
data. If setting <module> to `push`, it works for Flink shuffle client push
data. | 0.2.0 | |
+| celeborn.<module>.io.retryWait | 5s | false | Time that we will wait
in order to perform a retry after an IOException. Only relevant if maxIORetries
> 0. If setting <module> to `data`, it works for shuffle client push and fetch
data. If setting <module> to `replicate`, it works for replicate client of
worker replicating data to peer worker. If setting <module> to `push`, it works
for Flink shuffle client push data. | 0.2.0 | |
| celeborn.<module>.io.saslTimeout | 30s | false | Timeout for a single
round trip of auth message exchange, in milliseconds. | 0.5.0 | |
| celeborn.<module>.io.sendBuffer | 0b | false | Send buffer size
(SO_SNDBUF). If setting <module> to `rpc_app`, works for shuffle client. If
setting <module> to `rpc_service`, works for master or worker. If setting
<module> to `data`, it works for shuffle client push and fetch data. If setting
<module> to `push`, it works for worker receiving push data. If setting
<module> to `replicate`, it works for replicate server or client of worker
replicating data to peer worker. If setting [...]
| celeborn.<module>.io.serverThreads | 0 | false | Number of threads
used in the server thread pool. Default to 0, which is 2x#cores. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server of worker replicating data to peer worker. If
setting <module> to `fetch`, it works for [...]