This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 748acbfc [#825][part-2] feat(spark): Report failed blocks and a list
of ShuffleServer. (#1138)
748acbfc is described below
commit 748acbfc272ea6089c5081cbf6644afdc396ba88
Author: yl09099 <[email protected]>
AuthorDate: Fri Oct 20 23:43:20 2023 +0800
[#825][part-2] feat(spark): Report failed blocks and a list of
ShuffleServer. (#1138)
### What changes were proposed in this pull request?
The ShuffleServer corresponding to the block that failed to be sent needs
to be reported.
Ⅰ. Overall objective:
1. During the shuffle write phase, the ShuffleServer reports faulty nodes
and reallocates the ShuffleServer list;
2. Triggers a Stage level retry of SPARK. The shuffleServer node is
excluded and reallocated before the retry.
Ⅱ. Implementation logic diagram:

Ⅲ. As shown in the picture above:
1. During Shuffle registration, obtain the ShuffleServer list to be written
through the RPC interface of a Coordinator Client by following the solid blue
line step. The list is bound using ShuffleID.
2. When the Tasks of a Stage start, following the solid green line steps, the
ShuffleServer list to be written to is obtained from
shuffleIdToShuffleHandleInfo through the ShuffleManager Client RPC interface;
3. In the Stage, if a Task fails to write blocks to the ShuffleServer, follow
the steps in red to report the ShuffleServer to the FailedShuffleServerList in
RssShuffleManager through the RPC interface.
4. FailedShuffleServerList records the number of ShuffleServer failures.
After the number of failures reaches the maximum number of retries of the Task
level, follow the steps in dotted orange lines. Through the RPC interface of a
Coordinator Client, obtain the list of ShuffleServers to be written to (the
ShuffleServers that failed to be written to are excluded). After obtaining the
list, go to Step 5 of the dotted orange line. Throwing a FetchFailedException
triggers a stage-level retry.
5. Spark then generates a new stage attempt (Attempt 1), which pulls the
corresponding ShuffleServer list according to the green dotted line.
### Why are the changes needed?
Reports the ShuffleServer corresponding to each block that failed to be sent.
Fix: #825
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT
---
.../apache/spark/shuffle/writer/DataPusher.java | 19 ++++++
.../spark/shuffle/writer/DataPusherTest.java | 12 +++-
.../apache/spark/shuffle/RssShuffleManager.java | 18 +++++
.../spark/shuffle/writer/RssShuffleWriterTest.java | 4 +-
.../apache/spark/shuffle/RssShuffleManager.java | 22 ++++++-
.../java/org/apache/spark/shuffle/TestUtils.java | 9 ++-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 30 +++++++--
.../client/impl/ShuffleWriteClientImpl.java | 76 ++++++++++++----------
.../client/response/SendShuffleDataResult.java | 20 ++++++
9 files changed, 166 insertions(+), 44 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index 2e1ab418..b578ac01 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
/**
@@ -54,6 +56,7 @@ public class DataPusher implements Closeable {
private final Map<String, Set<Long>> taskToSuccessBlockIds;
// Must be thread safe
private final Map<String, Set<Long>> taskToFailedBlockIds;
+ private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer;
private String rssAppId;
// Must be thread safe
private final Set<String> failedTaskIds;
@@ -62,12 +65,14 @@ public class DataPusher implements Closeable {
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime) {
this.shuffleWriteClient = shuffleWriteClient;
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
+ this.taskToFailedBlockIdsAndServer = taskToFailedBlockIdsAndServer;
this.failedTaskIds = failedTaskIds;
this.executorService =
new ThreadPoolExecutor(
@@ -93,6 +98,8 @@ public class DataPusher implements Closeable {
rssAppId, shuffleBlockInfoList, () ->
!isValidTask(taskId));
putBlockId(taskToSuccessBlockIds, taskId,
result.getSuccessBlockIds());
putBlockId(taskToFailedBlockIds, taskId,
result.getFailedBlockIds());
+ putSendFailedBlockIdAndShuffleServer(
+ taskToFailedBlockIdsAndServer, taskId,
result.getSendFailedBlockIds());
} finally {
List<Runnable> callbackChain =
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
@@ -118,6 +125,18 @@ public class DataPusher implements Closeable {
.addAll(blockIds);
}
+ private synchronized void putSendFailedBlockIdAndShuffleServer(
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
+ String taskAttemptId,
+ Map<Long, List<ShuffleServerInfo>> blockIdsAndServer) {
+ if (blockIdsAndServer == null || blockIdsAndServer.isEmpty()) {
+ return;
+ }
+ taskToFailedBlockIdsAndServer
+ .computeIfAbsent(taskAttemptId, x -> JavaUtils.newConcurrentMap())
+ .putAll(blockIdsAndServer);
+ }
+
public boolean isValidTask(String taskId) {
return !failedTaskIds.contains(taskId);
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 979f9282..21a8499d 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -34,6 +34,8 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,11 +81,19 @@ public class DataPusherTest {
Map<String, Set<Long>> taskToSuccessBlockIds = Maps.newConcurrentMap();
Map<String, Set<Long>> taskToFailedBlockIds = Maps.newConcurrentMap();
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
+ JavaUtils.newConcurrentMap();
Set<String> failedTaskIds = new HashSet<>();
DataPusher dataPusher =
new DataPusher(
- shuffleWriteClient, taskToSuccessBlockIds, taskToFailedBlockIds,
failedTaskIds, 1, 2);
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockIds,
+ taskToFailedBlockIdsAndServer,
+ failedTaskIds,
+ 1,
+ 2);
dataPusher.setRssAppId("testSendData_appId");
// sync send
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 3943b5c9..bf32826f 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -86,6 +86,9 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private ShuffleWriteClient shuffleWriteClient;
private Map<String, Set<Long>> taskToSuccessBlockIds =
JavaUtils.newConcurrentMap();
private Map<String, Set<Long>> taskToFailedBlockIds =
JavaUtils.newConcurrentMap();
+ // Record both the block that failed to be sent and the ShuffleServer
+ private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
+ JavaUtils.newConcurrentMap();
private final int dataReplica;
private final int dataReplicaWrite;
private final int dataReplicaRead;
@@ -212,6 +215,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockIds,
+ taskToFailedBlockIdsAndServer,
failedTaskIds,
poolSize,
keepAliveTime);
@@ -692,4 +696,18 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
e, sparkConf, appId, shuffleId, stageAttemptId,
Sets.newHashSet(partitionId));
}
}
+
+ /**
+ * The ShuffleServer list of block sending failures is returned using the
shuffle task ID
+ *
+ * @param taskId Shuffle taskId
+ * @return List of failed ShuffleServer blocks
+ */
+ public Map<Long, List<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(String taskId) {
+ Map<Long, List<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
+ if (result == null) {
+ result = JavaUtils.newConcurrentMap();
+ }
+ return result;
+ }
}
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 2ccfa32d..86115c69 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -155,13 +155,14 @@ public class RssShuffleWriterTest {
private final Function<AddBlockEvent, CompletableFuture<Long>> sendFunc;
FakedDataPusher(Function<AddBlockEvent, CompletableFuture<Long>> sendFunc)
{
- this(null, null, null, null, 1, 1, sendFunc);
+ this(null, null, null, null, null, 1, 1, sendFunc);
}
private FakedDataPusher(
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
@@ -170,6 +171,7 @@ public class RssShuffleWriterTest {
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockIds,
+ taskToFailedBlockIdsAndServer,
failedTaskIds,
threadPoolSize,
threadKeepAliveTime);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 2cb7d7e8..36225fec 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -97,6 +97,8 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private final int dataCommitPoolSize;
private final Map<String, Set<Long>> taskToSuccessBlockIds;
private final Map<String, Set<Long>> taskToFailedBlockIds;
+ // Record both the block that failed to be sent and the ShuffleServer
+ private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer;
private ScheduledExecutorService heartBeatScheduledExecutorService;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
@@ -204,6 +206,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
LOG.info("Disable shuffle data locality in RssShuffleManager.");
taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
taskToFailedBlockIds = JavaUtils.newConcurrentMap();
+ this.taskToFailedBlockIdsAndServer = JavaUtils.newConcurrentMap();
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
@@ -235,6 +238,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockIds,
+ taskToFailedBlockIdsAndServer,
failedTaskIds,
poolSize,
keepAliveTime);
@@ -265,7 +269,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
boolean isDriver,
DataPusher dataPusher,
Map<String, Set<Long>> taskToSuccessBlockIds,
- Map<String, Set<Long>> taskToFailedBlockIds) {
+ Map<String, Set<Long>> taskToFailedBlockIds,
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
this.sparkConf = conf;
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dataDistributionType =
@@ -319,6 +324,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
.rssConf(RssSparkConfig.toRssConf(sparkConf)));
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
+ this.taskToFailedBlockIdsAndServer = taskToFailedBlockIdsAndServer;
this.heartBeatScheduledExecutorService = null;
this.dataPusher = dataPusher;
}
@@ -988,4 +994,18 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
e, sparkConf, appId, shuffleId, stageAttemptId, failedPartitions);
}
}
+
+ /**
+ * The ShuffleServer list of block sending failures is returned using the
shuffle task ID
+ *
+ * @param taskId Shuffle taskId
+ * @return List of failed ShuffleServer blocks
+ */
+ public Map<Long, List<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(String taskId) {
+ Map<Long, List<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
+ if (result == null) {
+ result = JavaUtils.newConcurrentMap();
+ }
+ return result;
+ }
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
index 6ba649c7..d0424f4f 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -24,6 +25,8 @@ import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.writer.DataPusher;
+import org.apache.uniffle.common.ShuffleServerInfo;
+
public class TestUtils {
private TestUtils() {}
@@ -33,8 +36,10 @@ public class TestUtils {
Boolean isDriver,
DataPusher dataPusher,
Map<String, Set<Long>> successBlockIds,
- Map<String, Set<Long>> failBlockIds) {
- return new RssShuffleManager(conf, isDriver, dataPusher, successBlockIds,
failBlockIds);
+ Map<String, Set<Long>> failBlockIds,
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
+ return new RssShuffleManager(
+ conf, isDriver, dataPusher, successBlockIds, failBlockIds,
taskToFailedBlockIdsAndServer);
}
public static boolean isMacOnAppleSilicon() {
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 1be8a2a6..09eeff7c 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -82,9 +82,12 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
+ JavaUtils.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
RssShuffleManager manager =
- TestUtils.createShuffleManager(conf, false, null, successBlocks,
failBlocks);
+ TestUtils.createShuffleManager(
+ conf, false, null, successBlocks, failBlocks,
taskToFailedBlockIdsAndServer);
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
@@ -154,13 +157,14 @@ public class RssShuffleWriterTest {
private final Function<AddBlockEvent, CompletableFuture<Long>> sendFunc;
FakedDataPusher(Function<AddBlockEvent, CompletableFuture<Long>> sendFunc)
{
- this(null, null, null, null, 1, 1, sendFunc);
+ this(null, null, null, null, null, 1, 1, sendFunc);
}
private FakedDataPusher(
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
+ Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
@@ -169,6 +173,7 @@ public class RssShuffleWriterTest {
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockIds,
+ taskToFailedBlockIdsAndServer,
failedTaskIds,
threadPoolSize,
threadKeepAliveTime);
@@ -217,7 +222,12 @@ public class RssShuffleWriterTest {
final RssShuffleManager manager =
TestUtils.createShuffleManager(
- conf, false, dataPusher, successBlockIds,
JavaUtils.newConcurrentMap());
+ conf,
+ false,
+ dataPusher,
+ successBlockIds,
+ JavaUtils.newConcurrentMap(),
+ JavaUtils.newConcurrentMap());
WriteBufferManagerTest.FakedTaskMemoryManager fakedTaskMemoryManager =
new WriteBufferManagerTest.FakedTaskMemoryManager();
@@ -309,7 +319,12 @@ public class RssShuffleWriterTest {
final RssShuffleManager manager =
TestUtils.createShuffleManager(
- conf, false, dataPusher, successBlockIds,
JavaUtils.newConcurrentMap());
+ conf,
+ false,
+ dataPusher,
+ successBlockIds,
+ JavaUtils.newConcurrentMap(),
+ JavaUtils.newConcurrentMap());
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
final ShuffleWriteClient mockShuffleWriteClient =
mock(ShuffleWriteClient.class);
@@ -461,7 +476,12 @@ public class RssShuffleWriterTest {
RssShuffleManager mockShuffleManager =
spy(
TestUtils.createShuffleManager(
- sparkConf, false, dataPusher, Maps.newConcurrentMap(),
Maps.newConcurrentMap()));
+ sparkConf,
+ false,
+ dataPusher,
+ Maps.newConcurrentMap(),
+ Maps.newConcurrentMap(),
+ JavaUtils.newConcurrentMap()));
RssShuffleHandle<String, String, String> mockHandle =
mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 5c9932a5..f777f6ff 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -151,7 +151,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
String appId,
Map<ShuffleServerInfo, Map<Integer, Map<Integer,
List<ShuffleBlockInfo>>>> serverToBlocks,
Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
- Map<Long, AtomicInteger> blockIdsTracker,
+ Map<Long, List<ShuffleServerInfo>> blockIdsSendSuccessTracker,
+ Map<Long, List<ShuffleServerInfo>> blockIdsSendFailTracker,
boolean allowFastFail,
Supplier<Boolean> needCancelRequest) {
@@ -193,7 +194,11 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
// mark a replica of block that has been sent
serverToBlockIds
.get(ssi)
- .forEach(block ->
blockIdsTracker.get(block).incrementAndGet());
+ .forEach(
+ blockId ->
+ blockIdsSendSuccessTracker
+ .computeIfAbsent(blockId, id ->
Lists.newArrayList())
+ .add(ssi));
if (defectiveServers != null) {
defectiveServers.remove(ssi);
}
@@ -201,6 +206,13 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
LOG.debug("{} successfully.", logMsg);
}
} else {
+ serverToBlockIds
+ .get(ssi)
+ .forEach(
+ blockId ->
+ blockIdsSendFailTracker
+ .computeIfAbsent(blockId, id ->
Lists.newArrayList())
+ .add(ssi));
if (defectiveServers != null) {
defectiveServers.add(ssi);
}
@@ -208,6 +220,13 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
return false;
}
} catch (Exception e) {
+ serverToBlockIds
+ .get(ssi)
+ .forEach(
+ blockId ->
+ blockIdsSendFailTracker
+ .computeIfAbsent(blockId, id ->
Lists.newArrayList())
+ .add(ssi));
if (defectiveServers != null) {
defectiveServers.add(ssi);
}
@@ -335,27 +354,9 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
false);
}
}
-
- // maintain the count of blocks that have been sent to the server
- // unnecessary to use concurrent hashmap here unless you need to insert or
delete entries in
- // other threads
- // AtomicInteger is enough to reflect value changes in other threads
- Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
- primaryServerToBlockIds
- .values()
- .forEach(
- blockList ->
- blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0))));
- secondaryServerToBlockIds
- .values()
- .forEach(
- blockList ->
- blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0))));
-
- Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
- Set<Long> successBlockIds = Sets.newConcurrentHashSet();
- // if send block failed, the task will fail
- // todo: better to have fallback solution when send to multiple servers
+ /** Records the ShuffleServer that successfully or failed to send blocks */
+ Map<Long, List<ShuffleServerInfo>> blockIdSendSuccessTracker =
JavaUtils.newConcurrentMap();
+ Map<Long, List<ShuffleServerInfo>> blockIdsSendFailTracker =
JavaUtils.newConcurrentMap();
// sent the primary round of blocks.
boolean isAllSuccess =
@@ -363,7 +364,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
appId,
primaryServerToBlocks,
primaryServerToBlockIds,
- blockIdsTracker,
+ blockIdSendSuccessTracker,
+ blockIdsSendFailTracker,
secondaryServerToBlocks.isEmpty(),
needCancelRequest);
@@ -378,25 +380,31 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
appId,
secondaryServerToBlocks,
secondaryServerToBlockIds,
- blockIdsTracker,
+ blockIdSendSuccessTracker,
+ blockIdsSendFailTracker,
true,
needCancelRequest);
}
- // check success and failed blocks according to the replicaWrite
- blockIdsTracker
+ blockIdSendSuccessTracker
.entrySet()
.forEach(
- blockCt -> {
- long blockId = blockCt.getKey();
- int count = blockCt.getValue().get();
- if (count >= replicaWrite) {
- successBlockIds.add(blockId);
+ successBlockId -> {
+ if (successBlockId.getValue().size() < replicaWrite) {
+ // Removes blocks that do not reach replicaWrite from the
success queue
+ blockIdSendSuccessTracker.remove(successBlockId.getKey());
} else {
- failedBlockIds.add(blockId);
+ // If the replicaWrite to be sent is reached,
+ // no matter whether the block fails to be sent or not,
+ // the block is considered to have been sent successfully and
is removed from the
+ // failed block tracker
+ blockIdsSendFailTracker.remove(successBlockId.getKey());
}
});
- return new SendShuffleDataResult(successBlockIds, failedBlockIds);
+ return new SendShuffleDataResult(
+ blockIdSendSuccessTracker.keySet(),
+ blockIdsSendFailTracker.keySet(),
+ blockIdsSendFailTracker);
}
/**
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
index 7569240f..d33a2986 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
@@ -17,16 +17,32 @@
package org.apache.uniffle.client.response;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
+
public class SendShuffleDataResult {
private Set<Long> successBlockIds;
private Set<Long> failedBlockIds;
+ private Map<Long, List<ShuffleServerInfo>> sendFailedBlockIds;
public SendShuffleDataResult(Set<Long> successBlockIds, Set<Long>
failedBlockIds) {
this.successBlockIds = successBlockIds;
this.failedBlockIds = failedBlockIds;
+ this.sendFailedBlockIds = JavaUtils.newConcurrentMap();
+ }
+
+ public SendShuffleDataResult(
+ Set<Long> successBlockIds,
+ Set<Long> failedBlockIds,
+ Map<Long, List<ShuffleServerInfo>> sendFailedBlockIds) {
+ this.successBlockIds = successBlockIds;
+ this.failedBlockIds = failedBlockIds;
+ this.sendFailedBlockIds = sendFailedBlockIds;
}
public Set<Long> getSuccessBlockIds() {
@@ -36,4 +52,8 @@ public class SendShuffleDataResult {
public Set<Long> getFailedBlockIds() {
return failedBlockIds;
}
+
+ public Map<Long, List<ShuffleServerInfo>> getSendFailedBlockIds() {
+ return sendFailedBlockIds;
+ }
}