This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 25d0585dd [#1464] improvement(spark): print abnormal shuffle servers
to which blocks fail to be sent (#1465)
25d0585dd is described below
commit 25d0585ddf6154e52ef01a7cbf9eb24809b7e585
Author: RickyMa <[email protected]>
AuthorDate: Thu Jan 18 10:18:14 2024 +0800
[#1464] improvement(spark): print abnormal shuffle servers to which blocks
fail to be sent (#1465)
### What changes were proposed in this pull request?
Improves the error log message produced by checkBlockSendResult. As described in
[#1464](https://github.com/apache/incubator-uniffle/issues/1464), the message now
includes the IP address and ports of each shuffle server to which blocks failed
to be sent, instead of only reporting the count of failed blocks.
### Why are the changes needed?
For [#1464](https://github.com/apache/incubator-uniffle/issues/1464)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually.
---
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 10 ++++++++++
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 8 ++++++--
.../apache/spark/shuffle/writer/RssShuffleWriterTest.java | 2 ++
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 8 ++++++--
.../apache/spark/shuffle/writer/RssShuffleWriterTest.java | 12 +++++++++++-
.../java/org/apache/uniffle/common/ShuffleServerInfo.java | 13 ++-----------
.../org/apache/uniffle/common/ShuffleServerInfoTest.java | 12 ++----------
7 files changed, 39 insertions(+), 26 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index fdcc167b5..74c8ba048 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,6 +34,7 @@ import scala.collection.Iterator;
import scala.collection.Seq;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.ShuffleDependency;
@@ -658,6 +660,14 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
taskToFailedBlockIds.get(taskId).addAll(blockIds);
}
+ @VisibleForTesting
+ public void addTaskToFailedBlockIdsAndServer(
+ String taskId, Long blockId, ShuffleServerInfo shuffleServerInfo) {
+ taskToFailedBlockIdsAndServer.putIfAbsent(taskId, Maps.newHashMap());
+ taskToFailedBlockIdsAndServer.get(taskId).putIfAbsent(blockId, new
LinkedBlockingDeque<>());
+
taskToFailedBlockIdsAndServer.get(taskId).get(blockId).add(shuffleServerInfo);
+ }
+
@VisibleForTesting
public void addSuccessBlockIds(String taskId, Set<Long> blockIds) {
if (taskToSuccessBlockIds.get(taskId) == null) {
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4c9a62d1c..8e31d458c 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import scala.Function1;
import scala.Option;
@@ -364,7 +365,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
protected void checkBlockSendResult(Set<Long> blockIds) {
long start = System.currentTimeMillis();
while (true) {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
+ Map<Long, BlockingQueue<ShuffleServerInfo>>
failedBlockIdsWithShuffleServer =
+ shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+ Set<Long> failedBlockIds = failedBlockIdsWithShuffleServer.keySet();
Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
// if failed when send data to shuffle server, mark task as failed
if (failedBlockIds.size() > 0) {
@@ -373,7 +376,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ taskId
+ "] failed because "
+ failedBlockIds.size()
- + " blocks can't be sent to shuffle server.";
+ + " blocks can't be sent to shuffle server: "
+ +
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
LOG.error(errorMsg);
throw new RssSendFailedException(errorMsg);
}
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 13fb93f7a..8b150f9dd 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -146,6 +146,8 @@ public class RssShuffleWriterTest {
// case 3: partial blocks are sent failed, Runtime exception will be thrown
manager.addSuccessBlockIds(taskId, Sets.newHashSet(1L, 2L));
manager.addFailedBlockIds(taskId, Sets.newHashSet(3L));
+ ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1",
20001);
+ manager.addTaskToFailedBlockIdsAndServer(taskId, 3L, shuffleServerInfo);
Throwable e3 =
assertThrows(
RuntimeException.class,
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2c0977071..100c7f092 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import scala.Function1;
import scala.Option;
@@ -388,7 +389,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void checkIfBlocksFailed() {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
+ Map<Long, BlockingQueue<ShuffleServerInfo>>
failedBlockIdsWithShuffleServer =
+ shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+ Set<Long> failedBlockIds = failedBlockIdsWithShuffleServer.keySet();
if (!failedBlockIds.isEmpty()) {
String errorMsg =
"Send failed: Task["
@@ -396,7 +399,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ "]"
+ " failed because "
+ failedBlockIds.size()
- + " blocks can't be sent to shuffle server.";
+ + " blocks can't be sent to shuffle server: "
+ +
failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet());
LOG.error(errorMsg);
throw new RssSendFailedException(errorMsg);
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 206ceb33f..65c4b4f8d 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -85,10 +86,12 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
+ JavaUtils.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
RssShuffleManager manager =
TestUtils.createShuffleManager(
- conf, false, null, successBlocks, failBlocks,
JavaUtils.newConcurrentMap());
+ conf, false, null, successBlocks, failBlocks,
taskToFailedBlockIdsAndServer);
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
@@ -149,6 +152,13 @@ public class RssShuffleWriterTest {
// case 3: partial blocks are sent failed, Runtime exception will be thrown
successBlocks.put("taskId", Sets.newHashSet(1L, 2L));
failBlocks.put("taskId", Sets.newHashSet(3L));
+ Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdToShuffleServerInfoMap =
+ JavaUtils.newConcurrentMap();
+ BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
+ ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1",
20001);
+ blockingQueue.add(shuffleServerInfo);
+ blockIdToShuffleServerInfoMap.put(3L, blockingQueue);
+ taskToFailedBlockIdsAndServer.put("taskId", blockIdToShuffleServerInfoMap);
Throwable e3 =
assertThrows(
RuntimeException.class,
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
index bfe99eafa..5b9a6fbe2 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
@@ -91,9 +91,7 @@ public class ShuffleServerInfo implements Serializable {
@Override
public String toString() {
if (nettyPort > 0) {
- return "ShuffleServerInfo{id["
- + id
- + "], host["
+ return "ShuffleServerInfo{host["
+ host
+ "],"
+ " grpc port["
@@ -102,14 +100,7 @@ public class ShuffleServerInfo implements Serializable {
+ nettyPort
+ "]}";
} else {
- return "ShuffleServerInfo{id["
- + id
- + "], host["
- + host
- + "],"
- + " grpc port["
- + grpcPort
- + "]}";
+ return "ShuffleServerInfo{host[" + host + "]," + " grpc port[" +
grpcPort + "]}";
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
index 850a3bac9..feba4a4f5 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java
@@ -47,19 +47,11 @@ public class ShuffleServerInfoTest {
public void testToString() {
ShuffleServerInfo info = new ShuffleServerInfo("1", "localhost", 1234);
assertEquals(
- "ShuffleServerInfo{id["
- + info.getId()
- + "], host["
- + info.getHost()
- + "], grpc port["
- + info.getGrpcPort()
- + "]}",
+ "ShuffleServerInfo{host[" + info.getHost() + "], grpc port[" +
info.getGrpcPort() + "]}",
info.toString());
ShuffleServerInfo newInfo = new ShuffleServerInfo("1", "localhost", 1234,
5678);
assertEquals(
- "ShuffleServerInfo{id["
- + info.getId()
- + "], host["
+ "ShuffleServerInfo{host["
+ newInfo.getHost()
+ "], grpc port["
+ newInfo.getGrpcPort()