This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 74203516a [#2686] fix(client): Prefetch should be finished once
shuffle result is empty or null (#2696)
74203516a is described below
commit 74203516a37acc34c72e7e0fb830cd882faa74d7
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 16 15:28:15 2025 +0800
[#2686] fix(client): Prefetch should be finished once shuffle result is
empty or null (#2696)
### What changes were proposed in this pull request?
Prefetch should be finished once shuffle result is empty or null
### Why are the changes needed?
fix #2686
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
---
.../impl/PrefetchableClientReadHandler.java | 6 +-
.../impl/PrefetchableClientReadHandlerTest.java | 76 ++++++++++++++++++----
2 files changed, 67 insertions(+), 15 deletions(-)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index 286b2b4fc..2082911ac 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -78,6 +78,10 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
}
}
+ public boolean isFinished() {
+ return finishedTag.get();
+ }
+
protected abstract ShuffleDataResult doReadShuffleData();
@Override
@@ -97,7 +101,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
return;
}
ShuffleDataResult result = doReadShuffleData();
- if (result == null) {
+ if (result == null || result.isEmpty()) {
this.finishedTag.set(true);
}
prefetchResultQueue.offer(Optional.ofNullable(result));
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
index 9adac5a3d..67136035d 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
@@ -17,37 +17,58 @@
package org.apache.uniffle.storage.handler.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class PrefetchableClientReadHandlerTest {
class MockedHandler extends PrefetchableClientReadHandler {
- private AtomicInteger readNum;
+ private AtomicInteger maxReadLoopNum;
private boolean markTimeout;
private boolean markFetchFailure;
+ private boolean markEmptyResult;
MockedHandler(
Optional<PrefetchOption> option,
- int readNum,
+ AtomicInteger maxReadLoopNum,
boolean markTimeout,
boolean markFetchFailure) {
+ this(option, maxReadLoopNum, markTimeout, markFetchFailure, false);
+ }
+
+ MockedHandler(
+ Optional<PrefetchOption> option,
+ AtomicInteger maxReadLoopNum,
+ boolean markTimeout,
+ boolean markFetchFailure,
+ boolean markEmptyResult) {
super(option);
- this.readNum = new AtomicInteger(readNum);
+ this.maxReadLoopNum = maxReadLoopNum;
this.markTimeout = markTimeout;
this.markFetchFailure = markFetchFailure;
+ this.markEmptyResult = markEmptyResult;
}
@Override
protected ShuffleDataResult doReadShuffleData() {
+ if (markEmptyResult) {
+ maxReadLoopNum.decrementAndGet();
+ return new ShuffleDataResult();
+ }
+
if (markFetchFailure) {
throw new RssException("");
}
@@ -59,19 +80,39 @@ public class PrefetchableClientReadHandlerTest {
// ignore
}
}
- if (readNum.get() > 0) {
- readNum.decrementAndGet();
- return new ShuffleDataResult();
+ if (maxReadLoopNum.get() > 0) {
+ maxReadLoopNum.decrementAndGet();
+ List<BufferSegment> segments = new ArrayList<>();
+ segments.add(new BufferSegment(1, 1, 1, 1, 1, 1));
+ return new ShuffleDataResult(new byte[10], segments);
}
return null;
}
}
@Test
- public void test_with_prefetch() {
+ public void testWithPrefetchWithEmptyResult() {
+ AtomicInteger maxReadLoopNum = new AtomicInteger(10);
PrefetchableClientReadHandler handler =
new MockedHandler(
- Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
10)), 10, false, false);
+ Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
10)),
+ maxReadLoopNum,
+ false,
+ false,
+ true);
+ assertTrue(handler.readShuffleData().isEmpty());
+ Awaitility.await().until(() -> handler.isFinished());
+ assertEquals(9, maxReadLoopNum.get());
+ }
+
+ @Test
+ public void testWithPrefetch() {
+ PrefetchableClientReadHandler handler =
+ new MockedHandler(
+ Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
10)),
+ new AtomicInteger(10),
+ false,
+ false);
int counter = 0;
while (true) {
if (handler.readShuffleData() != null) {
@@ -84,11 +125,14 @@ public class PrefetchableClientReadHandlerTest {
}
@Test
- public void test_with_timeout() {
+ public void testWithPrefetchButTimeout() {
try {
PrefetchableClientReadHandler handler =
new MockedHandler(
- Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
1)), 10, true, false);
+ Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
1)),
+ new AtomicInteger(10),
+ true,
+ false);
handler.readShuffleData();
fail();
} catch (Exception e) {
@@ -97,11 +141,14 @@ public class PrefetchableClientReadHandlerTest {
}
@Test
- public void test_with_fetch_failure() {
+ public void testWithPrefetchButFailure() {
try {
PrefetchableClientReadHandler handler =
new MockedHandler(
- Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
1)), 10, false, true);
+ Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4,
1)),
+ new AtomicInteger(10),
+ false,
+ true);
handler.readShuffleData();
fail();
} catch (Exception e) {
@@ -110,8 +157,9 @@ public class PrefetchableClientReadHandlerTest {
}
@Test
- public void test_without_prefetch() {
- PrefetchableClientReadHandler handler = new
MockedHandler(Optional.empty(), 10, true, false);
+ public void testWithoutPrefetch() {
+ PrefetchableClientReadHandler handler =
+ new MockedHandler(Optional.empty(), new AtomicInteger(10), true,
false);
handler.readShuffleData();
}
}