zuston commented on code in PR #1615:
URL: https://github.com/apache/incubator-uniffle/pull/1615#discussion_r1568117273
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java:
##########
@@ -85,33 +126,125 @@ public int getShuffleId() {
return shuffleId;
}
- public boolean isExistingFaultyServer(String serverId) {
- return faultyServerReplacements.containsKey(serverId);
+ @VisibleForTesting
+ protected boolean isMarkedAsFaultyServer(String serverId) {
+ return faultyServerToReplacements.containsKey(serverId);
}
- public ShuffleServerInfo useExistingReassignmentForMultiPartitions(
- Set<Integer> partitionIds, String faultyServerId) {
- return createNewReassignmentForMultiPartitions(partitionIds, faultyServerId, null);
+ public Set<ShuffleServerInfo> getExistingReplacements(String faultyServerId) {
+ return faultyServerToReplacements.get(faultyServerId);
}
- public ShuffleServerInfo createNewReassignmentForMultiPartitions(
- Set<Integer> partitionIds, String faultyServerId, ShuffleServerInfo replacement) {
- if (replacement != null) {
- faultyServerReplacements.put(faultyServerId, replacement);
+ public void updateReassignment(
+ Set<Integer> partitionIds, String faultyServerId, Set<ShuffleServerInfo> replacements) {
+ if (replacements == null) {
+ return;
}
- replacement = faultyServerReplacements.get(faultyServerId);
+ faultyServerToReplacements.put(faultyServerId, replacements);
+ // todo: optimize the multiple for performance
for (Integer partitionId : partitionIds) {
- List<ShuffleServerInfo> replicaServers = partitionToServers.get(partitionId);
- for (int i = 0; i < replicaServers.size(); i++) {
- if (replicaServers.get(i).getId().equals(faultyServerId)) {
- Map<Integer, ShuffleServerInfo> replicaReplacements =
- failoverPartitionServers.computeIfAbsent(
- partitionId, k -> JavaUtils.newConcurrentMap());
- replicaReplacements.put(i, replacement);
+ Map<Integer, List<ShuffleServerInfo>> replicaServers =
+ partitionReplicaAssignedServers.get(partitionId);
+ for (Map.Entry<Integer, List<ShuffleServerInfo>> serverEntry : replicaServers.entrySet()) {
+ List<ShuffleServerInfo> servers = serverEntry.getValue();
+ if (servers.stream()
+ .map(x -> x.getId())
+ .collect(Collectors.toSet())
+ .contains(faultyServerId)) {
+ Set<ShuffleServerInfo> tempSet = new HashSet<>();
+ tempSet.addAll(replacements);
+ tempSet.removeAll(servers);
+ servers.addAll(tempSet);
Review Comment:
Yes.
The name `faulty servers` here is not accurate (I won't change it in this PR, maybe later), because a server may throw NO_BUFFER simply due to tight memory. In such cases, it is better to reassign the partition to another server for writing data. The original servers are kept so that the partial data already written to them can still be fetched when reading.
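The reassignment semantics described in the reply (keep the original server, append the replacement, and read from every server that may hold part of the data) can be sketched roughly as below. This is a minimal, hypothetical model, not the PR's code: `ReassignmentSketch`, `assign`, `writeServer` and `readServers` are illustrative names, server IDs are plain strings instead of `ShuffleServerInfo`, the `faultyServerToReplacements` bookkeeping is omitted, and the rule that writers use the most recently appended server is an assumption made for illustration. Only the append-without-duplicates step mirrors what `updateReassignment` in the diff does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: partitionId -> replicaIndex -> servers.
// Server IDs are plain Strings here, not ShuffleServerInfo.
public class ReassignmentSketch {
  private final Map<Integer, Map<Integer, List<String>>> assignments = new HashMap<>();

  public void assign(int partitionId, int replicaIndex, String serverId) {
    assignments
        .computeIfAbsent(partitionId, k -> new HashMap<>())
        .computeIfAbsent(replicaIndex, k -> new ArrayList<>())
        .add(serverId);
  }

  // Append replacements to every replica list containing the "faulty"
  // (possibly just memory-constrained) server; the original server is kept.
  public void updateReassignment(
      Set<Integer> partitionIds, String faultyServerId, Set<String> replacements) {
    if (replacements == null) {
      return;
    }
    for (Integer partitionId : partitionIds) {
      Map<Integer, List<String>> replicas = assignments.get(partitionId);
      if (replicas == null) {
        continue;
      }
      for (List<String> servers : replicas.values()) {
        if (servers.contains(faultyServerId)) {
          for (String replacement : replacements) {
            if (!servers.contains(replacement)) { // avoid duplicate entries
              servers.add(replacement);
            }
          }
        }
      }
    }
  }

  // Assumption: new writes go to the most recently appended server.
  public String writeServer(int partitionId, int replicaIndex) {
    List<String> servers = assignments.get(partitionId).get(replicaIndex);
    return servers.get(servers.size() - 1);
  }

  // Readers visit every server that may hold partial data for the replica.
  public List<String> readServers(int partitionId, int replicaIndex) {
    return new ArrayList<>(assignments.get(partitionId).get(replicaIndex));
  }

  public static void main(String[] args) {
    ReassignmentSketch s = new ReassignmentSketch();
    s.assign(0, 0, "server-a");
    s.assign(0, 1, "server-b");
    // server-a answered NO_BUFFER: redirect writes, keep it for reads.
    s.updateReassignment(Set.of(0), "server-a", Set.of("server-c"));
    System.out.println(s.writeServer(0, 0)); // server-c
    System.out.println(s.readServers(0, 0)); // [server-a, server-c]
    System.out.println(s.readServers(0, 1)); // [server-b]
  }
}
```

Appending rather than overwriting is what lets a reader still find blocks written before the NO_BUFFER response; the cost is that reads for a reassigned replica fan out to more than one server.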
--
This is an automated message from the Apache Git Service.