zuston commented on code in PR #2138:
URL: https://github.com/apache/incubator-uniffle/pull/2138#discussion_r1776428497


##########
server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java:
##########
@@ -45,10 +42,9 @@ public class RegisterHeartBeat {
   private final long heartBeatInterval;
   private final ShuffleServer shuffleServer;
   private final String coordinatorQuorum;
-  private final List<CoordinatorClient> coordinatorClients;
+  private final CoordinatorGrpcRetryableClient coordinatorClients;

Review Comment:
   `coordinatorClients` -> `coordinatorClient`



##########
internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java:
##########
@@ -64,7 +65,10 @@ public CoordinatorGrpcRetryableClient(
         ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, "client-heartbeat");
   }
 
-  public void sendAppHeartBeat(RssAppHeartBeatRequest request, long timeoutMs) {
+  @Override
+  public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request) {

Review Comment:
   This method name reads like a one-off operation rather than a daemon thread
   that keeps sending app heartbeats.
   
   How about renaming it from `sendAppHeartBeat` to
   `scheduleAtFixedRateToSendAppHeartBeat`?
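
   To illustrate the naming concern, here is a minimal sketch of the two shapes
   the name could describe. `HeartBeatSketch` and its members are hypothetical
   stand-ins, not the real `CoordinatorGrpcRetryableClient`; the String `appId`
   parameter replaces the real request type purely for brevity.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in used only to contrast the two method names.
class HeartBeatSketch {
  // Single daemon thread, so a pending schedule never blocks JVM exit.
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          r -> {
            Thread t = new Thread(r, "client-heartbeat");
            t.setDaemon(true);
            return t;
          });

  // Counts heartbeats "sent"; a real client would issue an RPC instead.
  final AtomicInteger sent = new AtomicInteger();

  // One-shot operation: sends exactly one heartbeat and returns.
  void sendAppHeartBeat(String appId) {
    sent.incrementAndGet();
  }

  // Recurring behaviour: the name states that the call only schedules work.
  ScheduledFuture<?> scheduleAtFixedRateToSendAppHeartBeat(String appId, long intervalMs) {
    return scheduler.scheduleAtFixedRate(
        () -> sendAppHeartBeat(appId), 0, intervalMs, TimeUnit.MILLISECONDS);
  }

  void shutdown() {
    scheduler.shutdownNow();
  }
}
```

   With names like these, a caller can tell from the signature alone whether
   the call returns after one heartbeat or starts a background task that has
   to be cancelled later.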



##########
internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java:
##########
@@ -64,7 +65,10 @@ public CoordinatorGrpcRetryableClient(
         ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, "client-heartbeat");
   }
 
-  public void sendAppHeartBeat(RssAppHeartBeatRequest request, long timeoutMs) {
+  @Override
+  public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request) {

Review Comment:
   WDYT? 



##########
server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java:
##########
@@ -58,11 +54,12 @@ public RegisterHeartBeat(ShuffleServer shuffleServer) {
     CoordinatorClientFactory factory = CoordinatorClientFactory.getInstance();
     this.coordinatorClients =
         factory.createCoordinatorClient(
-            conf.get(ShuffleServerConf.RSS_COORDINATOR_CLIENT_TYPE), this.coordinatorQuorum);
+            conf.get(ShuffleServerConf.RSS_COORDINATOR_CLIENT_TYPE),
+            this.coordinatorQuorum,
+            0,

Review Comment:
   The `0` could be the default value in the original
   `createCoordinatorClient`, so it would not need to be passed here.
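
   One way to read this suggestion is an overload that supplies the default.
   The sketch below is an assumption, not the real `CoordinatorClientFactory`:
   the class, the `Client` type, and the parameter name `extraParam` are all
   hypothetical, since the hunk does not show what the `0` represents.

```java
// Hypothetical factory used only to show a defaulting overload.
class ClientFactorySketch {
  // Hypothetical result type that records the arguments it was built with.
  static final class Client {
    final String clientType;
    final String quorum;
    final long extraParam;

    Client(String clientType, String quorum, long extraParam) {
      this.clientType = clientType;
      this.quorum = quorum;
      this.extraParam = extraParam;
    }
  }

  // Original two-argument form: callers that do not care get the default 0.
  Client createCoordinatorClient(String clientType, String quorum) {
    return createCoordinatorClient(clientType, quorum, 0L);
  }

  // Extended form for callers that need a non-default value.
  Client createCoordinatorClient(String clientType, String quorum, long extraParam) {
    return new Client(clientType, quorum, extraParam);
  }
}
```

   Under this shape the call site in `RegisterHeartBeat` could keep its
   two-argument call unchanged.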



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -947,23 +947,23 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
     faultyServerIds.addAll(rssStageResubmitManager.getServerIdBlackList());
     try {
-      ShuffleAssignmentsInfo response =
-          shuffleWriteClient.getShuffleAssignments(
-              appId,
-              shuffleId,
-              partitionNum,
-              partitionNumPerRange,
-              assignmentTags,
-              assignmentShuffleServerNumber,
-              estimateTaskConcurrency,
-              faultyServerIds,
-              stageId,
-              stageAttemptNumber,
-              reassign,
-              retryInterval,
-              retryTimes);
       return RetryUtils.retry(
           () -> {
+            ShuffleAssignmentsInfo response =

Review Comment:
   The retried call here is `getShuffleAssignments`, so the message in
   `throw new RssException("registerShuffle failed!", throwable);` is wrong.
   Please correct it.
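
   As a rough sketch of the point, a retry wrapper can take the operation name
   so the final error always names the call that actually failed. `RetrySketch`,
   `SketchException`, and the `retry` signature below are hypothetical and do
   not mirror the real `RetryUtils` or `RssException` APIs.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper; not the project's RetryUtils.
class RetrySketch {
  // Hypothetical unchecked exception standing in for RssException.
  static class SketchException extends RuntimeException {
    SketchException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Runs op up to maxAttempts times and reports opName in the final failure.
  static <T> T retry(Callable<T> op, int maxAttempts, String opName) {
    Exception last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return op.call();
      } catch (Exception e) {
        last = e; // remember the most recent failure as the cause
      }
    }
    throw new SketchException(opName + " failed!", last);
  }
}
```

   Passing `"getShuffleAssignments"` as `opName` would then yield
   `getShuffleAssignments failed!` rather than a message that names a
   different operation.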



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
