This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 17d2b257e [#2649] feat(spark): Introduce timeout mechanism when 
getting the decompressing data (#2651)
17d2b257e is described below

commit 17d2b257e3094d3d4d285d1ff0e584e51ecefae2
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 4 17:11:52 2025 +0800

    [#2649] feat(spark): Introduce timeout mechanism when getting the 
decompressing data (#2651)
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a timeout mechanism for fetching data produced by 
overlapping decompression.
    
    ### Why are the changes needed?
    
    Without this PR, the blocking wait risks hanging tasks forever when the 
RPC hangs.
    
    ### Does this PR introduce _any_ user-facing change?
    
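    Yes. This PR adds the config option 
`rss.client.read.overlappingDecompressionFetchSecondsThreshold`. Its default 
value is `-1`, so the timeout mechanism is disabled by default; set it to a 
positive number of seconds to enable it.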
    
    ### How was this patch tested?
    
    Internal job tests
---
 .../org/apache/uniffle/client/impl/DecompressionWorker.java   |  8 ++++++--
 .../org/apache/uniffle/client/impl/ShuffleReadClientImpl.java |  5 ++++-
 .../uniffle/client/response/DecompressedShuffleBlock.java     | 11 +++++++++--
 .../apache/uniffle/client/impl/DecompressionWorkerTest.java   |  5 +++--
 .../java/org/apache/uniffle/common/config/RssClientConf.java  |  8 ++++++++
 5 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index db6dc8b48..bd50caff2 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -49,7 +49,9 @@ public class DecompressionWorker {
   // the millis for the block get operation to measure profit from overlapping 
decompression
   private final AtomicLong waitMillis = new AtomicLong(0);
 
-  public DecompressionWorker(Codec codec, int threads) {
+  private final int fetchSecondsThreshold;
+
+  public DecompressionWorker(Codec codec, int threads, int 
fetchSecondsThreshold) {
     if (codec == null) {
       throw new IllegalArgumentException("Codec cannot be null");
     }
@@ -60,6 +62,7 @@ public class DecompressionWorker {
     this.executorService =
         Executors.newFixedThreadPool(threads, 
ThreadUtils.getThreadFactory("decompressionWorker"));
     this.codec = codec;
+    this.fetchSecondsThreshold = fetchSecondsThreshold;
   }
 
   public void add(int batchIndex, ShuffleDataResult shuffleDataResult) {
@@ -104,7 +107,8 @@ public class DecompressionWorker {
           new DecompressedShuffleBlock(
               f,
               waitMillis -> this.waitMillis.addAndGet(waitMillis),
-              bufferSegment.getTaskAttemptId()));
+              bufferSegment.getTaskAttemptId(),
+              fetchSecondsThreshold));
     }
   }
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index ba21e2146..fe126d531 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -61,6 +61,7 @@ import 
org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 
 import static 
org.apache.uniffle.common.config.RssClientConf.READ_CLIENT_NEXT_SEGMENTS_REPORT_COUNT;
 import static 
org.apache.uniffle.common.config.RssClientConf.READ_CLIENT_NEXT_SEGMENTS_REPORT_ENABLED;
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD;
 
 public class ShuffleReadClientImpl implements ShuffleReadClient {
 
@@ -162,9 +163,11 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
 
   private void init(ShuffleClientFactory.ReadClientBuilder builder) {
     if (builder.isOverlappingDecompressionEnabled()) {
+      int fetchThreshold =
+          
builder.getRssConf().get(RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD);
       this.decompressionWorker =
           new DecompressionWorker(
-              builder.getCodec(), 
builder.getOverlappingDecompressionThreadNum());
+              builder.getCodec(), 
builder.getOverlappingDecompressionThreadNum(), fetchThreshold);
     }
     this.shuffleId = builder.getShuffleId();
     this.partitionId = builder.getPartitionId();
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 22df42a2d..6f644ffd8 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.client.response;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.uniffle.common.exception.RssException;
@@ -26,12 +27,17 @@ import org.apache.uniffle.common.exception.RssException;
 public class DecompressedShuffleBlock extends ShuffleBlock {
   private CompletableFuture<ByteBuffer> f;
   private Consumer<Long> waitMillisCallback;
+  private final int fetchSecondsThreshold;
 
   public DecompressedShuffleBlock(
-      CompletableFuture<ByteBuffer> f, Consumer<Long> consumer, long 
taskAttemptId) {
+      CompletableFuture<ByteBuffer> f,
+      Consumer<Long> consumer,
+      long taskAttemptId,
+      int fetchSecondsThreshold) {
     super(taskAttemptId);
     this.f = f;
     this.waitMillisCallback = consumer;
+    this.fetchSecondsThreshold = fetchSecondsThreshold;
   }
 
   @Override
@@ -44,7 +50,8 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
   public ByteBuffer getByteBuffer() {
     try {
       long startTime = System.currentTimeMillis();
-      ByteBuffer buffer = f.get();
+      ByteBuffer buffer =
+          fetchSecondsThreshold > 0 ? f.get(fetchSecondsThreshold, 
TimeUnit.SECONDS) : f.get();
       if (waitMillisCallback != null) {
         waitMillisCallback.accept(System.currentTimeMillis() - startTime);
       }
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
index bae129e54..5d41ac481 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
@@ -38,7 +38,8 @@ public class DecompressionWorkerTest {
 
   @Test
   public void testEmptyGet() throws Exception {
-    DecompressionWorker worker = new DecompressionWorker(Codec.newInstance(new 
RssConf()).get(), 1);
+    DecompressionWorker worker =
+        new DecompressionWorker(Codec.newInstance(new RssConf()).get(), 1, 10);
     assertNull(worker.get(1, 1));
   }
 
@@ -77,7 +78,7 @@ public class DecompressionWorkerTest {
     RssConf rssConf = new RssConf();
     rssConf.set(COMPRESSION_TYPE, Codec.Type.NOOP);
     Codec codec = Codec.newInstance(rssConf).get();
-    DecompressionWorker worker = new DecompressionWorker(codec, 1);
+    DecompressionWorker worker = new DecompressionWorker(codec, 1, 10);
 
     // create some data
     ShuffleDataResult shuffleDataResult = createShuffleDataResult(10, codec, 
100);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index a7f67e1bc..0ce7be635 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -381,4 +381,12 @@ public class RssClientConf {
           .intType()
           .defaultValue(4)
           .withDescription("Next read segment count for shuffle-server read 
ahead");
+
+  public static final ConfigOption<Integer>
+      RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD =
+          
ConfigOptions.key("rss.client.read.overlappingDecompressionFetchSecondsThreshold")
+              .intType()
+              .defaultValue(-1)
+              .withDescription(
+                  "Fetch seconds threshold for overlapping decompress shuffle 
blocks.");
 }

Reply via email to