This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 25d0585dd [#1464] improvement(spark): print abnormal shuffle servers that blocks fail to send (#1465)
25d0585dd is described below

commit 25d0585ddf6154e52ef01a7cbf9eb24809b7e585
Author: RickyMa <[email protected]>
AuthorDate: Thu Jan 18 10:18:14 2024 +0800

    [#1464] improvement(spark): print abnormal shuffle servers that blocks fail to send (#1465)
    
    ### What changes were proposed in this pull request?
    
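    Improve the error log message emitted when blocks fail to be sent to shuffle servers (checkBlockSendResult in the Spark 2 writer, checkIfBlocksFailed in the Spark 3 writer).
    As described in [#1464](https://github.com/apache/incubator-uniffle/issues/1464), the message now also prints the host and ports of the shuffle servers the failed blocks were sent to.
    `ShuffleServerInfo#toString()` is also changed to omit the server id, so it prints only the host and port(s).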
    
    ### Why are the changes needed?
    
    So that users can tell which shuffle servers a task failed to send blocks to; see [#1464](https://github.com/apache/incubator-uniffle/issues/1464).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
---
 .../java/org/apache/spark/shuffle/RssShuffleManager.java    | 10 ++++++++++
 .../org/apache/spark/shuffle/writer/RssShuffleWriter.java   |  8 ++++++--
 .../apache/spark/shuffle/writer/RssShuffleWriterTest.java   |  2 ++
 .../org/apache/spark/shuffle/writer/RssShuffleWriter.java   |  8 ++++++--
 .../apache/spark/shuffle/writer/RssShuffleWriterTest.java   | 12 +++++++++++-
 .../java/org/apache/uniffle/common/ShuffleServerInfo.java   | 13 ++-----------
 .../org/apache/uniffle/common/ShuffleServerInfoTest.java    | 12 ++----------
 7 files changed, 39 insertions(+), 26 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index fdcc167b5..74c8ba048 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -33,6 +34,7 @@ import scala.collection.Iterator;
 import scala.collection.Seq;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
@@ -658,6 +660,14 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     taskToFailedBlockIds.get(taskId).addAll(blockIds);
   }
 
+  @VisibleForTesting
+  public void addTaskToFailedBlockIdsAndServer(
+      String taskId, Long blockId, ShuffleServerInfo shuffleServerInfo) {
+    taskToFailedBlockIdsAndServer.putIfAbsent(taskId, Maps.newHashMap());
+    taskToFailedBlockIdsAndServer.get(taskId).putIfAbsent(blockId, new 
LinkedBlockingDeque<>());
+    
taskToFailedBlockIdsAndServer.get(taskId).get(blockId).add(shuffleServerInfo);
+  }
+
   @VisibleForTesting
   public void addSuccessBlockIds(String taskId, Set<Long> blockIds) {
     if (taskToSuccessBlockIds.get(taskId) == null) {
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4c9a62d1c..8e31d458c 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import scala.Function1;
 import scala.Option;
@@ -364,7 +365,9 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   protected void checkBlockSendResult(Set<Long> blockIds) {
     long start = System.currentTimeMillis();
     while (true) {
-      Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
+      Map<Long, BlockingQueue<ShuffleServerInfo>> 
failedBlockIdsWithShuffleServer =
+          shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+      Set<Long> failedBlockIds = failedBlockIdsWithShuffleServer.keySet();
       Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
       // if failed when send data to shuffle server, mark task as failed
       if (failedBlockIds.size() > 0) {
@@ -373,7 +376,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                 + taskId
                 + "] failed because "
                 + failedBlockIds.size()
-                + " blocks can't be sent to shuffle server.";
+                + " blocks can't be sent to shuffle server: "
+                + 
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
         LOG.error(errorMsg);
         throw new RssSendFailedException(errorMsg);
       }
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 13fb93f7a..8b150f9dd 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -146,6 +146,8 @@ public class RssShuffleWriterTest {
     // case 3: partial blocks are sent failed, Runtime exception will be thrown
     manager.addSuccessBlockIds(taskId, Sets.newHashSet(1L, 2L));
     manager.addFailedBlockIds(taskId, Sets.newHashSet(3L));
+    ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1", 
20001);
+    manager.addTaskToFailedBlockIdsAndServer(taskId, 3L, shuffleServerInfo);
     Throwable e3 =
         assertThrows(
             RuntimeException.class,
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2c0977071..100c7f092 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import scala.Function1;
 import scala.Option;
@@ -388,7 +389,9 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void checkIfBlocksFailed() {
-    Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
+    Map<Long, BlockingQueue<ShuffleServerInfo>> 
failedBlockIdsWithShuffleServer =
+        shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+    Set<Long> failedBlockIds = failedBlockIdsWithShuffleServer.keySet();
     if (!failedBlockIds.isEmpty()) {
       String errorMsg =
           "Send failed: Task["
@@ -396,7 +399,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
               + "]"
               + " failed because "
               + failedBlockIds.size()
-              + " blocks can't be sent to shuffle server.";
+              + " blocks can't be sent to shuffle server: "
+              + 
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
       LOG.error(errorMsg);
       throw new RssSendFailedException(errorMsg);
     }
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 206ceb33f..65c4b4f8d 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -85,10 +86,12 @@ public class RssShuffleWriterTest {
         .set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), 
"127.0.0.1:12345,127.0.0.1:12346");
     Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
     Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
+    Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>> 
taskToFailedBlockIdsAndServer =
+        JavaUtils.newConcurrentMap();
     Serializer kryoSerializer = new KryoSerializer(conf);
     RssShuffleManager manager =
         TestUtils.createShuffleManager(
-            conf, false, null, successBlocks, failBlocks, 
JavaUtils.newConcurrentMap());
+            conf, false, null, successBlocks, failBlocks, 
taskToFailedBlockIdsAndServer);
 
     ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
     Partitioner mockPartitioner = mock(Partitioner.class);
@@ -149,6 +152,13 @@ public class RssShuffleWriterTest {
     // case 3: partial blocks are sent failed, Runtime exception will be thrown
     successBlocks.put("taskId", Sets.newHashSet(1L, 2L));
     failBlocks.put("taskId", Sets.newHashSet(3L));
+    Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdToShuffleServerInfoMap =
+        JavaUtils.newConcurrentMap();
+    BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
+    ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1", 
20001);
+    blockingQueue.add(shuffleServerInfo);
+    blockIdToShuffleServerInfoMap.put(3L, blockingQueue);
+    taskToFailedBlockIdsAndServer.put("taskId", blockIdToShuffleServerInfoMap);
     Throwable e3 =
         assertThrows(
             RuntimeException.class,
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
index bfe99eafa..5b9a6fbe2 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
@@ -91,9 +91,7 @@ public class ShuffleServerInfo implements Serializable {
   @Override
   public String toString() {
     if (nettyPort > 0) {
-      return "ShuffleServerInfo{id["
-          + id
-          + "], host["
+      return "ShuffleServerInfo{host["
           + host
           + "],"
           + " grpc port["
@@ -102,14 +100,7 @@ public class ShuffleServerInfo implements Serializable {
           + nettyPort
           + "]}";
     } else {
-      return "ShuffleServerInfo{id["
-          + id
-          + "], host["
-          + host
-          + "],"
-          + " grpc port["
-          + grpcPort
-          + "]}";
+      return "ShuffleServerInfo{host[" + host + "]," + " grpc port[" + 
grpcPort + "]}";
     }
   }
 
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java 
b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
index 850a3bac9..feba4a4f5 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
@@ -47,19 +47,11 @@ public class ShuffleServerInfoTest {
   public void testToString() {
     ShuffleServerInfo info = new ShuffleServerInfo("1", "localhost", 1234);
     assertEquals(
-        "ShuffleServerInfo{id["
-            + info.getId()
-            + "], host["
-            + info.getHost()
-            + "], grpc port["
-            + info.getGrpcPort()
-            + "]}",
+        "ShuffleServerInfo{host[" + info.getHost() + "], grpc port[" + 
info.getGrpcPort() + "]}",
         info.toString());
     ShuffleServerInfo newInfo = new ShuffleServerInfo("1", "localhost", 1234, 
5678);
     assertEquals(
-        "ShuffleServerInfo{id["
-            + info.getId()
-            + "], host["
+        "ShuffleServerInfo{host["
             + newInfo.getHost()
             + "], grpc port["
             + newInfo.getGrpcPort()

Reply via email to