This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d182a0396 [#1755] fix(spark): Avoid task failure of inconsistent
record number (#1756)
d182a0396 is described below
commit d182a039685e4c50d5b500b9ac7aee52dcec623c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu May 30 14:51:22 2024 +0800
[#1755] fix(spark): Avoid task failure of inconsistent record number (#1756)
### What changes were proposed in this pull request?
1. When the spill ratio is `1.0`, the calculation of the target spill
size is skipped to avoid a potential race condition, since `usedBytes` and
`inSendBytes` are not thread safe. This guarantees that all data is
flushed to the shuffle server at the end of the task.
2. Add a check for remaining unflushed data in the `bufferManager`'s buffers
### Why are the changes needed?
Due to #1670, the partial data held by the bufferManager may not be
flushed to shuffle servers in some corner cases.
This change makes the task fail fast rather than silently lose data,
which is possible thanks to #1558.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../spark/shuffle/writer/WriteBufferManager.java | 5 ++++-
.../shuffle/writer/WriteBufferManagerTest.java | 21 +++++++++++++++++++++
.../spark/shuffle/writer/RssShuffleWriter.java | 8 ++++++++
.../spark/shuffle/writer/RssShuffleWriter.java | 8 ++++++++
4 files changed, 41 insertions(+), 1 deletion(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 772a55525..95add5048 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -329,14 +329,17 @@ public class WriteBufferManager extends MemoryConsumer {
List<ShuffleBlockInfo> result = Lists.newArrayList();
long dataSize = 0;
long memoryUsed = 0;
+
+ long targetSpillSize = Long.MAX_VALUE;
bufferSpillRatio = Math.max(0.1, Math.min(1.0, bufferSpillRatio));
List<Integer> partitionList = new ArrayList(buffers.keySet());
if (Double.compare(bufferSpillRatio, 1.0) < 0) {
partitionList.sort(
Comparator.comparingInt(o -> buffers.get(o) == null ? 0 :
buffers.get(o).getMemoryUsed())
.reversed());
+ targetSpillSize = (long) ((getUsedBytes() - getInSendListBytes()) *
bufferSpillRatio);
}
- long targetSpillSize = (long) ((usedBytes.get() - inSendListBytes.get()) *
bufferSpillRatio);
+
for (int partitionId : partitionList) {
WriterBuffer wb = buffers.get(partitionId);
if (wb == null) {
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 4734f442c..49ebeef25 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -433,6 +433,27 @@ public class WriteBufferManagerTest {
Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() ->
spyManager.getUsedBytes() == 0);
}
+ @Test
+ public void testClearWithSpillRatio() {
+ SparkConf conf = getConf();
+ conf.set("spark.rss.client.send.size.limit",
String.valueOf(Integer.MAX_VALUE));
+ WriteBufferManager wbm = createManager(conf);
+ assertEquals(0, wbm.getUsedBytes());
+
+ String testKey = "Key";
+ String testValue = "Value";
+ wbm.addRecord(0, testKey, testValue);
+ wbm.addRecord(1, testKey, testValue);
+
+ assertEquals(64, wbm.getUsedBytes());
+ assertEquals(0, wbm.getInSendListBytes());
+ // Although the usedBytes-inSendListBytes don't meet requirements of
buffer spill ratio,
+ // But for the case of 1.0 ratio, it will ignore all.
+ when(wbm.getUsedBytes()).thenReturn(0L);
+ List<ShuffleBlockInfo> blocks = wbm.clear(1.0);
+ assertEquals(2, blocks.size());
+ }
+
@Test
public void spillPartial() {
SparkConf conf = getConf();
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index da24ea08e..e06896a53 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -268,6 +268,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
processShuffleBlockInfos(shuffleBlockInfos);
@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long s = System.currentTimeMillis();
+ checkAllBufferSpilled();
checkSentRecordCount(recordCount);
checkSentBlockCount();
checkBlockSendResult(blockIds);
@@ -297,6 +298,13 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ bufferManager.getManagerCostInfo());
}
+ private void checkAllBufferSpilled() {
+ if (bufferManager.getBuffers().size() > 0) {
+ throw new RssSendFailedException(
+ "Potential data loss due to existing remaining data buffers that are
not flushed. This should not happen.");
+ }
+ }
+
private void checkSentRecordCount(long recordCount) {
if (recordCount != bufferManager.getRecordCount()) {
String errorMsg =
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 781c2dbd6..78975b6a8 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -313,6 +313,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long checkStartTs = System.currentTimeMillis();
+ checkAllBufferSpilled();
checkSentRecordCount(recordCount);
checkSentBlockCount();
checkBlockSendResult(blockIds);
@@ -340,6 +341,13 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ bufferManager.getManagerCostInfo());
}
+ private void checkAllBufferSpilled() {
+ if (bufferManager.getBuffers().size() > 0) {
+ throw new RssSendFailedException(
+ "Potential data loss due to existing remaining data buffers that are
not flushed. This should not happen.");
+ }
+ }
+
private void checkSentRecordCount(long recordCount) {
if (recordCount != bufferManager.getRecordCount()) {
String errorMsg =