This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 692808025 [#1608][part-6][FOLLOWUP] improvement(client): Check blockId num after blocks all sent (#1761)
692808025 is described below

commit 69280802561cb78920a871e2528a6241037cd811
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 11 19:25:34 2024 +0800

    [#1608][part-6][FOLLOWUP] improvement(client): Check blockId num after blocks all sent (#1761)
    
    ### What changes were proposed in this pull request?
    
    1. Rework the blockId number check in client side after all blocks are sent
    
    ### Why are the changes needed?
    
    The subtask of #1608
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../spark/shuffle/writer/RssShuffleWriter.java     | 29 ++++++++++++++--------
 .../spark/shuffle/writer/RssShuffleWriter.java     | 29 ++++++++++++++--------
 2 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 969658373..81d54ec10 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -270,8 +270,8 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     long s = System.currentTimeMillis();
     checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
+    checkBlockSendResult(new HashSet<>(blockIds));
     checkSentBlockCount();
-    checkBlockSendResult(blockIds);
     final long checkDuration = System.currentTimeMillis() - s;
     long commitDuration = 0;
     if (!isMemoryShuffleEnabled) {
@@ -316,17 +316,26 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void checkSentBlockCount() {
-    long tracked = 0;
-    if (serverToPartitionToBlockIds != null) {
-      Set<Long> blockIds = new HashSet<>();
-      for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
-        partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
-      }
-      tracked = blockIds.size();
+    long expected = blockIds.size();
+    long bufferManagerTracked = bufferManager.getBlockCount();
+
+    assert serverToPartitionToBlockIds != null;
+    // to filter the multiple replica's duplicate blockIds
+    Set<Long> blockIds = new HashSet<>();
+    for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
     }
-    if (tracked != bufferManager.getBlockCount()) {
+    long serverTracked = blockIds.size();
+    if (expected != serverTracked || expected != bufferManagerTracked) {
       throw new RssSendFailedException(
-          "Potential block loss may occur when preparing to send blocks for task[" + taskId + "]");
+          "Potential block loss may occur for task["
+              + taskId
+              + "]. BlockId number expected: "
+              + expected
+              + ", serverTracked: "
+              + serverTracked
+              + ", bufferManagerTracked: "
+              + bufferManagerTracked);
     }
   }
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 9d0e734fa..28a8eb6b5 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -316,8 +316,8 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     long checkStartTs = System.currentTimeMillis();
     checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
+    checkBlockSendResult(new HashSet<>(blockIds));
     checkSentBlockCount();
-    checkBlockSendResult(blockIds);
     long commitStartTs = System.currentTimeMillis();
     long checkDuration = commitStartTs - checkStartTs;
     if (!isMemoryShuffleEnabled) {
@@ -360,17 +360,26 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void checkSentBlockCount() {
-    long tracked = 0;
-    if (serverToPartitionToBlockIds != null) {
-      Set<Long> blockIds = new HashSet<>();
-      for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
-        partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
-      }
-      tracked = blockIds.size();
+    long expected = blockIds.size();
+    long bufferManagerTracked = bufferManager.getBlockCount();
+
+    assert serverToPartitionToBlockIds != null;
+    // to filter the multiple replica's duplicate blockIds
+    Set<Long> blockIds = new HashSet<>();
+    for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
     }
-    if (tracked != bufferManager.getBlockCount()) {
+    long serverTracked = blockIds.size();
+    if (expected != serverTracked || expected != bufferManagerTracked) {
       throw new RssSendFailedException(
-          "Potential block loss may occur when preparing to send blocks for task[" + taskId + "]");
+          "Potential block loss may occur for task["
+              + taskId
+              + "]. BlockId number expected: "
+              + expected
+              + ", serverTracked: "
+              + serverTracked
+              + ", bufferManagerTracked: "
+              + bufferManagerTracked);
     }
   }
 

Reply via email to