This is an automated email from the ASF dual-hosted git repository.

jerrylei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new adafac4da [#825][part-5] feat(spark): Adds the RPC interface to 
reassign the ShuffleServer list. (#1146)
adafac4da is described below

commit adafac4dad84da6470c7707576ba3b85b1f301a8
Author: yl09099 <[email protected]>
AuthorDate: Fri Nov 3 14:12:04 2023 +0800

    [#825][part-5] feat(spark): Adds the RPC interface to reassign the 
ShuffleServer list. (#1146)
    
    ### What changes were proposed in this pull request?
    
    Adds the RPC interface to reassign the ShuffleServer list.
    
    ### Why are the changes needed?
    
    Fix: #825
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT.
---
 .../manager/RssShuffleManagerInterface.java        |  2 +
 .../shuffle/manager/ShuffleManagerGrpcService.java | 21 ++++++
 .../shuffle/manager/DummyRssShuffleManager.java    |  6 ++
 .../apache/spark/shuffle/RssShuffleManager.java    | 82 +++++++++++++++++++++
 .../apache/spark/shuffle/RssShuffleManager.java    | 83 ++++++++++++++++++++++
 .../uniffle/client/api/ShuffleManagerClient.java   |  4 ++
 .../client/impl/grpc/ShuffleManagerGrpcClient.java | 10 +++
 .../client/request/RssReassignServersRequest.java  | 45 ++++++++++++
 .../client/response/RssReassignServersReponse.java | 43 +++++++++++
 proto/src/main/proto/Rss.proto                     | 15 ++++
 10 files changed, 311 insertions(+)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 4940277c8..73fdbdb07 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -70,4 +70,6 @@ public interface RssShuffleManagerInterface {
    * @param shuffleServerId
    */
   void addFailuresShuffleServerInfos(String shuffleServerId);
+
+  boolean reassignShuffleServers(int stageId, int stageAttemptNumber, int 
shuffleId, int numMaps);
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index a03c0439b..444e82cfb 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -230,6 +230,27 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
     responseObserver.onCompleted();
   }
 
+  @Override
+  public void reassignShuffleServers(
+      RssProtos.ReassignServersRequest request,
+      StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
+    int stageId = request.getStageId();
+    int stageAttemptNumber = request.getStageAttemptNumber();
+    int shuffleId = request.getShuffleId();
+    int numPartitions = request.getNumPartitions();
+    boolean needReassign =
+        shuffleManager.reassignShuffleServers(
+            stageId, stageAttemptNumber, shuffleId, numPartitions);
+    RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
+    RssProtos.ReassignServersReponse reply =
+        RssProtos.ReassignServersReponse.newBuilder()
+            .setStatus(code)
+            .setNeedReassign(needReassign)
+            .build();
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
+
   /**
    * Remove the no longer used shuffle id's rss shuffle status. This is called 
when ShuffleManager
    * unregisters the corresponding shuffle id.
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index 24980afbf..0b9e4f858 100644
--- 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -57,4 +57,10 @@ public class DummyRssShuffleManager implements 
RssShuffleManagerInterface {
 
   @Override
   public void addFailuresShuffleServerInfos(String shuffleServerId) {}
+
+  @Override
+  public boolean reassignShuffleServers(
+      int stageId, int stageAttemptNumber, int shuffleId, int numMaps) {
+    return false;
+  }
 }
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 2cdf8636c..2f7e5105b 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
+import org.apache.spark.SparkException;
 import org.apache.spark.TaskContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -125,6 +126,11 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   private boolean rssResubmitStage;
   /** A list of shuffleServer for Write failures */
   private Set<String> failuresShuffleServerIds = Sets.newHashSet();
+  /**
+   * Prevents multiple FetchFailed reports from triggering repeated ShuffleServer reassignment.
+   * Key: "stageId_stageAttemptNumber"; value: whether that attempt was already reassigned.
+   */
+  private Map<String, Boolean> serverAssignedInfos = 
JavaUtils.newConcurrentMap();
 
   public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
     if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
@@ -806,4 +812,80 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public void addFailuresShuffleServerInfos(String shuffleServerId) {
     failuresShuffleServerIds.add(shuffleServerId);
   }
+
+  /**
+   * Reassign the ShuffleServer list for ShuffleId
+   *
+   * @param shuffleId
+   * @param numPartitions
+   */
+  @Override
+  public synchronized boolean reassignShuffleServers(
+      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+    String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+    Boolean needReassgin = 
serverAssignedInfos.computeIfAbsent(stageIdAndAttempt, id -> false);
+    if (!needReassgin) {
+      String storageType = 
sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
+      RemoteStorageInfo defaultRemoteStorage =
+          new 
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), 
""));
+      RemoteStorageInfo remoteStorage =
+          ClientUtils.fetchRemoteStorage(
+              appId, defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
+      Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+      ClientUtils.validateClientType(clientType);
+      assignmentTags.add(clientType);
+      int requiredShuffleServerNumber =
+          RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+      long retryInterval = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
+      int retryTimes = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
+      int estimateTaskConcurrency = 
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+      /** Before reassigning ShuffleServer, clear the ShuffleServer list in 
ShuffleWriteClient. */
+      shuffleWriteClient.unregisterShuffle(appId, shuffleId);
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+      try {
+        partitionToServers =
+            RetryUtils.retry(
+                () -> {
+                  ShuffleAssignmentsInfo response =
+                      shuffleWriteClient.getShuffleAssignments(
+                          appId,
+                          shuffleId,
+                          numPartitions,
+                          1,
+                          assignmentTags,
+                          requiredShuffleServerNumber,
+                          estimateTaskConcurrency,
+                          failuresShuffleServerIds);
+                  registerShuffleServers(
+                      appId, shuffleId, response.getServerToPartitionRanges(), 
remoteStorage);
+                  return response.getPartitionToServers();
+                },
+                retryInterval,
+                retryTimes);
+      } catch (Throwable throwable) {
+        throw new RssException("registerShuffle failed!", throwable);
+      }
+      /**
+       * we need to clear the metadata of the completed task, otherwise some 
of the stage's data
+       * will be lost
+       */
+      try {
+        unregisterAllMapOutput(shuffleId);
+      } catch (SparkException e) {
+        LOG.error("Clear MapoutTracker Meta failed!");
+        throw new RssException("Clear MapoutTracker Meta failed!", e);
+      }
+      ShuffleHandleInfo handleInfo =
+          new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
+      shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
+      serverAssignedInfos.put(stageIdAndAttempt, true);
+      return true;
+    } else {
+      LOG.info(
+          "The Stage:{} has been reassigned in an Attempt{},Return without 
performing any operation",
+          stageId,
+          stageAttemptNumber);
+      return false;
+    }
+  }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 3ee108fd3..ceac60063 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -43,6 +43,7 @@ import org.apache.spark.MapOutputTracker;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
+import org.apache.spark.SparkException;
 import org.apache.spark.TaskContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.executor.ShuffleReadMetrics;
@@ -136,6 +137,11 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   private boolean rssResubmitStage;
   /** A list of shuffleServer for Write failures */
   private Set<String> failuresShuffleServerIds;
+  /**
+   * Prevents multiple FetchFailed reports from triggering repeated ShuffleServer reassignment.
+   * Key: "stageId_stageAttemptNumber"; value: whether that attempt was already reassigned.
+   */
+  private Map<String, Boolean> serverAssignedInfos;
 
   public RssShuffleManager(SparkConf conf, boolean isDriver) {
     this.sparkConf = conf;
@@ -263,6 +269,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             poolSize,
             keepAliveTime);
     this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
+    this.failuresShuffleServerIds = Sets.newHashSet();
+    this.serverAssignedInfos = JavaUtils.newConcurrentMap();
   }
 
   public CompletableFuture<Long> sendData(AddBlockEvent event) {
@@ -1104,4 +1112,79 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public void addFailuresShuffleServerInfos(String shuffleServerId) {
     failuresShuffleServerIds.add(shuffleServerId);
   }
+
+  /**
+   * Reassign the ShuffleServer list for ShuffleId
+   *
+   * @param shuffleId
+   * @param numPartitions
+   */
+  @Override
+  public synchronized boolean reassignShuffleServers(
+      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+    String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+    Boolean needReassgin = 
serverAssignedInfos.computeIfAbsent(stageIdAndAttempt, id -> false);
+    if (!needReassgin) {
+      String storageType = 
sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
+      RemoteStorageInfo defaultRemoteStorage =
+          new 
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), 
""));
+      RemoteStorageInfo remoteStorage =
+          ClientUtils.fetchRemoteStorage(
+              id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, 
shuffleWriteClient);
+      Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+      int requiredShuffleServerNumber =
+          RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+      long retryInterval = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
+      int retryTimes = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
+      int estimateTaskConcurrency = 
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+      /** Before reassigning ShuffleServer, clear the ShuffleServer list in 
ShuffleWriteClient. */
+      shuffleWriteClient.unregisterShuffle(id.get(), shuffleId);
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+      try {
+        partitionToServers =
+            RetryUtils.retry(
+                () -> {
+                  ShuffleAssignmentsInfo response =
+                      shuffleWriteClient.getShuffleAssignments(
+                          id.get(),
+                          shuffleId,
+                          numPartitions,
+                          1,
+                          assignmentTags,
+                          requiredShuffleServerNumber,
+                          estimateTaskConcurrency,
+                          failuresShuffleServerIds);
+                  registerShuffleServers(
+                      id.get(), shuffleId, 
response.getServerToPartitionRanges(), remoteStorage);
+                  return response.getPartitionToServers();
+                },
+                retryInterval,
+                retryTimes);
+
+      } catch (Throwable throwable) {
+        throw new RssException("registerShuffle failed!", throwable);
+      }
+      /**
+       * we need to clear the metadata of the completed task, otherwise some 
of the stage's data
+       * will be lost
+       */
+      try {
+        unregisterAllMapOutput(shuffleId);
+      } catch (SparkException e) {
+        LOG.error("Clear MapoutTracker Meta failed!");
+        throw new RssException("Clear MapoutTracker Meta failed!", e);
+      }
+      ShuffleHandleInfo handleInfo =
+          new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
+      shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
+      serverAssignedInfos.put(stageIdAndAttempt, true);
+      return true;
+    } else {
+      LOG.info(
+          "The Stage:{} has been reassigned in an Attempt{},Return without 
performing any operation",
+          stageId,
+          stageAttemptNumber);
+      return false;
+    }
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index cd2d79569..ddba67be0 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -20,9 +20,11 @@ package org.apache.uniffle.client.api;
 import java.io.Closeable;
 
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 
@@ -41,4 +43,6 @@ public interface ShuffleManagerClient extends Closeable {
 
   RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
       RssReportShuffleWriteFailureRequest req);
+
+  RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest 
req);
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 8708a9b42..b38113c2e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -24,9 +24,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 import org.apache.uniffle.common.config.RssBaseConf;
@@ -105,4 +107,12 @@ public class ShuffleManagerGrpcClient extends GrpcClient 
implements ShuffleManag
       throw new RssException(msg, e);
     }
   }
