This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 17d2b257e [#2649] feat(spark): Introduce timeout mechanism when
getting the decompressing data (#2651)
17d2b257e is described below
commit 17d2b257e3094d3d4d285d1ff0e584e51ecefae2
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 4 17:11:52 2025 +0800
[#2649] feat(spark): Introduce timeout mechanism when getting the
decompressing data (#2651)
### What changes were proposed in this pull request?
This PR introduces a timeout mechanism for fetching the data produced by
overlapping decompression.
### Why are the changes needed?
Without this PR, the blocking wait can cause tasks to hang forever when the
RPC hangs.
### Does this PR introduce _any_ user-facing change?
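Yes. A new config option,
`rss.client.read.overlappingDecompressionFetchSecondsThreshold`, is added.
Its default value is `-1`, so this mechanism is disabled by default; the
timeout is applied only when the value is greater than 0.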
### How was this patch tested?
Internal job tests
---
.../org/apache/uniffle/client/impl/DecompressionWorker.java | 8 ++++++--
.../org/apache/uniffle/client/impl/ShuffleReadClientImpl.java | 5 ++++-
.../uniffle/client/response/DecompressedShuffleBlock.java | 11 +++++++++--
.../apache/uniffle/client/impl/DecompressionWorkerTest.java | 5 +++--
.../java/org/apache/uniffle/common/config/RssClientConf.java | 8 ++++++++
5 files changed, 30 insertions(+), 7 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index db6dc8b48..bd50caff2 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -49,7 +49,9 @@ public class DecompressionWorker {
// the millis for the block get operation to measure profit from overlapping
decompression
private final AtomicLong waitMillis = new AtomicLong(0);
- public DecompressionWorker(Codec codec, int threads) {
+ private final int fetchSecondsThreshold;
+
+ public DecompressionWorker(Codec codec, int threads, int
fetchSecondsThreshold) {
if (codec == null) {
throw new IllegalArgumentException("Codec cannot be null");
}
@@ -60,6 +62,7 @@ public class DecompressionWorker {
this.executorService =
Executors.newFixedThreadPool(threads,
ThreadUtils.getThreadFactory("decompressionWorker"));
this.codec = codec;
+ this.fetchSecondsThreshold = fetchSecondsThreshold;
}
public void add(int batchIndex, ShuffleDataResult shuffleDataResult) {
@@ -104,7 +107,8 @@ public class DecompressionWorker {
new DecompressedShuffleBlock(
f,
waitMillis -> this.waitMillis.addAndGet(waitMillis),
- bufferSegment.getTaskAttemptId()));
+ bufferSegment.getTaskAttemptId(),
+ fetchSecondsThreshold));
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index ba21e2146..fe126d531 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -61,6 +61,7 @@ import
org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import static
org.apache.uniffle.common.config.RssClientConf.READ_CLIENT_NEXT_SEGMENTS_REPORT_COUNT;
import static
org.apache.uniffle.common.config.RssClientConf.READ_CLIENT_NEXT_SEGMENTS_REPORT_ENABLED;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD;
public class ShuffleReadClientImpl implements ShuffleReadClient {
@@ -162,9 +163,11 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
private void init(ShuffleClientFactory.ReadClientBuilder builder) {
if (builder.isOverlappingDecompressionEnabled()) {
+ int fetchThreshold =
+
builder.getRssConf().get(RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD);
this.decompressionWorker =
new DecompressionWorker(
- builder.getCodec(),
builder.getOverlappingDecompressionThreadNum());
+ builder.getCodec(),
builder.getOverlappingDecompressionThreadNum(), fetchThreshold);
}
this.shuffleId = builder.getShuffleId();
this.partitionId = builder.getPartitionId();
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 22df42a2d..6f644ffd8 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.client.response;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.uniffle.common.exception.RssException;
@@ -26,12 +27,17 @@ import org.apache.uniffle.common.exception.RssException;
public class DecompressedShuffleBlock extends ShuffleBlock {
private CompletableFuture<ByteBuffer> f;
private Consumer<Long> waitMillisCallback;
+ private final int fetchSecondsThreshold;
public DecompressedShuffleBlock(
- CompletableFuture<ByteBuffer> f, Consumer<Long> consumer, long
taskAttemptId) {
+ CompletableFuture<ByteBuffer> f,
+ Consumer<Long> consumer,
+ long taskAttemptId,
+ int fetchSecondsThreshold) {
super(taskAttemptId);
this.f = f;
this.waitMillisCallback = consumer;
+ this.fetchSecondsThreshold = fetchSecondsThreshold;
}
@Override
@@ -44,7 +50,8 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
public ByteBuffer getByteBuffer() {
try {
long startTime = System.currentTimeMillis();
- ByteBuffer buffer = f.get();
+ ByteBuffer buffer =
+ fetchSecondsThreshold > 0 ? f.get(fetchSecondsThreshold,
TimeUnit.SECONDS) : f.get();
if (waitMillisCallback != null) {
waitMillisCallback.accept(System.currentTimeMillis() - startTime);
}
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
index bae129e54..5d41ac481 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java
@@ -38,7 +38,8 @@ public class DecompressionWorkerTest {
@Test
public void testEmptyGet() throws Exception {
- DecompressionWorker worker = new DecompressionWorker(Codec.newInstance(new
RssConf()).get(), 1);
+ DecompressionWorker worker =
+ new DecompressionWorker(Codec.newInstance(new RssConf()).get(), 1, 10);
assertNull(worker.get(1, 1));
}
@@ -77,7 +78,7 @@ public class DecompressionWorkerTest {
RssConf rssConf = new RssConf();
rssConf.set(COMPRESSION_TYPE, Codec.Type.NOOP);
Codec codec = Codec.newInstance(rssConf).get();
- DecompressionWorker worker = new DecompressionWorker(codec, 1);
+ DecompressionWorker worker = new DecompressionWorker(codec, 1, 10);
// create some data
ShuffleDataResult shuffleDataResult = createShuffleDataResult(10, codec,
100);
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index a7f67e1bc..0ce7be635 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -381,4 +381,12 @@ public class RssClientConf {
.intType()
.defaultValue(4)
.withDescription("Next read segment count for shuffle-server read
ahead");
+
+ public static final ConfigOption<Integer>
+ RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD =
+
ConfigOptions.key("rss.client.read.overlappingDecompressionFetchSecondsThreshold")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "Fetch seconds threshold for overlapping decompress shuffle
blocks.");
}