This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new cf29d36f9 [#2674] improvement(client): use ack val to check the block 
send result (#2703)
cf29d36f9 is described below

commit cf29d36f9bdf6ab45cf2e9b2ee914122aa149ee2
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Dec 29 16:23:46 2025 +0800

    [#2674] improvement(client): use ack val to check the block send result 
(#2703)
    
    ### What changes were proposed in this pull request?
    Use ack val to check the block send result.
    
    ### Why are the changes needed?
    For better performance.
    Fix: #2674
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    CI
---
 .../spark/shuffle/writer/RssShuffleWriter.java     | 91 ++++++++++++++++------
 .../spark/shuffle/writer/RssShuffleWriter.java     | 23 ++++--
 2 files changed, 83 insertions(+), 31 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index fd7f72ccb..e7f7c49b6 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -24,10 +24,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -98,6 +100,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private long taskAttemptId;
   private ShuffleDependency<K, V, C> shuffleDependency;
   private ShuffleWriteMetrics shuffleWriteMetrics;
+  private final BlockingQueue<Object> finishEventQueue = new 
LinkedBlockingQueue<>();
   private Partitioner partitioner;
   private boolean shouldPartition;
   private WriteBufferManager bufferManager;
@@ -282,7 +285,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     long s = System.currentTimeMillis();
     checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
-    checkBlockSendResult(new HashSet<>(blockIds));
+    checkBlockSendResult(blockIds);
     checkSentBlockCount();
     final long checkDuration = System.currentTimeMillis() - s;
     long commitDuration = 0;
@@ -396,6 +399,13 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
     for (AddBlockEvent event : 
bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
       futures.add(shuffleManager.sendData(event));
+      event.addCallback(
+          () -> {
+            boolean ret = finishEventQueue.add(new Object());
+            if (!ret) {
+              LOG.error("Add event " + event + " to finishEventQueue fail");
+            }
+          });
     }
     return futures;
   }
@@ -435,42 +445,73 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   @VisibleForTesting
   protected void checkBlockSendResult(Set<Long> blockIds) {
-    long start = System.currentTimeMillis();
-    while (true) {
-      Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
-      Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
-      // if failed when send data to shuffle server, mark task as failed
-      if (failedBlockIds.size() > 0) {
-        String errorMsg =
-            "Send failed: Task["
-                + taskId
-                + "] failed because "
-                + failedBlockIds.size()
-                + " blocks can't be sent to shuffle server: "
-                + 
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
-        LOG.error(errorMsg);
-        throw new RssSendFailedException(errorMsg);
-      }
+    boolean interrupted = false;
 
-      // remove blockIds which was sent successfully, if there has none left, 
all data are sent
-      blockIds.removeAll(successBlockIds);
-      if (blockIds.isEmpty()) {
-        break;
+    try {
+      long remainingMs = sendCheckTimeout;
+      long end = System.currentTimeMillis() + remainingMs;
+      long currentAckValue = 0;
+      for (Long blockId : blockIds) {
+        currentAckValue ^= blockId;
+      }
+      while (true) {
+        try {
+          finishEventQueue.clear();
+          checkDataIfAnyFailure();
+          Set<Long> successBlockIds = 
shuffleManager.getSuccessBlockIds(taskId);
+          if (blockIds.size() == successBlockIds.size()) {
+            for (Long successBlockId : successBlockIds) {
+              currentAckValue ^= successBlockId;
+            }
+            if (currentAckValue != 0) {
+              String errorMsg = "Ack value is not equal to 0, it should not 
happen!";
+              throw new RssSendFailedException(errorMsg);
+            }
+            break;
+          }
+          if (finishEventQueue.isEmpty()) {
+            remainingMs = Math.max(end - System.currentTimeMillis(), 0);
+            Object event = finishEventQueue.poll(remainingMs, 
TimeUnit.MILLISECONDS);
+            if (event == null) {
+              break;
+            }
+          }
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
       }
-      LOG.info("Wait " + blockIds.size() + " blocks sent to shuffle server");
-      Uninterruptibles.sleepUninterruptibly(sendCheckInterval, 
TimeUnit.MILLISECONDS);
-      if (System.currentTimeMillis() - start > sendCheckTimeout) {
+      Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
+      if (currentAckValue != 0 || blockIds.size() != successBlockIds.size()) {
+        int failedBlockCount = blockIds.size() - successBlockIds.size();
         String errorMsg =
             "Timeout: Task["
                 + taskId
                 + "] failed because "
-                + blockIds.size()
+                + failedBlockCount
                 + " blocks can't be sent to shuffle server in "
                 + sendCheckTimeout
                 + " ms.";
         LOG.error(errorMsg);
         throw new RssWaitFailedException(errorMsg);
       }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  protected void checkDataIfAnyFailure() {
+    Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
+    if (failedBlockIds.size() > 0) {
+      String errorMsg =
+          "Send failed: Task["
+              + taskId
+              + "] failed because "
+              + failedBlockIds.size()
+              + " blocks can't be sent to shuffle server: "
+              + 
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
+      throw new RssSendFailedException(errorMsg);
     }
   }
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index fc9bfe53a..4deef511d 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -389,7 +389,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     long checkStartTs = System.currentTimeMillis();
     checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
-    checkBlockSendResult(new HashSet<>(blockIds));
+    checkBlockSendResult(blockIds);
     checkSentBlockCount();
     bufferManager.getShuffleServerPushCostTracker().statistics();
     long commitStartTs = System.currentTimeMillis();
@@ -524,14 +524,23 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     try {
       long remainingMs = sendCheckTimeout;
       long end = System.currentTimeMillis() + remainingMs;
-
+      long currentAckValue = 0;
+      for (Long blockId : blockIds) {
+        currentAckValue ^= blockId;
+      }
       while (true) {
         try {
           finishEventQueue.clear();
           checkDataIfAnyFailure();
           Set<Long> successBlockIds = 
shuffleManager.getSuccessBlockIds(taskId);
-          blockIds.removeAll(successBlockIds);
-          if (blockIds.isEmpty()) {
+          if (blockIds.size() == successBlockIds.size()) {
+            for (Long successBlockId : successBlockIds) {
+              currentAckValue ^= successBlockId;
+            }
+            if (currentAckValue != 0) {
+              String errorMsg = "Ack value is not equal to 0, it should not 
happen!";
+              throw new RssSendFailedException(errorMsg);
+            }
             break;
           }
           if (finishEventQueue.isEmpty()) {
@@ -545,12 +554,14 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
           interrupted = true;
         }
       }
-      if (!blockIds.isEmpty()) {
+      Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
+      if (currentAckValue != 0 || blockIds.size() != successBlockIds.size()) {
+        int failedBlockCount = blockIds.size() - successBlockIds.size();
         String errorMsg =
             "Timeout: Task["
                 + taskId
                 + "] failed because "
-                + blockIds.size()
+                + failedBlockCount
                 + " blocks can't be sent to shuffle server in "
                 + sendCheckTimeout
                 + " ms.";

Reply via email to