+
+  @Override
+  public RssReassignServersReponse 
reassignShuffleServers(RssReassignServersRequest req) {
+    RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
+    RssProtos.ReassignServersReponse reassignServersReponse =
+        getBlockingStub().reassignShuffleServers(reassignServersRequest);
+    return RssReassignServersReponse.fromProto(reassignServersReponse);
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
new file mode 100644
index 000000000..5ce505ab1
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReassignServersRequest {
+  private int stageId;
+  private int stageAttemptNumber;
+  private int shuffleId;
+  private int numPartitions;
+
+  public RssReassignServersRequest(
+      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+    this.stageId = stageId;
+    this.stageAttemptNumber = stageAttemptNumber;
+    this.shuffleId = shuffleId;
+    this.numPartitions = numPartitions;
+  }
+
+  public RssProtos.ReassignServersRequest toProto() {
+    RssProtos.ReassignServersRequest.Builder builder =
+        RssProtos.ReassignServersRequest.newBuilder()
+            .setStageId(stageId)
+            .setStageAttemptNumber(stageAttemptNumber)
+            .setShuffleId(shuffleId)
+            .setNumPartitions(numPartitions);
+    return builder.build();
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
new file mode 100644
index 000000000..4e332da32
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReassignServersReponse extends ClientResponse {
+
+  private boolean needReassign;
+
+  public RssReassignServersReponse(StatusCode statusCode, String message, 
boolean needReassign) {
+    super(statusCode, message);
+    this.needReassign = needReassign;
+  }
+
+  public boolean isNeedReassign() {
+    return needReassign;
+  }
+
+  public static RssReassignServersReponse 
fromProto(RssProtos.ReassignServersReponse response) {
+    return new RssReassignServersReponse(
+        // todo: [issue#780] add fromProto for StatusCode issue
+        StatusCode.valueOf(response.getStatus().name()),
+        response.getMsg(),
+        response.getNeedReassign());
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 93bd2f67b..3b3ad8e59 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -501,6 +501,8 @@ service ShuffleManager {
   rpc getPartitionToShufflerServer(PartitionToShuffleServerRequest) returns 
(PartitionToShuffleServerResponse);
   // Report write failures to ShuffleManager
   rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns 
(ReportShuffleWriteFailureResponse);
+  // RPC interface to reassign the ShuffleServer list
+  rpc reassignShuffleServers(ReassignServersRequest) returns 
(ReassignServersReponse);
 }
 
 message ReportShuffleFetchFailureRequest {
@@ -550,3 +552,16 @@ message ReportShuffleWriteFailureResponse {
   bool reSubmitWholeStage = 2;
   string msg = 3;
 }
+
+message ReassignServersRequest{
+  int32 stageId = 1;
+  int32 stageAttemptNumber = 2;
+  int32 shuffleId = 3;
+  int32 numPartitions = 4;
+}
+
+message ReassignServersReponse{
+  StatusCode status = 1;
+  bool needReassign = 2;
+  string msg = 3;
+}

Reply via email to