This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 692808025 [#1608][part-6][FOLLOWUP] improvement(client): Check blockId
num after blocks all sent (#1761)
692808025 is described below
commit 69280802561cb78920a871e2528a6241037cd811
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 11 19:25:34 2024 +0800
[#1608][part-6][FOLLOWUP] improvement(client): Check blockId num after
blocks all sent (#1761)
### What changes were proposed in this pull request?
1. Rework the blockId number check on the client side after all blocks are sent
### Why are the changes needed?
The subtask of #1608
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../spark/shuffle/writer/RssShuffleWriter.java | 29 ++++++++++++++--------
.../spark/shuffle/writer/RssShuffleWriter.java | 29 ++++++++++++++--------
2 files changed, 38 insertions(+), 20 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 969658373..81d54ec10 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -270,8 +270,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
long s = System.currentTimeMillis();
checkAllBufferSpilled();
checkSentRecordCount(recordCount);
+ checkBlockSendResult(new HashSet<>(blockIds));
checkSentBlockCount();
- checkBlockSendResult(blockIds);
final long checkDuration = System.currentTimeMillis() - s;
long commitDuration = 0;
if (!isMemoryShuffleEnabled) {
@@ -316,17 +316,26 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void checkSentBlockCount() {
- long tracked = 0;
- if (serverToPartitionToBlockIds != null) {
- Set<Long> blockIds = new HashSet<>();
- for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
- partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
- }
- tracked = blockIds.size();
+ long expected = blockIds.size();
+ long bufferManagerTracked = bufferManager.getBlockCount();
+
+ assert serverToPartitionToBlockIds != null;
+ // to filter the multiple replica's duplicate blockIds
+ Set<Long> blockIds = new HashSet<>();
+ for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+ partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
}
- if (tracked != bufferManager.getBlockCount()) {
+ long serverTracked = blockIds.size();
+ if (expected != serverTracked || expected != bufferManagerTracked) {
throw new RssSendFailedException(
- "Potential block loss may occur when preparing to send blocks for
task[" + taskId + "]");
+ "Potential block loss may occur for task["
+ + taskId
+ + "]. BlockId number expected: "
+ + expected
+ + ", serverTracked: "
+ + serverTracked
+ + ", bufferManagerTracked: "
+ + bufferManagerTracked);
}
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 9d0e734fa..28a8eb6b5 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -316,8 +316,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
long checkStartTs = System.currentTimeMillis();
checkAllBufferSpilled();
checkSentRecordCount(recordCount);
+ checkBlockSendResult(new HashSet<>(blockIds));
checkSentBlockCount();
- checkBlockSendResult(blockIds);
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
@@ -360,17 +360,26 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void checkSentBlockCount() {
- long tracked = 0;
- if (serverToPartitionToBlockIds != null) {
- Set<Long> blockIds = new HashSet<>();
- for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
- partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
- }
- tracked = blockIds.size();
+ long expected = blockIds.size();
+ long bufferManagerTracked = bufferManager.getBlockCount();
+
+ assert serverToPartitionToBlockIds != null;
+ // to filter the multiple replica's duplicate blockIds
+ Set<Long> blockIds = new HashSet<>();
+ for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+ partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
}
- if (tracked != bufferManager.getBlockCount()) {
+ long serverTracked = blockIds.size();
+ if (expected != serverTracked || expected != bufferManagerTracked) {
throw new RssSendFailedException(
- "Potential block loss may occur when preparing to send blocks for
task[" + taskId + "]");
+ "Potential block loss may occur for task["
+ + taskId
+ + "]. BlockId number expected: "
+ + expected
+ + ", serverTracked: "
+ + serverTracked
+ + ", bufferManagerTracked: "
+ + bufferManagerTracked);
}
}