This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1332362b [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
1332362b is described below
commit 1332362bffcf276c53015fe9a036ecf570473e80
Author: Shuang <[email protected]>
AuthorDate: Tue Jan 10 19:13:33 2023 +0800
[CELEBORN-213] Add configuration for whether to close idle connections in
client side (#1157)
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 3 ++-
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/client.md | 1 +
3 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d7bc1700..7fc438a4 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -159,7 +159,8 @@ public class ShuffleClientImpl extends ShuffleClient {
TransportConf dataTransportConf =
Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module +
".io.threads", 8));
TransportContext context =
- new TransportContext(dataTransportConf, new BaseMessageHandler(), true);
+ new TransportContext(
+ dataTransportConf, new BaseMessageHandler(), conf.clientCloseIdleConnections());
dataClientFactory = context.createClientFactory();
int pushDataRetryThreads = conf.pushRetryThreads();
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a8b46bc7..4e45b8f3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,6 +514,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Client //
// //////////////////////////////////////////////////////
def clientMaxTries: Int = get(CLIENT_MAX_RETRIES)
+ def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def shuffleWriterMode: ShuffleMode =
ShuffleMode.valueOf(get(SHUFFLE_WRITER_MODE))
def shuffleForceFallbackEnabled: Boolean =
get(SHUFFLE_FORCE_FALLBACK_ENABLED)
def shuffleForceFallbackPartitionThreshold: Long =
get(SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
@@ -2301,6 +2302,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(15)
+ val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.closeIdleConnections")
+ .categories("client")
+ .doc("Whether client will close idle connections.")
+ .version("0.3.0")
+ .booleanConf
+ .createWithDefault(true)
+
val METRICS_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.enabled")
.withAlternative("rss.metrics.system.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 9e2692c2..212b51e5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
| --- | ------- | ----------- | ----- |
| celeborn.application.heartbeatInterval | 10s | Interval for client to send
heartbeat message to master. | 0.2.0 |
| celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add partition's peer worker into blacklist when push data to slave failed. | 0.3.0 |
+| celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 |
| celeborn.client.maxRetries | 15 | Max retry times for client to connect
master endpoint | 0.2.0 |
| celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.2.0 |
| celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 |