jerqi commented on code in PR #1138:
URL: https://github.com/apache/incubator-uniffle/pull/1138#discussion_r1349823606
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -986,4 +992,18 @@ private Roaring64NavigableMap getShuffleResultForMultiPart(
e, sparkConf, appId, shuffleId, stageAttemptId, failedPartitions);
}
}
+
+ /**
+ * The ShuffleServer list of block sending failures is returned using the shuffle task ID
+ *
+ * @param taskId Shuffle taskId
+ * @return List of failed ShuffleServer blocks
+ */
+ public Map<Long, List<ShuffleServerInfo>> getFailedBlockIdsWithShuffleServer(String taskId) {
+ Map<Long, List<ShuffleServerInfo>> result = taskToFailedBlockIdsAndServer.get(taskId);
+ if (result == null) {
+ result = Maps.newHashMap();
Review Comment:
Could this use JavaUtils.newConcurrentMap() instead of Maps.newHashMap()?
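A minimal sketch of what the suggestion amounts to, assuming Uniffle's JavaUtils.newConcurrentMap() is a factory that returns a ConcurrentHashMap. The class FailedBlockLookup and the local newConcurrentMap() are hypothetical stand-ins, and String replaces ShuffleServerInfo to keep the example self-contained. The point is that the empty fallback then has the same thread-safety guarantees as the per-task maps that DataPusher stores with a concurrent map.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FailedBlockLookup {
  // Hypothetical stand-in for JavaUtils.newConcurrentMap(); assumed to return a ConcurrentHashMap.
  static <K, V> ConcurrentHashMap<K, V> newConcurrentMap() {
    return new ConcurrentHashMap<>();
  }

  // task id -> (block id -> servers that failed for that block); String replaces ShuffleServerInfo.
  private final Map<String, Map<Long, List<String>>> taskToFailedBlockIdsAndServer =
      newConcurrentMap();

  public Map<Long, List<String>> getFailedBlockIdsWithShuffleServer(String taskId) {
    Map<Long, List<String>> result = taskToFailedBlockIdsAndServer.get(taskId);
    if (result == null) {
      // Empty fallback is a concurrent map too, matching the maps stored per task.
      result = newConcurrentMap();
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Long, List<String>> failed =
        new FailedBlockLookup().getFailedBlockIdsWithShuffleServer("unknown-task");
    System.out.println(failed.isEmpty() + " " + (failed instanceof ConcurrentHashMap)); // true true
  }
}
```

The fallback is not written back into taskToFailedBlockIdsAndServer, so in this sketch it only serves as an empty result for unknown task ids.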
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java:
##########
@@ -118,6 +125,18 @@ private synchronized void putBlockId(
.addAll(blockIds);
}
+ private synchronized void putBlockIdAndShuffle(
+ Map<String, Map<Long, List<ShuffleServerInfo>>> taskToFailedBlockIdsAndServer,
+ String taskAttemptId,
+ Map<Long, List<ShuffleServerInfo>> blockIdsAndServer) {
+ if (blockIdsAndServer == null || blockIdsAndServer.isEmpty()) {
+ return;
+ }
+ taskToFailedBlockIdsAndServer
+ .computeIfAbsent(taskAttemptId, x -> Maps.newConcurrentMap())
Review Comment:
Could this use JavaUtils.newConcurrentMap() instead of Guava's Maps.newConcurrentMap()?
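One common reason to route map creation through a single project helper is JDK-8161372: on JDK 8, ConcurrentHashMap.computeIfAbsent may lock the bin even when the key is already present, and a usual workaround is to try get() before computeIfAbsent(). Whether JavaUtils.newConcurrentMap() addresses this is an assumption here. The sketch below mirrors putBlockIdAndShuffle with that get-first pattern; FailedBlockRecorder, getOrCompute(), the String server ids, and the list-merging step are hypothetical, since the quoted diff stops at the computeIfAbsent call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class FailedBlockRecorder {
  // Hypothetical stand-in for JavaUtils.newConcurrentMap().
  static <K, V> ConcurrentHashMap<K, V> newConcurrentMap() {
    return new ConcurrentHashMap<>();
  }

  // Hypothetical helper: read first and fall back to computeIfAbsent only on a miss,
  // avoiding the JDK 8 bin locking described in JDK-8161372 for keys that already exist.
  static <K, V> V getOrCompute(Map<K, V> map, K key, Function<? super K, ? extends V> fn) {
    V existing = map.get(key);
    return existing != null ? existing : map.computeIfAbsent(key, fn);
  }

  private final Map<String, Map<Long, List<String>>> taskToFailedBlockIdsAndServer =
      newConcurrentMap();

  public synchronized void putBlockIdAndShuffle(
      String taskAttemptId, Map<Long, List<String>> blockIdsAndServer) {
    if (blockIdsAndServer == null || blockIdsAndServer.isEmpty()) {
      return;
    }
    Map<Long, List<String>> perTask =
        getOrCompute(taskToFailedBlockIdsAndServer, taskAttemptId, x -> newConcurrentMap());
    // Hypothetical merge step: append the failed servers for each block id.
    blockIdsAndServer.forEach(
        (blockId, servers) ->
            getOrCompute(perTask, blockId, id -> Collections.synchronizedList(new ArrayList<>()))
                .addAll(servers));
  }

  public Map<Long, List<String>> get(String taskAttemptId) {
    return taskToFailedBlockIdsAndServer.getOrDefault(taskAttemptId, Collections.emptyMap());
  }

  public static void main(String[] args) {
    FailedBlockRecorder recorder = new FailedBlockRecorder();
    recorder.putBlockIdAndShuffle("task-0", Collections.singletonMap(1L, Arrays.asList("server-a")));
    recorder.putBlockIdAndShuffle("task-0", Collections.singletonMap(1L, Arrays.asList("server-b")));
    System.out.println(recorder.get("task-0")); // {1=[server-a, server-b]}
  }
}
```

The synchronized method already serializes writers; the concurrent maps matter for readers that access the per-task map outside that lock.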
##########
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java:
##########
@@ -366,35 +385,18 @@ public SendShuffleDataResult sendShuffleData(
false);
}
}
-
- // maintain the count of blocks that have been sent to the server
- // unnecessary to use concurrent hashmap here unless you need to insert or delete entries in
- // other threads
- // AtomicInteger is enough to reflect value changes in other threads
- Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
- primaryServerToBlockIds
- .values()
- .forEach(
- blockList ->
- blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
- secondaryServerToBlockIds
- .values()
- .forEach(
- blockList ->
- blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
-
- Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
- Set<Long> successBlockIds = Sets.newConcurrentHashSet();
- // if send block failed, the task will fail
- // todo: better to have fallback solution when send to multiple servers
+ /** Records the ShuffleServer that successfully or failed to send blocks */
+ Map<Long, List<ShuffleServerInfo>> blockIdSendSuccessTracker = Maps.newConcurrentMap();
Review Comment:
Could this use JavaUtils.newConcurrentMap() instead of Guava's Maps.newConcurrentMap()?
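The comment removed in this hunk argued that a plain HashMap was enough because every key was inserted up front and other threads only changed AtomicInteger values. The new tracker instead records servers as send results arrive, so entries are inserted from several threads and the map itself must be concurrent. The sketch below contrasts the two patterns under that reading; SendTrackerSketch, both method names, the thread pools, and the String server ids are hypothetical and not taken from ShuffleWriteClientImpl.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SendTrackerSketch {
  // Hypothetical stand-in for JavaUtils.newConcurrentMap().
  static <K, V> ConcurrentHashMap<K, V> newConcurrentMap() {
    return new ConcurrentHashMap<>();
  }

  // Old pattern: keys are inserted by one thread before any task is submitted;
  // worker threads only increment the AtomicInteger values and never modify the map structure.
  static Map<Long, AtomicInteger> countSendsPrepopulated(List<Long> blockIds, int senders)
      throws Exception {
    Map<Long, AtomicInteger> tracker = new HashMap<>();
    blockIds.forEach(id -> tracker.put(id, new AtomicInteger(0)));
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < senders; i++) {
        futures.add(pool.submit(() -> blockIds.forEach(id -> tracker.get(id).incrementAndGet())));
      }
      for (Future<?> f : futures) {
        f.get(); // get() makes the workers' updates visible to this thread
      }
    } finally {
      pool.shutdown();
    }
    return tracker;
  }

  // New pattern: each sender inserts entries as it succeeds, so the map must be concurrent.
  static Map<Long, List<String>> trackSuccessfulServers(List<Long> blockIds, List<String> servers)
      throws Exception {
    Map<Long, List<String>> blockIdSendSuccessTracker = newConcurrentMap();
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, servers.size()));
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String server : servers) {
        futures.add(pool.submit(() -> blockIds.forEach(id ->
            blockIdSendSuccessTracker
                .computeIfAbsent(id, k -> new CopyOnWriteArrayList<>())
                .add(server))));
      }
      for (Future<?> f : futures) {
        f.get();
      }
    } finally {
      pool.shutdown();
    }
    return blockIdSendSuccessTracker;
  }

  public static void main(String[] args) throws Exception {
    List<Long> blocks = Arrays.asList(1L, 2L, 3L);
    System.out.println(countSendsPrepopulated(blocks, 2).get(1L).get()); // 2
    System.out.println(trackSuccessfulServers(blocks, Arrays.asList("s1", "s2")).get(3L).size()); // 2
  }
}
```

The order of servers inside each list depends on thread scheduling, so only membership and size are stable.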
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.