yl09099 commented on code in PR #1147:
URL: https://github.com/apache/incubator-uniffle/pull/1147#discussion_r1386526713


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -473,4 +492,61 @@ Map<Integer, Set<Long>> getPartitionToBlockIds() {
   public WriteBufferManager getBufferManager() {
     return bufferManager;
   }
+
+  private static ShuffleManagerClient createShuffleManagerClient(String host, 
int port)
+      throws IOException {
+    ClientType grpc = ClientType.GRPC;
+    // Host can be inferred from `spark.driver.bindAddress`, which would be set when SparkContext is
+    // constructed.
+    return 
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, 
host, port);
+  }
+
+  private RssException generateFetchFailedIfNecessary(Exception e) {
+    // The shuffleServer is registered only when a Block fails to be sent
+    if (e instanceof RssSendFailedException) {
+      Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
+          shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+      List<ShuffleServerInfo> shuffleServerInfos = Lists.newArrayList();
+      for (Map.Entry<Long, BlockingQueue<ShuffleServerInfo>> longListEntry :
+          failedBlockIds.entrySet()) {
+        shuffleServerInfos.addAll(longListEntry.getValue());
+      }
+      RssReportShuffleWriteFailureRequest req =
+          new RssReportShuffleWriteFailureRequest(
+              appId,
+              shuffleId,
+              taskContext.stageAttemptNumber(),
+              shuffleServerInfos,
+              e.getMessage());
+      RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+      String driver = rssConf.getString("driver.host", "");
+      int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
+      try (ShuffleManagerClient shuffleManagerClient = 
createShuffleManagerClient(driver, port)) {
+        RssReportShuffleWriteFailureResponse response =
+            shuffleManagerClient.reportShuffleWriteFailure(req);
+        if (response.getReSubmitWholeStage()) {
+          RssReassignServersRequest rssReassignServersRequest =
+              new RssReassignServersRequest(
+                  taskContext.stageId(),
+                  taskContext.stageAttemptNumber(),
+                  shuffleId,
+                  partitioner.numPartitions());
+          RssReassignServersReponse rssReassignServersReponse =
+              
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
+          LOG.info(
+              "Whether the reassignment is successful: {}",
+              rssReassignServersReponse.isNeedReassign());
+          // since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
+          // provided.
+          FetchFailedException ffe =
+              RssSparkShuffleUtils.createFetchFailedException(
+                  shuffleId, -1, taskContext.stageAttemptNumber(), e);
+          return new RssException(ffe);
+        }
+      } catch (IOException ioe) {
+        LOG.info("Error closing shuffle manager client with error:", ioe);
+      }
+    }
+    return new RssException(e);

Review Comment:
   > 
   
   This has been changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to