This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ca0f0ac2f [#1464][FOLLOWUP] improvement(spark): print abnormal shuffle
servers that blocks fail to send (#1473)
ca0f0ac2f is described below
commit ca0f0ac2f7ea3409afe84a23fe6ba2f3bf88e175
Author: RickyMa <[email protected]>
AuthorDate: Mon Jan 22 11:38:50 2024 +0800
[#1464][FOLLOWUP] improvement(spark): print abnormal shuffle servers that
blocks fail to send (#1473)
### What changes were proposed in this pull request?
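The failed shuffle servers printed in the error message were not actually
deduplicated: the map's values, which are themselves collections, were
collected into a set without being flattened. This change flattens them
before collecting, so the set is built from the individual elements.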
### Why are the changes needed?
It's a followup PR for
[#1465](https://github.com/apache/incubator-uniffle/pull/1465).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
---
.../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 5 ++++-
.../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 5 ++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 8e31d458c..b6348395a 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.writer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -377,7 +378,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ "] failed because "
+ failedBlockIds.size()
+ " blocks can't be sent to shuffle server: "
- +
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
+ + failedBlockIdsWithShuffleServer.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
LOG.error(errorMsg);
throw new RssSendFailedException(errorMsg);
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 100c7f092..612f4a281 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.writer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -400,7 +401,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ " failed because "
+ failedBlockIds.size()
+ " blocks can't be sent to shuffle server: "
- +
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
+ + failedBlockIdsWithShuffleServer.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
LOG.error(errorMsg);
throw new RssSendFailedException(errorMsg);
}