This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new a998d9021 [CELEBORN-2056] Make the wait time for the client to read non shuffle partitions configurable
a998d9021 is described below

commit a998d9021b4479edf064b0450ddaee6ddaf4464e
Author: duanhao-jk <duanhao...@360shuke.com>
AuthorDate: Thu Jul 24 23:20:34 2025 -0700

    [CELEBORN-2056] Make the wait time for the client to read non shuffle partitions configurable
    
    ### What changes were proposed in this pull request?
    
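    Added a client configuration, `celeborn.client.fetch.pollChunk.wait`, that controls how long the shuffle client waits on each poll for the next chunk while reading a partition (previously hard-coded to 500 ms).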
    
    ### Why are the changes needed?
    
    When a task's shuffle data is relatively small and there are many empty shuffle partitions, the client can spend a lot of time waiting unproductively in this polling loop.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. A new client configuration, `celeborn.client.fetch.pollChunk.wait` (default `500ms`), is added.
    
    ### How was this patch tested?
    
    Validated in a production environment.
    
    Closes #3358 from dh20/celeborn_add-20250707.
    
    Lead-authored-by: duanhao-jk <duanhao...@360shuke.com>
    Co-authored-by: Wang, Fei <fwan...@ebay.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
    (cherry picked from commit 29ab16989db68a9839d0c61ce6c8effd3f9ceaa9)
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 .../apache/celeborn/client/read/WorkerPartitionReader.java   |  4 +++-
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++
 docs/configuration/client.md                                 |  1 +
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index df6f3902c..b6fb2209f 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -75,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader {
   private int fetchChunkRetryCnt;
   private int fetchChunkMaxRetry;
   private final boolean testFetch;
+  private Long pollChunkWaitTime;
 
   private Optional<PartitionReaderCheckpointMetadata> 
partitionReaderCheckpointMetadata;
 
@@ -97,6 +98,7 @@ public class WorkerPartitionReader implements PartitionReader 
{
     fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
     results = new LinkedBlockingQueue<>();
     fetchTimeoutMs = conf.clientFetchTimeoutMs();
+    pollChunkWaitTime = conf.clientFetchPollChunkWaitTime();
     inflightRequestCount = 0;
     this.metricsCallback = metricsCallback;
     // only add the buffer to results queue if this reader is not closed.
@@ -192,7 +194,7 @@ public class WorkerPartitionReader implements PartitionReader {
       while (chunk == null) {
         checkException();
         Long startFetchWait = System.nanoTime();
-        chunk = results.poll(500, TimeUnit.MILLISECONDS);
+        chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS);
         metricsCallback.incReadTime(
             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
       }
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5148f31b9..e17bc1c1f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -996,6 +996,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   //               Shuffle Client Fetch                  //
   // //////////////////////////////////////////////////////
   def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
+  def clientFetchPollChunkWaitTime: Long = 
get(CLIENT_FETCH_POLL_CHUNK_WAIT_TIME)
   def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
   def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
   def isPartitionReaderCheckpointEnabled: Boolean =
@@ -4859,6 +4860,17 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("600s")
 
+  val CLIENT_FETCH_POLL_CHUNK_WAIT_TIME: ConfigEntry[Long] =
+    buildConf("celeborn.client.fetch.pollChunk.wait")
+      .categories("client")
+      .version("0.6.1")
+      .doc("The waiting time for shuffle client to read the empty chunk on the 
work side." +
+        "when there are many empty chunk in the shuffle partition of a small 
task," +
+        "the current value can be set small to avoid long waiting times and 
the illusion of the" +
+        "task getting stuck")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(500)
+
   val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.client.fetch.buffer.size")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 0401fe266..047b82c64 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -35,6 +35,7 @@ license: |
 | celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | false | ShuffleClient is a 
static object, it will be used in the whole lifecycle of Executor, We give a 
expire time for excluded workers to avoid a transient worker issues. | 0.3.0 |  
| 
 | celeborn.client.fetch.maxReqsInFlight | 3 | false | Amount of in-flight 
chunk fetch request. | 0.3.0 | celeborn.fetch.maxReqsInFlight | 
 | celeborn.client.fetch.maxRetriesForEachReplica | 3 | false | Max retry times 
of fetch chunk on each replica | 0.3.0 | 
celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries | 
+| celeborn.client.fetch.pollChunk.wait | 500ms | false | The waiting time for 
shuffle client to read the empty chunk on the work side.when there are many 
empty chunk in the shuffle partition of a small task,the current value can be 
set small to avoid long waiting times and the illusion of thetask getting stuck 
| 0.6.1 |  | 
 | celeborn.client.fetch.timeout | 600s | false | Timeout for a task to open 
stream and fetch chunk. | 0.3.0 | celeborn.fetch.timeout | 
 | celeborn.client.flink.compression.enabled | true | false | Whether to 
compress data in Flink plugin. | 0.3.0 | 
remote-shuffle.job.enable-data-compression | 
 | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | 
Max concurrent reading channels for a input gate. | 0.3.0 | 
remote-shuffle.job.concurrent-readings-per-gate | 

Reply via email to