This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 74203516a [#2686] fix(client): Prefetch should be finished once 
shuffle result is empty or null (#2696)
74203516a is described below

commit 74203516a37acc34c72e7e0fb830cd882faa74d7
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 16 15:28:15 2025 +0800

    [#2686] fix(client): Prefetch should be finished once shuffle result is 
empty or null (#2696)
    
    ### What changes were proposed in this pull request?
    
    Mark the prefetch as finished once the shuffle result is null or empty,
    and add a public `isFinished()` accessor to `PrefetchableClientReadHandler`.
    
    ### Why are the changes needed?
    
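    Fixes #2686. Before this change, only a null result set the finished flag;
    an empty, non-null result did not.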
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests: added `testWithPrefetchWithEmptyResult` to
    `PrefetchableClientReadHandlerTest`.
---
 .../impl/PrefetchableClientReadHandler.java        |  6 +-
 .../impl/PrefetchableClientReadHandlerTest.java    | 76 ++++++++++++++++++----
 2 files changed, 67 insertions(+), 15 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index 286b2b4fc..2082911ac 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -78,6 +78,10 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
     }
   }
 
+  public boolean isFinished() {
+    return finishedTag.get();
+  }
+
   protected abstract ShuffleDataResult doReadShuffleData();
 
   @Override
@@ -97,7 +101,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
                 return;
               }
               ShuffleDataResult result = doReadShuffleData();
-              if (result == null) {
+              if (result == null || result.isEmpty()) {
                 this.finishedTag.set(true);
               }
               prefetchResultQueue.offer(Optional.ofNullable(result));
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
index 9adac5a3d..67136035d 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandlerTest.java
@@ -17,37 +17,58 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class PrefetchableClientReadHandlerTest {
 
   class MockedHandler extends PrefetchableClientReadHandler {
-    private AtomicInteger readNum;
+    private AtomicInteger maxReadLoopNum;
     private boolean markTimeout;
     private boolean markFetchFailure;
+    private boolean markEmptyResult;
 
     MockedHandler(
         Optional<PrefetchOption> option,
-        int readNum,
+        AtomicInteger maxReadLoopNum,
         boolean markTimeout,
         boolean markFetchFailure) {
+      this(option, maxReadLoopNum, markTimeout, markFetchFailure, false);
+    }
+
+    MockedHandler(
+        Optional<PrefetchOption> option,
+        AtomicInteger maxReadLoopNum,
+        boolean markTimeout,
+        boolean markFetchFailure,
+        boolean markEmptyResult) {
       super(option);
-      this.readNum = new AtomicInteger(readNum);
+      this.maxReadLoopNum = maxReadLoopNum;
       this.markTimeout = markTimeout;
       this.markFetchFailure = markFetchFailure;
+      this.markEmptyResult = markEmptyResult;
     }
 
     @Override
     protected ShuffleDataResult doReadShuffleData() {
+      if (markEmptyResult) {
+        maxReadLoopNum.decrementAndGet();
+        return new ShuffleDataResult();
+      }
+
       if (markFetchFailure) {
         throw new RssException("");
       }
@@ -59,19 +80,39 @@ public class PrefetchableClientReadHandlerTest {
           // ignore
         }
       }
-      if (readNum.get() > 0) {
-        readNum.decrementAndGet();
-        return new ShuffleDataResult();
+      if (maxReadLoopNum.get() > 0) {
+        maxReadLoopNum.decrementAndGet();
+        List<BufferSegment> segments = new ArrayList<>();
+        segments.add(new BufferSegment(1, 1, 1, 1, 1, 1));
+        return new ShuffleDataResult(new byte[10], segments);
       }
       return null;
     }
   }
 
   @Test
-  public void test_with_prefetch() {
+  public void testWithPrefetchWithEmptyResult() {
+    AtomicInteger maxReadLoopNum = new AtomicInteger(10);
     PrefetchableClientReadHandler handler =
         new MockedHandler(
-            Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
10)), 10, false, false);
+            Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
10)),
+            maxReadLoopNum,
+            false,
+            false,
+            true);
+    assertTrue(handler.readShuffleData().isEmpty());
+    Awaitility.await().until(() -> handler.isFinished());
+    assertEquals(9, maxReadLoopNum.get());
+  }
+
+  @Test
+  public void testWithPrefetch() {
+    PrefetchableClientReadHandler handler =
+        new MockedHandler(
+            Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
10)),
+            new AtomicInteger(10),
+            false,
+            false);
     int counter = 0;
     while (true) {
       if (handler.readShuffleData() != null) {
@@ -84,11 +125,14 @@ public class PrefetchableClientReadHandlerTest {
   }
 
   @Test
-  public void test_with_timeout() {
+  public void testWithPrefetchButTimeout() {
     try {
       PrefetchableClientReadHandler handler =
           new MockedHandler(
-              Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
1)), 10, true, false);
+              Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
1)),
+              new AtomicInteger(10),
+              true,
+              false);
       handler.readShuffleData();
       fail();
     } catch (Exception e) {
@@ -97,11 +141,14 @@ public class PrefetchableClientReadHandlerTest {
   }
 
   @Test
-  public void test_with_fetch_failure() {
+  public void testWithPrefetchButFailure() {
     try {
       PrefetchableClientReadHandler handler =
           new MockedHandler(
-              Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
1)), 10, false, true);
+              Optional.of(new PrefetchableClientReadHandler.PrefetchOption(4, 
1)),
+              new AtomicInteger(10),
+              false,
+              true);
       handler.readShuffleData();
       fail();
     } catch (Exception e) {
@@ -110,8 +157,9 @@ public class PrefetchableClientReadHandlerTest {
   }
 
   @Test
-  public void test_without_prefetch() {
-    PrefetchableClientReadHandler handler = new 
MockedHandler(Optional.empty(), 10, true, false);
+  public void testWithoutPrefetch() {
+    PrefetchableClientReadHandler handler =
+        new MockedHandler(Optional.empty(), new AtomicInteger(10), true, 
false);
     handler.readShuffleData();
   }
 }

Reply via email to