yl09099 commented on code in PR #1762:
URL: 
https://github.com/apache/incubator-uniffle/pull/1762#discussion_r1625942481


##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -520,4 +574,417 @@ protected static RemoteStorageInfo 
getDefaultRemoteStorageInfo(SparkConf sparkCo
     return new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), 
confItems);
   }
+
+  /**
+   * In Stage Retry mode, obtain the Shuffle Server list from the Driver based 
on shuffleId.
+   *
+   * @param shuffleId shuffleId
+   * @return ShuffleHandleInfo
+   */
+  protected synchronized StageAttemptShuffleHandleInfo 
getRemoteShuffleHandleInfoWithStageRetry(
+      int shuffleId) {
+    RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
+        new RssPartitionToShuffleServerRequest(shuffleId);
+    RssReassignOnStageRetryResponse rpcPartitionToShufflerServer =
+        getOrCreateShuffleManagerClient()
+            
.getPartitionToShufflerServerWithStageRetry(rssPartitionToShuffleServerRequest);
+    StageAttemptShuffleHandleInfo shuffleHandleInfo =
+        StageAttemptShuffleHandleInfo.fromProto(
+            rpcPartitionToShufflerServer.getShuffleHandleInfoProto());
+    return shuffleHandleInfo;
+  }
+
+  /**
+   * In Block Retry mode, obtain the Shuffle Server list from the Driver based 
on shuffleId.
+   *
+   * @param shuffleId shuffleId
+   * @return ShuffleHandleInfo
+   */
+  protected synchronized MutableShuffleHandleInfo 
getRemoteShuffleHandleInfoWithBlockRetry(
+      int shuffleId) {
+    RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
+        new RssPartitionToShuffleServerRequest(shuffleId);
+    RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
+        getOrCreateShuffleManagerClient()
+            
.getPartitionToShufflerServerWithBlockRetry(rssPartitionToShuffleServerRequest);
+    MutableShuffleHandleInfo shuffleHandleInfo =
+        
MutableShuffleHandleInfo.fromProto(rpcPartitionToShufflerServer.getHandle());
+    return shuffleHandleInfo;
+  }
+
+  // todo: automatic close client when the client is idle to avoid too much 
connections for spark
+  // driver.
+  protected ShuffleManagerClient getOrCreateShuffleManagerClient() {
+    if (shuffleManagerClient == null) {
+      RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+      String driver = rssConf.getString("driver.host", "");
+      int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
+      this.shuffleManagerClient =
+          ShuffleManagerClientFactory.getInstance()
+              .createShuffleManagerClient(ClientType.GRPC, driver, port);
+    }
+    return shuffleManagerClient;
+  }
+
+  @Override
+  public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
+    return shuffleHandleInfoManager.get(shuffleId);
+  }
+
+  /**
+   * @return the maximum number of fetch failures per shuffle partition before 
that shuffle stage
+   *     should be recomputed
+   */
+  @Override
+  public int getMaxFetchFailures() {
+    final String TASK_MAX_FAILURE = "spark.task.maxFailures";
+    return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
+  }
+
+  /**
+   * Add the shuffleServer that failed to write to the failure list
+   *
+   * @param shuffleServerId
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    rssStageResubmitManager.recordFailuresShuffleServer(shuffleServerId);
+  }
+
+  /**
+   * Reassign the ShuffleServer list for ShuffleId
+   *
+   * @param shuffleId
+   * @param numPartitions
+   */
+  @Override
+  public boolean reassignOnStageResubmit(
+      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+    String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+    RssStageResubmit rssStageResubmit =
+        rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, 
stageIdAndAttempt);
+    synchronized (rssStageResubmit) {
+      Boolean needReassign = rssStageResubmit.isReassigned();
+      if (!needReassign) {
+        int requiredShuffleServerNumber =
+            RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+        int estimateTaskConcurrency = 
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+
+        /**
+         * this will clear up the previous stage attempt all data when 
registering the same
+         * shuffleId at the second time
+         */
+        Map<Integer, List<ShuffleServerInfo>> partitionToServers =
+            requestShuffleAssignment(
+                shuffleId,
+                numPartitions,
+                1,
+                requiredShuffleServerNumber,
+                estimateTaskConcurrency,
+                rssStageResubmitManager.getFailuresShuffleServerIds(),
+                stageAttemptNumber);
+        /**
+         * we need to clear the metadata of the completed task, otherwise some 
of the stage's data
+         * will be lost
+         */
+        try {
+          unregisterAllMapOutput(shuffleId);
+        } catch (SparkException e) {
+          LOG.error("Clear MapoutTracker Meta failed!");
+          throw new RssException("Clear MapoutTracker Meta failed!", e);
+        }
+        MutableShuffleHandleInfo shuffleHandleInfo =
+            new MutableShuffleHandleInfo(shuffleId, partitionToServers, 
getRemoteStorageInfo());
+        StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
+            (StageAttemptShuffleHandleInfo) 
shuffleHandleInfoManager.get(shuffleId);
+        
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
+        rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, 
stageIdAndAttempt, true);
+        return true;
+      } else {
+        LOG.info(

Review Comment:
   > Change to:
   > 
   > ```
   > LOG.info("Do nothing that the stage: {} has been reassigned for attempt{}", stageId, stageAttemptNumber);
   > ```
   
   Done — the log message has been changed as suggested above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to