zuston commented on code in PR #1445:
URL: https://github.com/apache/incubator-uniffle/pull/1445#discussion_r1451952159
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -1187,6 +1195,82 @@ public synchronized boolean reassignShuffleServers(
}
}
+  @Override
+  public ShuffleServerInfo reassignFaultyShuffleServer(
+      int shuffleId, Set<String> partitionIds, String faultyShuffleServerId) {
+    ShuffleServerInfo newShuffleServerInfo;
+    synchronized (this) {
+      if (getReassignedFaultyServers().containsKey(faultyShuffleServerId)) {
+        return getReassignedFaultyServers().get(faultyShuffleServerId);
+      }
+      newShuffleServerInfo = assignShuffleServer(shuffleId, faultyShuffleServerId);
+      if (newShuffleServerInfo == null) {
+        return null;
+      }
+      getReassignedFaultyServers().put(faultyShuffleServerId, newShuffleServerInfo);
+    }
+    ShuffleHandleInfo shuffleHandleInfo = shuffleIdToShuffleHandleInfo.get(shuffleId);
+    for (String partitionId : partitionIds) {
+      List<ShuffleServerInfo> shuffleServerInfoList =
+          shuffleHandleInfo.getPartitionToServers().get(partitionId);
+      for (int i = 0; i < shuffleServerInfoList.size(); i++) {
+        if (shuffleServerInfoList.get(i).getId().equals(faultyShuffleServerId)) {
+          shuffleHandleInfo
+              .getDynamicAssignedPartitionServers()
+              .computeIfAbsent(Integer.valueOf(partitionId), k -> Maps.newHashMap());
+          shuffleHandleInfo
+              .getDynamicAssignedPartitionServers()
+              .get(partitionId)
+              .computeIfAbsent(i, j -> Lists.newArrayList())
+              .add(newShuffleServerInfo);
+        }
+      }
+    }
+    return newShuffleServerInfo;
+  }
+
+  private Map<String, ShuffleServerInfo> reassignedFaultyServers = Maps.newConcurrentMap();
+
+  public Map<String, ShuffleServerInfo> getReassignedFaultyServers() {
+    return reassignedFaultyServers;
+  }
+
+  private ShuffleServerInfo assignShuffleServer(int shuffleId, String faultyShuffleServerId) {
Review Comment:
I think this part of the code duplicates the registerShuffle method.
Could you extract a shared method?
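Something along these lines is what I mean (sketch only, not code from this PR; `requestShuffleAssignment` is a hypothetical stand-in for whatever request the registerShuffle path already performs):
```
// Hypothetical extraction; names and signatures here are illustrative only.
private ShuffleServerInfo assignSingleShuffleServer(int shuffleId, Set<String> excludedServerIds) {
  // Delegate to the same coordinator request that registerShuffle already performs,
  // asking for one replacement server while excluding the faulty ones.
  List<ShuffleServerInfo> assigned = requestShuffleAssignment(shuffleId, excludedServerIds);
  return (assigned == null || assigned.isEmpty()) ? null : assigned.get(0);
}
```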
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -1187,6 +1195,82 @@ public synchronized boolean reassignShuffleServers(
}
}
+  @Override
+  public ShuffleServerInfo reassignFaultyShuffleServer(
+      int shuffleId, Set<String> partitionIds, String faultyShuffleServerId) {
+    ShuffleServerInfo newShuffleServerInfo;
+    synchronized (this) {
+      if (getReassignedFaultyServers().containsKey(faultyShuffleServerId)) {
+        return getReassignedFaultyServers().get(faultyShuffleServerId);
+      }
+      newShuffleServerInfo = assignShuffleServer(shuffleId, faultyShuffleServerId);
+      if (newShuffleServerInfo == null) {
+        return null;
+      }
+      getReassignedFaultyServers().put(faultyShuffleServerId, newShuffleServerInfo);
+    }
+    ShuffleHandleInfo shuffleHandleInfo = shuffleIdToShuffleHandleInfo.get(shuffleId);
+    for (String partitionId : partitionIds) {
+      List<ShuffleServerInfo> shuffleServerInfoList =
+          shuffleHandleInfo.getPartitionToServers().get(partitionId);
+      for (int i = 0; i < shuffleServerInfoList.size(); i++) {
+        if (shuffleServerInfoList.get(i).getId().equals(faultyShuffleServerId)) {
+          shuffleHandleInfo
+              .getDynamicAssignedPartitionServers()
+              .computeIfAbsent(Integer.valueOf(partitionId), k -> Maps.newHashMap());
+          shuffleHandleInfo
+              .getDynamicAssignedPartitionServers()
+              .get(partitionId)
+              .computeIfAbsent(i, j -> Lists.newArrayList())
+              .add(newShuffleServerInfo);
+        }
+      }
+    }
+    return newShuffleServerInfo;
+  }
+
+  private Map<String, ShuffleServerInfo> reassignedFaultyServers = Maps.newConcurrentMap();
Review Comment:
Could you move this field definition to the top of the class?
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -402,6 +416,105 @@ private void checkIfBlocksFailed() {
}
}
+  private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
+    boolean result = true;
+    FailedBlockSendTracker failedBlockTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
Review Comment:
`failedBlockTracker` may be null here; I don't see any initialization for it.
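A sketch of the kind of guard I'd expect if the tracker can legitimately be absent for a task (whether the right fix is a null check or proper initialization is up to you):
```
// Sketch only: make the missing-tracker case explicit instead of risking an NPE.
FailedBlockSendTracker failedBlockTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
if (failedBlockTracker == null) {
  // No failed-send tracking exists for this task, so there is nothing to reassign.
  return false;
}
```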
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -402,6 +416,105 @@ private void checkIfBlocksFailed() {
}
}
+  private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
+    boolean result = true;
+    FailedBlockSendTracker failedBlockTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
+    for (Long failedBlockId : failedBlockIds) {
+      result =
+          failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
+              // todo: more status need reassign
+              .allMatch(
+                  trackBlockStatus -> trackBlockStatus.getStatusCode() == StatusCode.NO_BUFFER);
+    }
+    return result;
+  }
+
+  private void reAssignServerForFailedBlockIds(Set<Long> failedBlockIds) {
+    List<ShuffleBlockInfo> reAssignSeverBlockInfoList = Lists.newArrayList();
+    List<ShuffleBlockInfo> failedBlockInfoList = Lists.newArrayList();
+    for (Long failedBlockId : failedBlockIds) {
+      FailedBlockSendTracker failedBlockTracker =
+          shuffleManager.getBlockIdsFailedSendTracker(taskId);
+      Set<TrackBlockStatus> trackBlockStatusSet =
+          failedBlockTracker.getFailedBlockStatus(failedBlockId);
+
+      Map<ShuffleServerInfo, List<TrackBlockStatus>> faultyServerToPartitions =
+          trackBlockStatusSet.stream()
+              .collect(Collectors.groupingBy(d -> d.getShuffleServerInfo()));
+
+      faultyServerToPartitions.entrySet().stream()
+          .forEach(
+              t -> {
+                Set<String> partitionIds =
+                    t.getValue().stream()
+                        .map(x -> String.valueOf(x.getShuffleBlockInfo().getPartitionId()))
+                        .collect(Collectors.toSet());
+                ShuffleServerInfo dynamicShuffleServer =
+                    shuffleManager.getReassignedFaultyServers().get(t.getKey().getId());
+                if (dynamicShuffleServer == null) {
+                  dynamicShuffleServer =
+                      reAssignFaultyShuffleServer(partitionIds, t.getKey().getId());
+                  if (dynamicShuffleServer == null) {
Review Comment:
Throw an exception directly? Otherwise the failed blocks will be silently ignored.
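Something like this (sketch; assuming `RssException` is the appropriate exception type here):
```
if (dynamicShuffleServer == null) {
  // Fail fast rather than silently dropping the failed blocks.
  throw new RssException(
      "Failed to reassign a shuffle server for faulty server " + t.getKey().getId());
}
```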
##########
proto/src/main/proto/Rss.proto:
##########
@@ -396,6 +396,19 @@ message GetShuffleServerRequest {
repeated string faultyServerIds = 12;
}
+message ReassignShuffleServerRequest {
Review Comment:
Is this unused?
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -1187,6 +1195,82 @@ public synchronized boolean reassignShuffleServers(
}
}
+  @Override
+  public ShuffleServerInfo reassignFaultyShuffleServer(
+      int shuffleId, Set<String> partitionIds, String faultyShuffleServerId) {
+    ShuffleServerInfo newShuffleServerInfo;
+    synchronized (this) {
+      if (getReassignedFaultyServers().containsKey(faultyShuffleServerId)) {
Review Comment:
How about extracting the result of `getReassignedFaultyServers()` into a local variable?
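That is, something like this (sketch only; it also avoids the containsKey/get double lookup):
```
// Sketch only: hoist the repeated getter call into a local variable.
Map<String, ShuffleServerInfo> reassignedServers = getReassignedFaultyServers();
ShuffleServerInfo existing = reassignedServers.get(faultyShuffleServerId);
if (existing != null) {
  return existing;
}
newShuffleServerInfo = assignShuffleServer(shuffleId, faultyShuffleServerId);
if (newShuffleServerInfo == null) {
  return null;
}
reassignedServers.put(faultyShuffleServerId, newShuffleServerInfo);
```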
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java:
##########
@@ -347,6 +347,16 @@ public class RssSparkConfig {
+ " of concurrent tasks."))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);
+  public static final ConfigEntry<Boolean> RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED =
Review Comment:
Please use the config style like this:
```
public static final ConfigOption<Boolean> RSS_TASK_FAILED_CALLBACK_ENABLED =
ConfigOptions.key("rss.task.failed.callback.enable")
.booleanType()
.defaultValue(true);
```
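Applied to the new option it would look roughly like the following (the key string and default value are placeholders, not necessarily what the PR should use):
```
public static final ConfigOption<Boolean> RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED =
    ConfigOptions.key("rss.client.dynamic.server.assignment.enabled")
        .booleanType()
        .defaultValue(false);
```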
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -402,6 +416,105 @@ private void checkIfBlocksFailed() {
}
}
+  private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
Review Comment:
I think we should check whether any blocks actually meet the reassignment requirement;
the current implementation looks wrong.
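For example, the loop overwrites `result` on every iteration, so only the last block's statuses are checked, and an empty set yields `true`. A sketch of what I mean (whether `allMatch` or `anyMatch` is the right predicate is a design decision for you):
```
// Sketch only: consider every failed block instead of letting the last iteration win.
private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
  FailedBlockSendTracker tracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
  if (tracker == null || failedBlockIds.isEmpty()) {
    return false;
  }
  return failedBlockIds.stream()
      .allMatch(
          blockId ->
              tracker.getFailedBlockStatus(blockId).stream()
                  .allMatch(status -> status.getStatusCode() == StatusCode.NO_BUFFER));
}
```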
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]