advancedxy commented on code in PR #1137:
URL:
https://github.com/apache/incubator-uniffle/pull/1137#discussion_r1297233282
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java:
##########
@@ -405,4 +341,71 @@ private ServerNode
toServerNode(ShuffleServerHeartBeatRequest request) {
StorageInfoUtils.fromProto(request.getStorageInfoMap()),
request.getServerId().getNettyPort());
}
+
+ @Override
+ public void getShuffleAssignments(
Review Comment:
This generates too many changes. Please modify the existing method at L102 -
L166 directly.
##########
client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java:
##########
@@ -69,6 +69,16 @@ void reportShuffleResult(
Map<Integer, List<Long>> partitionToBlockIds,
int bitmapNum);
+ ShuffleAssignmentsInfo getShuffleAssignments(
Review Comment:
I believe a better option may be:
``` java
default ShuffleAssignmentsInfo getShuffleAssignments(
    String appId,
    int shuffleId,
    int partitionNum,
    int partitionNumPerRange,
    Set<String> requiredTags,
    int assignmentShuffleServerNumber,
    int estimateTaskConcurrency,
    Set<String> faultyServerIds) {
  throw new UnsupportedOperationException(...);
}
```
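To illustrate why a default method keeps the other clients untouched, here is a minimal, self-contained sketch. `WriteClient`, `LegacyClient` and `ReassigningClient` are hypothetical stand-ins, not the real Uniffle types, and the parameter list is shortened for brevity:

```java
import java.util.Collections;
import java.util.Set;

final class DefaultMethodSketch {

  /** Hypothetical, simplified stand-in for ShuffleWriteClient. */
  interface WriteClient {
    // Pre-existing abstract method that every client already implements.
    String name();

    // New overload added as a default method: implementors that do not
    // override it still compile and only fail if the method is called.
    default String getShuffleAssignments(String appId, Set<String> faultyServerIds) {
      throw new UnsupportedOperationException(
          name() + " does not support assignment with faulty servers");
    }
  }

  /** Stands in for a client that is left unchanged. */
  static final class LegacyClient implements WriteClient {
    @Override
    public String name() {
      return "legacy";
    }
  }

  /** Stands in for the one client that needs the new overload. */
  static final class ReassigningClient implements WriteClient {
    @Override
    public String name() {
      return "reassigning";
    }

    @Override
    public String getShuffleAssignments(String appId, Set<String> faultyServerIds) {
      return appId + " assigned, skipping " + faultyServerIds.size() + " faulty server(s)";
    }
  }

  public static void main(String[] args) {
    Set<String> faulty = Collections.singleton("server-3");
    System.out.println(new ReassigningClient().getShuffleAssignments("app-1", faulty));
    try {
      new LegacyClient().getShuffleAssignments("app-1", faulty);
    } catch (UnsupportedOperationException e) {
      System.out.println("legacy client rejected the call: " + e.getMessage());
    }
  }
}
```

Only the implementation that overrides the method has to change; every other implementor inherits the throwing default.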
##########
client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java:
##########
@@ -69,6 +69,16 @@ void reportShuffleResult(
Map<Integer, List<Long>> partitionToBlockIds,
int bitmapNum);
+ ShuffleAssignmentsInfo getShuffleAssignments(
Review Comment:
So the changes in mapred/mapreduce/tez are not needed any more.
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -242,15 +242,30 @@ public void add(ServerNode node) {
@Override
public List<ServerNode> getServerList(Set<String> requiredTags) {
+ return getServerList(requiredTags, null);
+ }
+
+ @Override
+ public List<ServerNode> getServerList(Set<String> requiredTags, Set<String>
faultyServerIds) {
List<ServerNode> availableNodes = Lists.newArrayList();
for (ServerNode node : servers.values()) {
if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
continue;
}
- if (!excludeNodes.contains(node.getId())
- && node.getTags().containsAll(requiredTags)
- && ServerStatus.ACTIVE.equals(node.getStatus())) {
- availableNodes.add(node);
+ // If the excludeServerNodes is null, the exclusion operation is not
performed
+ if (faultyServerIds != null && !faultyServerIds.isEmpty()) {
+ if (!faultyServerIds.contains(node.getId())
+ && !excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
+ } else {
+ if (!excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
Review Comment:
The status check is not needed, as the status has already been checked at L247.
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -242,15 +242,30 @@ public void add(ServerNode node) {
@Override
public List<ServerNode> getServerList(Set<String> requiredTags) {
+ return getServerList(requiredTags, null);
Review Comment:
I would prefer passing `Collections.emptySet()` to avoid a potential NPE and
to stay consistent with other places.
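A rough sketch of the idea, using plain server-id strings instead of `ServerNode` (the class name and the simplified signatures are assumptions for illustration only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EmptySetSketch {

  // Old entry point: delegates with an empty set instead of null,
  // so the two-argument overload never has to null-check its input.
  static List<String> getServerList(List<String> serverIds) {
    return getServerList(serverIds, Collections.emptySet());
  }

  static List<String> getServerList(List<String> serverIds, Set<String> faultyServerIds) {
    List<String> available = new ArrayList<>();
    for (String id : serverIds) {
      // contains() on an empty set is always false, so nothing is filtered out.
      if (!faultyServerIds.contains(id)) {
        available.add(id);
      }
    }
    return available;
  }

  public static void main(String[] args) {
    List<String> ids = Arrays.asList("s1", "s2", "s3");
    System.out.println(getServerList(ids));
    System.out.println(getServerList(ids, new HashSet<>(Arrays.asList("s2"))));
  }
}
```

With an empty set as the "nothing is faulty" value, a single code path covers both callers.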
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java:
##########
@@ -115,6 +135,7 @@ public int compare(ServerNode o1, ServerNode o2) {
});
if (nodes.isEmpty() || nodes.size() < replica) {
+ LOG.info("cishidenodeshi,{},{}", nodes.size(), replica);
Review Comment:
Please remove this.
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -242,15 +242,30 @@ public void add(ServerNode node) {
@Override
public List<ServerNode> getServerList(Set<String> requiredTags) {
+ return getServerList(requiredTags, null);
+ }
+
+ @Override
+ public List<ServerNode> getServerList(Set<String> requiredTags, Set<String>
faultyServerIds) {
List<ServerNode> availableNodes = Lists.newArrayList();
for (ServerNode node : servers.values()) {
if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
continue;
}
- if (!excludeNodes.contains(node.getId())
- && node.getTags().containsAll(requiredTags)
- && ServerStatus.ACTIVE.equals(node.getStatus())) {
- availableNodes.add(node);
+ // If the excludeServerNodes is null, the exclusion operation is not
performed
+ if (faultyServerIds != null && !faultyServerIds.isEmpty()) {
+ if (!faultyServerIds.contains(node.getId())
+ && !excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
+ } else {
+ if (!excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
Review Comment:
Let's refactor this and extract a method for it:
```java
private boolean isNodeAvailable(
    Set<String> requiredTags, Set<String> faultyServerIds, ServerNode node) {
  if (faultyServerIds != null && faultyServerIds.contains(node.getId())) {
    return false;
  }
  return !excludeNodes.contains(node.getId()) && node.getTags().containsAll(requiredTags);
}
```
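For context, a self-contained sketch of how the loop in `getServerList` might read once the check is extracted. `Node` and `Status` are simplified, hypothetical stand-ins for `ServerNode` and `ServerStatus`, and the method returns ids rather than nodes to keep the example short:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class NodeFilterSketch {

  enum Status { ACTIVE, UNHEALTHY }

  /** Hypothetical, simplified stand-in for ServerNode. */
  static final class Node {
    final String id;
    final Set<String> tags;
    final Status status;

    Node(String id, Set<String> tags, Status status) {
      this.id = id;
      this.tags = tags;
      this.status = status;
    }
  }

  private final Map<String, Node> servers = new LinkedHashMap<>();
  private final Set<String> excludeNodes = new HashSet<>();

  void add(Node node) {
    servers.put(node.id, node);
  }

  void exclude(String id) {
    excludeNodes.add(id);
  }

  List<String> getServerList(Set<String> requiredTags, Set<String> faultyServerIds) {
    List<String> available = new ArrayList<>();
    for (Node node : servers.values()) {
      // The status is checked once here, so the helper does not repeat it.
      if (node.status != Status.ACTIVE) {
        continue;
      }
      if (isNodeAvailable(requiredTags, faultyServerIds, node)) {
        available.add(node.id);
      }
    }
    return available;
  }

  private boolean isNodeAvailable(
      Set<String> requiredTags, Set<String> faultyServerIds, Node node) {
    if (faultyServerIds != null && faultyServerIds.contains(node.id)) {
      return false;
    }
    return !excludeNodes.contains(node.id) && node.tags.containsAll(requiredTags);
  }

  public static void main(String[] args) {
    NodeFilterSketch manager = new NodeFilterSketch();
    Set<String> tags = Collections.singleton("tag-a");
    manager.add(new Node("s1", tags, Status.ACTIVE));
    manager.add(new Node("s2", tags, Status.ACTIVE));
    System.out.println(manager.getServerList(tags, Collections.singleton("s2")));
  }
}
```

The duplicated `if`/`else` branches collapse into one call, and the faulty-server filter sits in a single place.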