advancedxy commented on code in PR #1137:
URL: https://github.com/apache/incubator-uniffle/pull/1137#discussion_r1297927789
##########
internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java:
##########
@@ -40,7 +42,16 @@ public RssGetShuffleAssignmentsRequest(
int partitionNumPerRange,
int dataReplica,
Set<String> requiredTags) {
-    this(appId, shuffleId, partitionNum, partitionNumPerRange, dataReplica, requiredTags, -1, -1);
+ this(
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ dataReplica,
+ requiredTags,
+ -1,
+ -1,
+ Sets.newConcurrentHashSet());
Review Comment:
ditto.
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java:
##########
@@ -101,7 +102,7 @@ public void getShuffleServerNum(
@Override
public void getShuffleAssignments(
- GetShuffleServerRequest request,
+ RssProtos.GetShuffleServerRequest request,
Review Comment:
Why this change? I believe we should add `import org.apache.uniffle.proto.RssProtos.GetShuffleServerRequest;` instead and keep the unqualified name in the signature.
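For illustration, here is a minimal, self-contained sketch of the single-type import pattern being suggested. It uses the JDK nested type `Map.Entry` as a stand-in, since `RssProtos.GetShuffleServerRequest` is a generated class not shown here; the point is that importing the nested type once keeps method signatures short:

```java
import java.util.AbstractMap.SimpleEntry;
// Importing the nested type lets signatures say Entry<...> rather than
// Map.Entry<...>, analogous to importing RssProtos.GetShuffleServerRequest.
import java.util.Map.Entry;

public class NestedImportDemo {
  // Signature uses the short, imported name of the nested type.
  static String describe(Entry<String, Integer> e) {
    return e.getKey() + "=" + e.getValue();
  }

  public static void main(String[] args) {
    System.out.println(describe(new SimpleEntry<>("shuffleId", 7)));
  }
}
```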
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java:
##########
@@ -111,9 +112,10 @@ public void getShuffleAssignments(
     final Set<String> requiredTags = Sets.newHashSet(request.getRequireTagsList());
     final int requiredShuffleServerNumber = request.getAssignmentShuffleServerNumber();
     final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
+    final Set<String> faultyServerIds = new HashSet<>(request.getFaultyServerIdsList());
     LOG.info(
-        "Request of getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}], "
+        "Request of redo getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}], "
+            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], requiredShuffleServerNumber[{}]",
appId,
Review Comment:
I didn't notice this earlier; I don't think there's any need to add `redo` to the log message.
Also, please include a description of faultyServerIds in the log message. You could print just the first few nodes.
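To sketch what "print the first few nodes" could look like, here is a minimal, self-contained example. The helper name `firstN` and the truncation format are hypothetical, not part of the PR:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class FaultyServerLogDemo {
  // Hypothetical helper: render at most `limit` server ids for logging,
  // appending the total count when the set was truncated.
  static String firstN(Set<String> ids, int limit) {
    String shown = ids.stream().sorted().limit(limit).collect(Collectors.joining(", "));
    return ids.size() > limit ? shown + ", ... (" + ids.size() + " total)" : shown;
  }

  public static void main(String[] args) {
    Set<String> faulty = new TreeSet<>(Set.of("server-1", "server-2", "server-3", "server-4"));
    // In the coordinator this string would go into the LOG.info(...) call.
    System.out.println(firstN(faulty, 2));
  }
}
```

This keeps the log line bounded even when many servers are reported faulty.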
##########
client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java:
##########
@@ -69,6 +69,19 @@ void reportShuffleResult(
Map<Integer, List<Long>> partitionToBlockIds,
int bitmapNum);
+ default ShuffleAssignmentsInfo getShuffleAssignments(
+ String appId,
+ int shuffleId,
+ int partitionNum,
+ int partitionNumPerRange,
+ Set<String> requiredTags,
+ int assignmentShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds) {
+ throw new UnsupportedOperationException(
+ "Currently only SPARK2 or SPARK3 supports this method.");
Review Comment:
Nit: a good practice would be
```
new UnsupportedOperationException(
    this.getClass().getName()
        + " doesn't implement getShuffleAssignments with faultyServerIds");
```
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -242,14 +242,18 @@ public void add(ServerNode node) {
@Override
public List<ServerNode> getServerList(Set<String> requiredTags) {
+ return getServerList(requiredTags, Sets.newConcurrentHashSet());
Review Comment:
`newHashSet` should be enough?
Why do we need a concurrent one?
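To illustrate why a plain set is enough here: the default argument is created, passed, and read entirely within one call, so no other thread ever touches it. A minimal sketch of the delegation pattern (method and class names are hypothetical, not the Uniffle code):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class DefaultSetDemo {
  // Full overload: counts servers not listed as faulty.
  static int countHealthy(Set<String> all, Set<String> faulty) {
    int n = 0;
    for (String id : all) {
      if (!faulty.contains(id)) {
        n++;
      }
    }
    return n;
  }

  // Convenience overload: delegates with an empty default. The default set
  // is confined to this call, so a plain (even immutable) empty set works;
  // a concurrent set would only add overhead.
  static int countHealthy(Set<String> all) {
    return countHealthy(all, Collections.emptySet());
  }

  public static void main(String[] args) {
    Set<String> servers = new HashSet<>(Set.of("a", "b", "c"));
    System.out.println(countHealthy(servers));
  }
}
```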
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]