zuston commented on code in PR #1762:
URL: https://github.com/apache/incubator-uniffle/pull/1762#discussion_r1625584961


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+
+public class RssStageResubmitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssStageResubmitManager.class);
+  /** The set of shuffle server IDs that have had write failures */
+  private Set<String> failuresShuffleServerIds;
+  /**
+   * Prevents multiple tasks that report FetchFailed from triggering multiple shuffle server
+   * reassignments; tracks, per shuffleId, whether the (stageId, attemptNumber) combination has
+   * already been reassigned.
+   */
+  private Map<Integer, RssStageResubmit> serverAssignedInfos;
+
+  public RssStageResubmitManager() {
+    this.failuresShuffleServerIds = Sets.newConcurrentHashSet();
+    this.serverAssignedInfos = JavaUtils.newConcurrentMap();
+  }
+
+  public Set<String> getFailuresShuffleServerIds() {
+    return failuresShuffleServerIds;
+  }
+
+  public void setFailuresShuffleServerIds(Set<String> failuresShuffleServerIds) {

Review Comment:
   `setFailuresShuffleServerIds` -> `resetFailuresShuffleServerIds`
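   A minimal sketch of the rename, assuming the method keeps its current behavior of replacing the stored set (the body is not visible in this hunk):
   
   ```
   public void resetFailuresShuffleServerIds(Set<String> failuresShuffleServerIds) {
     // Drop the previously recorded failures and start from the given set.
     this.failuresShuffleServerIds = Sets.newConcurrentHashSet(failuresShuffleServerIds);
   }
   ```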



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmit.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+public class RssStageResubmit {

Review Comment:
   How about renaming this to `RssStageInfo`?
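   For illustration, a minimal sketch of what the renamed class might hold; the fields are assumptions based on how it is used in `RssStageResubmitManager`, not the actual implementation:
   
   ```
   public class RssStageInfo {
     private final String stageIdAndAttempt;
     // Whether the shuffle servers have already been reassigned for this stage attempt.
     private volatile boolean reassigned;
   
     public RssStageInfo(String stageIdAndAttempt, boolean reassigned) {
       this.stageIdAndAttempt = stageIdAndAttempt;
       this.reassigned = reassigned;
     }
   
     public boolean isReassigned() {
       return reassigned;
     }
   
     public void setReassigned(boolean reassigned) {
       this.reassigned = reassigned;
     }
   }
   ```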



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.JavaUtils;
+
+public class RssStageResubmitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssStageResubmitManager.class);
+  /** The set of shuffle server IDs that have had write failures */
+  private Set<String> failuresShuffleServerIds;

Review Comment:
   How about naming this `serverIdBlackList`?
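   A sketch of the suggested rename, purely illustrative (the getter shown here is assumed; the existing accessors would be renamed along with the field):
   
   ```
   /** Shuffle server IDs that have had write failures and should be skipped on reassignment. */
   private Set<String> serverIdBlackList;
   
   public Set<String> getServerIdBlackList() {
     return serverIdBlackList;
   }
   ```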



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java:
##########
@@ -520,4 +574,417 @@ protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkCo
     return new RemoteStorageInfo(
        sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), confItems);
   }
+
+  /**
+   * In Stage Retry mode, obtain the Shuffle Server list from the Driver based on shuffleId.
+   *
+   * @param shuffleId shuffleId
+   * @return ShuffleHandleInfo
+   */
+  protected synchronized StageAttemptShuffleHandleInfo getRemoteShuffleHandleInfoWithStageRetry(
+      int shuffleId) {
+    RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
+        new RssPartitionToShuffleServerRequest(shuffleId);
+    RssReassignOnStageRetryResponse rpcPartitionToShufflerServer =
+        getOrCreateShuffleManagerClient()
+            .getPartitionToShufflerServerWithStageRetry(rssPartitionToShuffleServerRequest);
+    StageAttemptShuffleHandleInfo shuffleHandleInfo =
+        StageAttemptShuffleHandleInfo.fromProto(
+            rpcPartitionToShufflerServer.getShuffleHandleInfoProto());
+    return shuffleHandleInfo;
+  }
+
+  /**
+   * In Block Retry mode, obtain the Shuffle Server list from the Driver based on shuffleId.
+   *
+   * @param shuffleId shuffleId
+   * @return ShuffleHandleInfo
+   */
+  protected synchronized MutableShuffleHandleInfo getRemoteShuffleHandleInfoWithBlockRetry(
+      int shuffleId) {
+    RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
+        new RssPartitionToShuffleServerRequest(shuffleId);
+    RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
+        getOrCreateShuffleManagerClient()
+            .getPartitionToShufflerServerWithBlockRetry(rssPartitionToShuffleServerRequest);
+    MutableShuffleHandleInfo shuffleHandleInfo =
+        MutableShuffleHandleInfo.fromProto(rpcPartitionToShufflerServer.getHandle());
+    return shuffleHandleInfo;
+  }
+
+  // todo: automatically close the client when it is idle to avoid too many connections for the
+  // spark driver.
+  protected ShuffleManagerClient getOrCreateShuffleManagerClient() {
+    if (shuffleManagerClient == null) {
+      RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+      String driver = rssConf.getString("driver.host", "");
+      int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
+      this.shuffleManagerClient =
+          ShuffleManagerClientFactory.getInstance()
+              .createShuffleManagerClient(ClientType.GRPC, driver, port);
+    }
+    return shuffleManagerClient;
+  }
+
+  @Override
+  public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
+    return shuffleHandleInfoManager.get(shuffleId);
+  }
+
+  /**
+   * @return the maximum number of fetch failures per shuffle partition before that shuffle stage
+   *     should be recomputed
+   */
+  @Override
+  public int getMaxFetchFailures() {
+    final String TASK_MAX_FAILURE = "spark.task.maxFailures";
+    return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
+  }
+
+  /**
+   * Add the shuffleServer that failed to write to the failure list
+   *
+   * @param shuffleServerId
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    rssStageResubmitManager.recordFailuresShuffleServer(shuffleServerId);
+  }
+
+  /**
+   * Reassign the ShuffleServer list for ShuffleId
+   *
+   * @param shuffleId
+   * @param numPartitions
+   */
+  @Override
+  public boolean reassignOnStageResubmit(
+      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+    String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+    RssStageResubmit rssStageResubmit =
+        rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, stageIdAndAttempt);
+    synchronized (rssStageResubmit) {
+      Boolean needReassign = rssStageResubmit.isReassigned();
+      if (!needReassign) {
+        int requiredShuffleServerNumber =
+            RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+        int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+
+        /**
+         * This will clear up all data of the previous stage attempt when the same shuffleId is
+         * registered for the second time.
+         */
+        Map<Integer, List<ShuffleServerInfo>> partitionToServers =
+            requestShuffleAssignment(
+                shuffleId,
+                numPartitions,
+                1,
+                requiredShuffleServerNumber,
+                estimateTaskConcurrency,
+                rssStageResubmitManager.getFailuresShuffleServerIds(),
+                stageAttemptNumber);
+        /**
+         * We need to clear the metadata of the completed tasks, otherwise some of the stage's
+         * data will be lost.
+         */
+        try {
+          unregisterAllMapOutput(shuffleId);
+        } catch (SparkException e) {
+          LOG.error("Clear MapOutputTracker meta failed!");
+          throw new RssException("Clear MapOutputTracker meta failed!", e);
+        }
+        MutableShuffleHandleInfo shuffleHandleInfo =
+            new MutableShuffleHandleInfo(shuffleId, partitionToServers, getRemoteStorageInfo());
+        StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
+            (StageAttemptShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
+        stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
+        rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, stageIdAndAttempt, true);
+        return true;
+      } else {
+        LOG.info(

Review Comment:
   Change to:
   
   ```
   LOG.info("Do nothing as the stage: {} has already been reassigned for attempt: {}", stageId, stageAttemptNumber);
   ```
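   For context, the else branch would then read roughly like this (a sketch only; the hunk above is cut off, so the assumption that it returns `false` is mine):
   
   ```
   } else {
     LOG.info(
         "Do nothing as the stage: {} has already been reassigned for attempt: {}",
         stageId,
         stageAttemptNumber);
     return false;
   }
   ```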



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java:
##########
@@ -157,6 +157,36 @@ public void registerShuffle(
     int shuffleId = req.getShuffleId();
     String remoteStoragePath = req.getRemoteStorage().getPath();
     String user = req.getUser();
+    int stageAttemptNumber = req.getStageAttemptNumber();
+
+    if (stageAttemptNumber > 0) {
+      ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+      int attemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
+      if (stageAttemptNumber > attemptNumber) {
+        taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
+        try {
+          long start = System.currentTimeMillis();
+          shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
+          LOG.info(
+              "Deleted the previous stage attempt data due to stage 
recomputing for app: {}, "
+                  + "shuffleId: {}. It costs {} ms",
+              appId,
+              shuffleId,
+              System.currentTimeMillis() - start);
+        } catch (Exception e) {
+          LOG.error(

Review Comment:
   If the exception is thrown, will the client fail fast? I think we should make this fail rather than retry again.
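   One possible shape for failing fast, sketched under the assumption that the catch block currently only logs; rethrowing lets the RPC itself fail so the client sees the error instead of continuing against stale data (the exception type and message are illustrative):
   
   ```
   } catch (Exception e) {
     LOG.error(
         "Failed to delete the previous stage attempt data for app: {}, shuffleId: {}",
         appId,
         shuffleId,
         e);
     // Propagate the failure so the registration RPC errors out and the client fails fast
     // instead of retrying on top of data that was never cleaned up.
     throw new RuntimeException(
         "Failed to delete previous stage attempt data for shuffleId " + shuffleId, e);
   }
   ```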



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
