This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d182a0396 [#1755] fix(spark): Avoid task failure of inconsistent 
record number (#1756)
d182a0396 is described below

commit d182a039685e4c50d5b500b9ac7aee52dcec623c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu May 30 14:51:22 2024 +0800

    [#1755] fix(spark): Avoid task failure of inconsistent record number (#1756)
    
    ### What changes were proposed in this pull request?
    
    1. When the spill ratio is `1.0`, the target spill size is no longer
computed from `usedBytes` and `inSendBytes` (it stays at `Long.MAX_VALUE`).
This avoids a potential race condition, because those two counters are not
read in a thread-safe way relative to each other. It guarantees that all
buffered data is flushed to the shuffle server at the end of the task.
    2. Add a check that fails the task if any buffers remain unflushed in the `bufferManager`.
    
    ### Why are the changes needed?
    
    Because of #1670, part of the data held by the bufferManager may not be
flushed to the shuffle servers in some corner cases.
    Thanks to #1558, this makes the task fail fast instead of silently losing
data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../spark/shuffle/writer/WriteBufferManager.java    |  5 ++++-
 .../shuffle/writer/WriteBufferManagerTest.java      | 21 +++++++++++++++++++++
 .../spark/shuffle/writer/RssShuffleWriter.java      |  8 ++++++++
 .../spark/shuffle/writer/RssShuffleWriter.java      |  8 ++++++++
 4 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 772a55525..95add5048 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -329,14 +329,17 @@ public class WriteBufferManager extends MemoryConsumer {
     List<ShuffleBlockInfo> result = Lists.newArrayList();
     long dataSize = 0;
     long memoryUsed = 0;
+
+    long targetSpillSize = Long.MAX_VALUE;
     bufferSpillRatio = Math.max(0.1, Math.min(1.0, bufferSpillRatio));
     List<Integer> partitionList = new ArrayList(buffers.keySet());
     if (Double.compare(bufferSpillRatio, 1.0) < 0) {
       partitionList.sort(
           Comparator.comparingInt(o -> buffers.get(o) == null ? 0 : 
buffers.get(o).getMemoryUsed())
               .reversed());
+      targetSpillSize = (long) ((getUsedBytes() - getInSendListBytes()) * 
bufferSpillRatio);
     }
-    long targetSpillSize = (long) ((usedBytes.get() - inSendListBytes.get()) * 
bufferSpillRatio);
+
     for (int partitionId : partitionList) {
       WriterBuffer wb = buffers.get(partitionId);
       if (wb == null) {
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 4734f442c..49ebeef25 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -433,6 +433,27 @@ public class WriteBufferManagerTest {
     Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() -> 
spyManager.getUsedBytes() == 0);
   }
 
+  @Test
+  public void testClearWithSpillRatio() {
+    SparkConf conf = getConf();
+    conf.set("spark.rss.client.send.size.limit", 
String.valueOf(Integer.MAX_VALUE));
+    WriteBufferManager wbm = createManager(conf);
+    assertEquals(0, wbm.getUsedBytes());
+
+    String testKey = "Key";
+    String testValue = "Value";
+    wbm.addRecord(0, testKey, testValue);
+    wbm.addRecord(1, testKey, testValue);
+
+    assertEquals(64, wbm.getUsedBytes());
+    assertEquals(0, wbm.getInSendListBytes());
+    // Although the usedBytes-inSendListBytes don't meet requirements of 
buffer spill ratio,
+    // But for the case of 1.0 ratio, it will ignore all.
+    when(wbm.getUsedBytes()).thenReturn(0L);
+    List<ShuffleBlockInfo> blocks = wbm.clear(1.0);
+    assertEquals(2, blocks.size());
+  }
+
   @Test
   public void spillPartial() {
     SparkConf conf = getConf();
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index da24ea08e..e06896a53 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -268,6 +268,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     processShuffleBlockInfos(shuffleBlockInfos);
     @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
     long s = System.currentTimeMillis();
+    checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
     checkSentBlockCount();
     checkBlockSendResult(blockIds);
@@ -297,6 +298,13 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             + bufferManager.getManagerCostInfo());
   }
 
+  private void checkAllBufferSpilled() {
+    if (bufferManager.getBuffers().size() > 0) {
+      throw new RssSendFailedException(
+          "Potential data loss due to existing remaining data buffers that are 
not flushed. This should not happen.");
+    }
+  }
+
   private void checkSentRecordCount(long recordCount) {
     if (recordCount != bufferManager.getRecordCount()) {
       String errorMsg =
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 781c2dbd6..78975b6a8 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -313,6 +313,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
     @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
     long checkStartTs = System.currentTimeMillis();
+    checkAllBufferSpilled();
     checkSentRecordCount(recordCount);
     checkSentBlockCount();
     checkBlockSendResult(blockIds);
@@ -340,6 +341,13 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             + bufferManager.getManagerCostInfo());
   }
 
+  private void checkAllBufferSpilled() {
+    if (bufferManager.getBuffers().size() > 0) {
+      throw new RssSendFailedException(
+          "Potential data loss due to existing remaining data buffers that are 
not flushed. This should not happen.");
+    }
+  }
+
   private void checkSentRecordCount(long recordCount) {
     if (recordCount != bufferManager.getRecordCount()) {
       String errorMsg =

Reply via email to