This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ca0f0ac2f [#1464][FOLLOWUP] improvement(spark): print abnormal shuffle 
servers that blocks fail to send (#1473)
ca0f0ac2f is described below

commit ca0f0ac2f7ea3409afe84a23fe6ba2f3bf88e175
Author: RickyMa <[email protected]>
AuthorDate: Mon Jan 22 11:38:50 2024 +0800

    [#1464][FOLLOWUP] improvement(spark): print abnormal shuffle servers that 
blocks fail to send (#1473)
    
    ### What changes were proposed in this pull request?
    
    The output of failed shuffle servers' result has not been successfully 
deduplicated.
    
    ### Why are the changes needed?
    
    It's a followup PR for 
[#1465](https://github.com/apache/incubator-uniffle/pull/1465).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java  | 5 ++++-
 .../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java  | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 8e31d458c..b6348395a 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.writer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -377,7 +378,9 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                 + "] failed because "
                 + failedBlockIds.size()
                 + " blocks can't be sent to shuffle server: "
-                + 
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
+                + failedBlockIdsWithShuffleServer.values().stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
         LOG.error(errorMsg);
         throw new RssSendFailedException(errorMsg);
       }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 100c7f092..612f4a281 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.writer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -400,7 +401,9 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
               + " failed because "
               + failedBlockIds.size()
               + " blocks can't be sent to shuffle server: "
-              + 
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
+              + failedBlockIdsWithShuffleServer.values().stream()
+                  .flatMap(Collection::stream)
+                  .collect(Collectors.toSet());
       LOG.error(errorMsg);
       throw new RssSendFailedException(errorMsg);
     }

Reply via email to