This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new bc46d4a12 [#1608][part-6] improvement(spark): verify the sent blocks 
count (#1690)
bc46d4a12 is described below

commit bc46d4a1210b09f6b077bdf1fa3950bcd0745131
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 15 13:39:35 2024 +0800

    [#1608][part-6] improvement(spark): verify the sent blocks count (#1690)
    
    ### What changes were proposed in this pull request?
    
    Verify the sent blocks count in write tasks for spark
    
    ### Why are the changes needed?
    
    For #1608.
    After introducing the reassign mechanism, the blocks' stored location will be dynamically changed.
    To guard against possible or potential bugs, it's necessary to introduce the block count check.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests are enough to ensure safety.
---
 .../apache/spark/shuffle/writer/WriteBufferManager.java |  7 +++++++
 .../apache/spark/shuffle/writer/RssShuffleWriter.java   | 17 +++++++++++++++++
 .../apache/spark/shuffle/writer/RssShuffleWriter.java   | 17 +++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index c290f965a..772a55525 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -66,6 +66,8 @@ public class WriteBufferManager extends MemoryConsumer {
   private AtomicLong inSendListBytes = new AtomicLong(0);
   /** An atomic counter used to keep track of the number of records */
   private AtomicLong recordCounter = new AtomicLong(0);
+  /** An atomic counter used to keep track of the number of blocks */
+  private AtomicLong blockCounter = new AtomicLong(0);
   // it's part of blockId
   private Map<Integer, Integer> partitionToSeqNo = Maps.newHashMap();
   private long askExecutorMemory;
@@ -382,6 +384,7 @@ public class WriteBufferManager extends MemoryConsumer {
     final long crc32 = ChecksumUtils.getCrc32(compressed);
     final long blockId =
         blockIdLayout.getBlockId(getNextSeqNo(partitionId), partitionId, 
taskAttemptId);
+    blockCounter.incrementAndGet();
     uncompressedDataLen += data.length;
     shuffleWriteMetrics.incBytesWritten(compressed.length);
     // add memory to indicate bytes which will be sent to shuffle server
@@ -556,6 +559,10 @@ public class WriteBufferManager extends MemoryConsumer {
     return recordCounter.get();
   }
 
+  public long getBlockCount() {
+    return blockCounter.get();
+  }
+
   public void freeAllocatedMemory(long freeMemory) {
     freeMemory(freeMemory);
     allocatedBytes.addAndGet(-freeMemory);
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2e116c72c..da24ea08e 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -266,8 +266,10 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     final long start = System.currentTimeMillis();
     shuffleBlockInfos = bufferManager.clear(1.0);
     processShuffleBlockInfos(shuffleBlockInfos);
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
     long s = System.currentTimeMillis();
     checkSentRecordCount(recordCount);
+    checkSentBlockCount();
     checkBlockSendResult(blockIds);
     final long checkDuration = System.currentTimeMillis() - s;
     long commitDuration = 0;
@@ -305,6 +307,21 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
+  private void checkSentBlockCount() {
+    long tracked = 0;
+    if (serverToPartitionToBlockIds != null) {
+      Set<Long> blockIds = new HashSet<>();
+      for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
+        partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+      }
+      tracked = blockIds.size();
+    }
+    if (tracked != bufferManager.getBlockCount()) {
+      throw new RssSendFailedException(
+          "Potential block loss may occur when preparing to send blocks for 
task[" + taskId + "]");
+    }
+  }
+
   /**
    * ShuffleBlock will be added to queue and send to shuffle server 
maintenance the following
    * information: 1. add blockId to set, check if it is send later 2. update 
shuffle server info,
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 3cdcf9aa8..85d325d63 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -311,8 +311,10 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty()) {
       processShuffleBlockInfos(shuffleBlockInfos);
     }
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
     long checkStartTs = System.currentTimeMillis();
     checkSentRecordCount(recordCount);
+    checkSentBlockCount();
     checkBlockSendResult(blockIds);
     long commitStartTs = System.currentTimeMillis();
     long checkDuration = commitStartTs - checkStartTs;
@@ -348,6 +350,21 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
+  private void checkSentBlockCount() {
+    long tracked = 0;
+    if (serverToPartitionToBlockIds != null) {
+      Set<Long> blockIds = new HashSet<>();
+      for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
+        partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+      }
+      tracked = blockIds.size();
+    }
+    if (tracked != bufferManager.getBlockCount()) {
+      throw new RssSendFailedException(
+          "Potential block loss may occur when preparing to send blocks for 
task[" + taskId + "]");
+    }
+  }
+
   // only push-based shuffle use this interface, but rss won't be used when 
push-based shuffle is
   // enabled.
   public long[] getPartitionLengths() {

Reply via email to