This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push: new a998d9021 [CELEBORN-2056] Make the wait time for the client to read non shuffle partitions configurable a998d9021 is described below commit a998d9021b4479edf064b0450ddaee6ddaf4464e Author: duanhao-jk <duanhao...@360shuke.com> AuthorDate: Thu Jul 24 23:20:34 2025 -0700 [CELEBORN-2056] Make the wait time for the client to read non shuffle partitions configurable ### What changes were proposed in this pull request? Added a configuration for client to read non shuffle partition waiting time ### Why are the changes needed? When the shuffle data of a task is relatively small and there are many empty shuffle partitions, it will take a lot of time for invalid waiting here ### Does this PR introduce _any_ user-facing change? add configurable ### How was this patch tested? production environment validation Closes #3358 from dh20/celeborn_add-20250707. Lead-authored-by: duanhao-jk <duanhao...@360shuke.com> Co-authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> (cherry picked from commit 29ab16989db68a9839d0c61ce6c8effd3f9ceaa9) Signed-off-by: Wang, Fei <fwan...@ebay.com> --- .../apache/celeborn/client/read/WorkerPartitionReader.java | 4 +++- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++ docs/configuration/client.md | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java index df6f3902c..b6fb2209f 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java @@ -75,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader { private int fetchChunkRetryCnt; private int fetchChunkMaxRetry; private final boolean testFetch; + private Long 
pollChunkWaitTime; private Optional<PartitionReaderCheckpointMetadata> partitionReaderCheckpointMetadata; @@ -97,6 +98,7 @@ public class WorkerPartitionReader implements PartitionReader { fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight(); results = new LinkedBlockingQueue<>(); fetchTimeoutMs = conf.clientFetchTimeoutMs(); + pollChunkWaitTime = conf.clientFetchPollChunkWaitTime(); inflightRequestCount = 0; this.metricsCallback = metricsCallback; // only add the buffer to results queue if this reader is not closed. @@ -192,7 +194,7 @@ public class WorkerPartitionReader implements PartitionReader { while (chunk == null) { checkException(); Long startFetchWait = System.nanoTime(); - chunk = results.poll(500, TimeUnit.MILLISECONDS); + chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS); metricsCallback.incReadTime( TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)); } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 5148f31b9..e17bc1c1f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -996,6 +996,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // Shuffle Client Fetch // // ////////////////////////////////////////////////////// def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT) + def clientFetchPollChunkWaitTime: Long = get(CLIENT_FETCH_POLL_CHUNK_WAIT_TIME) def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT) def isPartitionReaderCheckpointEnabled: Boolean = @@ -4859,6 +4860,17 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("600s") + val CLIENT_FETCH_POLL_CHUNK_WAIT_TIME: ConfigEntry[Long] = + 
buildConf("celeborn.client.fetch.pollChunk.wait") + .categories("client") + .version("0.6.1") + .doc("The waiting time for shuffle client to read the empty chunk on the worker side. " + + "When there are many empty chunks in the shuffle partition of a small task, " + + "this value can be set smaller to avoid long waiting times and the illusion of the " + + "task getting stuck") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(500) + val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] = buildConf("celeborn.client.fetch.buffer.size") .categories("client") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 0401fe266..047b82c64 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -35,6 +35,7 @@ license: | | celeborn.client.fetch.excludedWorker.expireTimeout | <value of celeborn.client.excludedWorker.expireTimeout> | false | ShuffleClient is a static object, it will be used in the whole lifecycle of Executor, We give a expire time for excluded workers to avoid a transient worker issues. | 0.3.0 | | | celeborn.client.fetch.maxReqsInFlight | 3 | false | Amount of in-flight chunk fetch request. | 0.3.0 | celeborn.fetch.maxReqsInFlight | | celeborn.client.fetch.maxRetriesForEachReplica | 3 | false | Max retry times of fetch chunk on each replica | 0.3.0 | celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries | +| celeborn.client.fetch.pollChunk.wait | 500ms | false | The waiting time for shuffle client to read the empty chunk on the worker side. When there are many empty chunks in the shuffle partition of a small task, this value can be set smaller to avoid long waiting times and the illusion of the task getting stuck | 0.6.1 | | | celeborn.client.fetch.timeout | 600s | false | Timeout for a task to open stream and fetch chunk. | 0.3.0 | celeborn.fetch.timeout | | celeborn.client.flink.compression.enabled | true | false | Whether to compress data in Flink plugin.
| 0.3.0 | remote-shuffle.job.enable-data-compression | | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate |