This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new bc46d4a12 [#1608][part-6] improvement(spark): verify the sent blocks
count (#1690)
bc46d4a12 is described below
commit bc46d4a1210b09f6b077bdf1fa3950bcd0745131
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 15 13:39:35 2024 +0800
[#1608][part-6] improvement(spark): verify the sent blocks count (#1690)
### What changes were proposed in this pull request?
Verify the sent blocks count in write tasks for spark
### Why are the changes needed?
For #1608.
After introducing the reassign mechanism, the blocks' stored location will
be dynamically changed.
To catch possible or potential bugs, it's necessary to introduce the block
count.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests are enough to ensure safety
---
.../apache/spark/shuffle/writer/WriteBufferManager.java | 7 +++++++
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 17 +++++++++++++++++
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 17 +++++++++++++++++
3 files changed, 41 insertions(+)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index c290f965a..772a55525 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -66,6 +66,8 @@ public class WriteBufferManager extends MemoryConsumer {
private AtomicLong inSendListBytes = new AtomicLong(0);
/** An atomic counter used to keep track of the number of records */
private AtomicLong recordCounter = new AtomicLong(0);
+ /** An atomic counter used to keep track of the number of blocks */
+ private AtomicLong blockCounter = new AtomicLong(0);
// it's part of blockId
private Map<Integer, Integer> partitionToSeqNo = Maps.newHashMap();
private long askExecutorMemory;
@@ -382,6 +384,7 @@ public class WriteBufferManager extends MemoryConsumer {
final long crc32 = ChecksumUtils.getCrc32(compressed);
final long blockId =
blockIdLayout.getBlockId(getNextSeqNo(partitionId), partitionId,
taskAttemptId);
+ blockCounter.incrementAndGet();
uncompressedDataLen += data.length;
shuffleWriteMetrics.incBytesWritten(compressed.length);
// add memory to indicate bytes which will be sent to shuffle server
@@ -556,6 +559,10 @@ public class WriteBufferManager extends MemoryConsumer {
return recordCounter.get();
}
+ public long getBlockCount() {
+ return blockCounter.get();
+ }
+
public void freeAllocatedMemory(long freeMemory) {
freeMemory(freeMemory);
allocatedBytes.addAndGet(-freeMemory);
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2e116c72c..da24ea08e 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -266,8 +266,10 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
final long start = System.currentTimeMillis();
shuffleBlockInfos = bufferManager.clear(1.0);
processShuffleBlockInfos(shuffleBlockInfos);
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long s = System.currentTimeMillis();
checkSentRecordCount(recordCount);
+ checkSentBlockCount();
checkBlockSendResult(blockIds);
final long checkDuration = System.currentTimeMillis() - s;
long commitDuration = 0;
@@ -305,6 +307,21 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
+ private void checkSentBlockCount() {
+ long tracked = 0;
+ if (serverToPartitionToBlockIds != null) {
+ Set<Long> blockIds = new HashSet<>();
+ for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+ partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+ }
+ tracked = blockIds.size();
+ }
+ if (tracked != bufferManager.getBlockCount()) {
+ throw new RssSendFailedException(
+ "Potential block loss may occur when preparing to send blocks for
task[" + taskId + "]");
+ }
+ }
+
/**
* ShuffleBlock will be added to queue and send to shuffle server
maintenance the following
* information: 1. add blockId to set, check if it is send later 2. update
shuffle server info,
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 3cdcf9aa8..85d325d63 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -311,8 +311,10 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty()) {
processShuffleBlockInfos(shuffleBlockInfos);
}
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long checkStartTs = System.currentTimeMillis();
checkSentRecordCount(recordCount);
+ checkSentBlockCount();
checkBlockSendResult(blockIds);
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
@@ -348,6 +350,21 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
+ private void checkSentBlockCount() {
+ long tracked = 0;
+ if (serverToPartitionToBlockIds != null) {
+ Set<Long> blockIds = new HashSet<>();
+ for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+ partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+ }
+ tracked = blockIds.size();
+ }
+ if (tracked != bufferManager.getBlockCount()) {
+ throw new RssSendFailedException(
+ "Potential block loss may occur when preparing to send blocks for
task[" + taskId + "]");
+ }
+ }
+
// only push-based shuffle use this interface, but rss won't be used when
push-based shuffle is
// enabled.
public long[] getPartitionLengths() {