This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7794f0f91 [#1791] feat(spark)(coordinator): Take more infos on getting 
assignment to track app reassign/stageRetry (#1792)
7794f0f91 is described below

commit 7794f0f913e221361a6d903e1e0fedbf27ef32b5
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 18 21:07:39 2024 +0800

    [#1791] feat(spark)(coordinator): Take more infos on getting assignment to 
track app reassign/stageRetry (#1792)
    
    ### What changes were proposed in this pull request?
    
    Take more infos on getting assignment to track app reassign/stageRetry
    
    ### Why are the changes needed?
    
    For #1791
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../hadoop/mapred/SortWriteBufferManagerTest.java  |  6 +-
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |  6 +-
 .../shuffle/manager/RssShuffleManagerBase.java     | 66 +++++++++++++++++++---
 .../manager/RssShuffleManagerInterface.java        |  5 +-
 .../shuffle/manager/ShuffleManagerGrpcService.java |  2 +
 .../shuffle/manager/DummyRssShuffleManager.java    |  5 +-
 .../common/sort/buffer/WriteBufferManagerTest.java |  6 +-
 .../uniffle/client/api/ShuffleWriteClient.java     | 43 ++++++++++++--
 .../client/impl/ShuffleWriteClientImpl.java        | 30 +++-------
 .../coordinator/CoordinatorGrpcService.java        | 10 +++-
 .../client/impl/grpc/CoordinatorGrpcClient.java    | 13 ++++-
 .../request/RssGetShuffleAssignmentsRequest.java   | 26 ++++++++-
 proto/src/main/proto/Rss.proto                     |  3 +
 13 files changed, 173 insertions(+), 48 deletions(-)

diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 430e2ff58..3cb017239 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -574,7 +574,11 @@ public class SortWriteBufferManagerTest {
         int partitionNumPerRange,
         Set<String> requiredTags,
         int assignmentShuffleServerNumber,
-        int estimateTaskConcurrency) {
+        int estimateTaskConcurrency,
+        Set<String> faultyServerIds,
+        int stageId,
+        int stageAttemptNumber,
+        boolean reassign) {
       return null;
     }
 
diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 7664b47d6..6c5f6776d 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -542,7 +542,11 @@ public class FetcherTest {
         int partitionNumPerRange,
         Set<String> requiredTags,
         int assignmentShuffleServerNumber,
-        int estimateTaskConcurrency) {
+        int estimateTaskConcurrency,
+        Set<String> faultyServerIds,
+        int stageId,
+        int stageAttemptNumber,
+        boolean reassign) {
       return null;
     }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 912bf998f..197035e3a 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -682,7 +682,9 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                 requiredShuffleServerNumber,
                 estimateTaskConcurrency,
                 rssStageResubmitManager.getServerIdBlackList(),
-                stageAttemptNumber);
+                stageId,
+                stageAttemptNumber,
+                false);
         /**
          * we need to clear the metadata of the completed task, otherwise some 
of the stage's data
          * will be lost
@@ -713,7 +715,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
   /** this is only valid on driver side that exposed to being invoked by grpc 
server */
   @Override
   public MutableShuffleHandleInfo reassignOnBlockSendFailure(
-      int shuffleId, Map<Integer, List<ReceivingFailureServer>> 
partitionToFailureServers) {
+      int stageId,
+      int stageAttemptNumber,
+      int shuffleId,
+      Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
     long startTime = System.currentTimeMillis();
     MutableShuffleHandleInfo handleInfo =
         (MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
@@ -742,7 +747,13 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
             excludedServers.add(serverId);
             replacements =
                 reassignServerForTask(
-                    shuffleId, Sets.newHashSet(partitionId), excludedServers, 
requiredServerNum);
+                    stageId,
+                    stageAttemptNumber,
+                    shuffleId,
+                    Sets.newHashSet(partitionId),
+                    excludedServers,
+                    requiredServerNum,
+                    true);
           } else {
             serverHasReplaced = true;
           }
@@ -805,10 +816,13 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
 
   /** Request the new shuffle-servers to replace faulty server. */
   private Set<ShuffleServerInfo> reassignServerForTask(
+      int stageId,
+      int stageAttemptNumber,
       int shuffleId,
       Set<Integer> partitionIds,
       Set<String> excludedServers,
-      int requiredServerNum) {
+      int requiredServerNum,
+      boolean reassign) {
     AtomicReference<Set<ShuffleServerInfo>> replacementsRef =
         new AtomicReference<>(new HashSet<>());
     requestShuffleAssignment(
@@ -828,7 +842,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                   .collect(Collectors.toSet());
           replacementsRef.set(replacements);
           return createShuffleAssignmentsInfo(replacements, partitionIds);
-        });
+        },
+        stageId,
+        stageAttemptNumber,
+        reassign);
     return replacementsRef.get();
   }
 
@@ -839,7 +856,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
       Set<String> faultyServerIds,
-      Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> 
reassignmentHandler) {
+      Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> 
reassignmentHandler,
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign) {
     Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
     ClientUtils.validateClientType(clientType);
     assignmentTags.add(clientType);
@@ -858,7 +878,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                     assignmentTags,
                     assignmentShuffleServerNumber,
                     estimateTaskConcurrency,
-                    faultyServerIds);
+                    faultyServerIds,
+                    stageId,
+                    stageAttemptNumber,
+                    reassign);
             LOG.info("Finished reassign");
             if (reassignmentHandler != null) {
               response = reassignmentHandler.apply(response);
@@ -881,7 +904,9 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
       Set<String> faultyServerIds,
-      int stageAttemptNumber) {
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign) {
     Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
     ClientUtils.validateClientType(clientType);
     assignmentTags.add(clientType);
@@ -901,7 +926,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                     assignmentTags,
                     assignmentShuffleServerNumber,
                     estimateTaskConcurrency,
-                    faultyServerIds);
+                    faultyServerIds,
+                    stageId,
+                    stageAttemptNumber,
+                    reassign);
             registerShuffleServers(
                 appId,
                 shuffleId,
@@ -917,6 +945,26 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     }
   }
 
+  protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
+      int shuffleId,
+      int partitionNum,
+      int partitionNumPerRange,
+      int assignmentShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds,
+      int stageAttemptNumber) {
+    return requestShuffleAssignment(
+        shuffleId,
+        partitionNum,
+        partitionNumPerRange,
+        assignmentShuffleServerNumber,
+        estimateTaskConcurrency,
+        faultyServerIds,
+        -1,
+        stageAttemptNumber,
+        false);
+  }
+
   protected void registerShuffleServers(
       String appId,
       int shuffleId,
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 52ded5db7..77379efb5 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -83,5 +83,8 @@ public interface RssShuffleManagerInterface {
   boolean reassignOnStageResubmit(int stageId, int stageAttemptNumber, int 
shuffleId, int numMaps);
 
   MutableShuffleHandleInfo reassignOnBlockSendFailure(
-      int shuffleId, Map<Integer, List<ReceivingFailureServer>> 
partitionToFailureServers);
+      int stageId,
+      int stageAttemptNumber,
+      int shuffleId,
+      Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 3a2f58cca..b9828408c 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -275,6 +275,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
           request.getExecutorId());
       MutableShuffleHandleInfo handle =
           shuffleManager.reassignOnBlockSendFailure(
+              request.getStageId(),
+              request.getStageAttemptNumber(),
               request.getShuffleId(),
               request.getFailurePartitionToServerIdsMap().entrySet().stream()
                   .collect(
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index df40e9d16..317d0cd9e 100644
--- 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -77,7 +77,10 @@ public class DummyRssShuffleManager implements 
RssShuffleManagerInterface {
 
   @Override
   public MutableShuffleHandleInfo reassignOnBlockSendFailure(
-      int shuffleId, Map<Integer, List<ReceivingFailureServer>> 
partitionToFailureServers) {
+      int stageId,
+      int stageAttemptNumber,
+      int shuffleId,
+      Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
     return null;
   }
 }
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index d0eab5ae1..70c1a8aed 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -645,7 +645,11 @@ public class WriteBufferManagerTest {
         int partitionNumPerRange,
         Set<String> requiredTags,
         int assignmentShuffleServerNumber,
-        int estimateTaskConcurrency) {
+        int estimateTaskConcurrency,
+        Set<String> faultyServerIds,
+        int stageId,
+        int stageAttemptNumber,
+        boolean reassign) {
       return null;
     }
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index efd39e35a..db0c91484 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.client.api;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -99,6 +100,19 @@ public interface ShuffleWriteClient {
       long taskAttemptId,
       int bitmapNum);
 
+  ShuffleAssignmentsInfo getShuffleAssignments(
+      String appId,
+      int shuffleId,
+      int partitionNum,
+      int partitionNumPerRange,
+      Set<String> requiredTags,
+      int assignmentShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds,
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign);
+
   default ShuffleAssignmentsInfo getShuffleAssignments(
       String appId,
       int shuffleId,
@@ -108,19 +122,38 @@ public interface ShuffleWriteClient {
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
       Set<String> faultyServerIds) {
-    throw new UnsupportedOperationException(
-        this.getClass().getName()
-            + " doesn't implement getShuffleAssignments with faultyServerIds");
+    return getShuffleAssignments(
+        appId,
+        shuffleId,
+        partitionNum,
+        partitionNumPerRange,
+        requiredTags,
+        assignmentShuffleServerNumber,
+        estimateTaskConcurrency,
+        faultyServerIds,
+        -1,
+        0,
+        false);
   }
 
-  ShuffleAssignmentsInfo getShuffleAssignments(
+  default ShuffleAssignmentsInfo getShuffleAssignments(
       String appId,
       int shuffleId,
       int partitionNum,
       int partitionNumPerRange,
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
-      int estimateTaskConcurrency);
+      int estimateTaskConcurrency) {
+    return getShuffleAssignments(
+        appId,
+        shuffleId,
+        partitionNum,
+        partitionNumPerRange,
+        requiredTags,
+        assignmentShuffleServerNumber,
+        estimateTaskConcurrency,
+        Collections.emptySet());
+  }
 
   Roaring64NavigableMap getShuffleResult(
       String clientType,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index a79a8557a..b16b88168 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -635,26 +635,6 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     return remoteStorage;
   }
 
-  @Override
-  public ShuffleAssignmentsInfo getShuffleAssignments(
-      String appId,
-      int shuffleId,
-      int partitionNum,
-      int partitionNumPerRange,
-      Set<String> requiredTags,
-      int assignmentShuffleServerNumber,
-      int estimateTaskConcurrency) {
-    return getShuffleAssignments(
-        appId,
-        shuffleId,
-        partitionNum,
-        partitionNumPerRange,
-        requiredTags,
-        assignmentShuffleServerNumber,
-        estimateTaskConcurrency,
-        Sets.newConcurrentHashSet());
-  }
-
   @Override
   public ShuffleAssignmentsInfo getShuffleAssignments(
       String appId,
@@ -664,7 +644,10 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
-      Set<String> faultyServerIds) {
+      Set<String> faultyServerIds,
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign) {
     RssGetShuffleAssignmentsRequest request =
         new RssGetShuffleAssignmentsRequest(
             appId,
@@ -675,7 +658,10 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
             requiredTags,
             assignmentShuffleServerNumber,
             estimateTaskConcurrency,
-            faultyServerIds);
+            faultyServerIds,
+            stageId,
+            stageAttemptNumber,
+            reassign);
 
     RssGetShuffleAssignmentsResponse response =
         new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 0b4b9d69e..ddca8a1e3 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -117,8 +117,9 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
 
     LOG.info(
-        "Request of getShuffleAssignments for appId[{}], shuffleId[{}], 
partitionNum[{}], "
-            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}],faultyServerIds[{}]",
+        "Request of getShuffleAssignments for appId[{}], shuffleId[{}], 
partitionNum[{}],"
+            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}],"
+            + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}], 
isReassign[{}]",
         appId,
         shuffleId,
         partitionNum,
@@ -126,7 +127,10 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         replica,
         requiredTags,
         requiredShuffleServerNumber,
-        faultyServerIds.size());
+        faultyServerIds.size(),
+        request.getStageId(),
+        request.getStageAttemptNumber(),
+        request.getReassign());
 
     GetShuffleAssignmentsResponse response;
     try {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index be6e70df3..484bb43d7 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -178,7 +178,10 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
-      Set<String> faultyServerIds) {
+      Set<String> faultyServerIds,
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign) {
     RssProtos.GetShuffleServerRequest getServerRequest =
         RssProtos.GetShuffleServerRequest.newBuilder()
             .setApplicationId(appId)
@@ -190,6 +193,9 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .setAssignmentShuffleServerNumber(assignmentShuffleServerNumber)
             .setEstimateTaskConcurrency(estimateTaskConcurrency)
             .addAllFaultyServerIds(faultyServerIds)
+            .setStageId(stageId)
+            .setStageAttemptNumber(stageAttemptNumber)
+            .setReassign(reassign)
             .build();
 
     return blockingStub.getShuffleAssignments(getServerRequest);
@@ -283,7 +289,10 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             request.getRequiredTags(),
             request.getAssignmentShuffleServerNumber(),
             request.getEstimateTaskConcurrency(),
-            request.getFaultyServerIds());
+            request.getFaultyServerIds(),
+            request.getStageId(),
+            request.getStageAttemptNumber(),
+            request.isReassign());
 
     RssGetShuffleAssignmentsResponse response;
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 4cbdc4448..b80b1f9c0 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -33,6 +33,9 @@ public class RssGetShuffleAssignmentsRequest {
   private int assignmentShuffleServerNumber;
   private int estimateTaskConcurrency;
   private Set<String> faultyServerIds;
+  private int stageId = -1;
+  private int stageAttemptNumber = 0;
+  private boolean reassign = false;
 
   @VisibleForTesting
   public RssGetShuffleAssignmentsRequest(
@@ -74,7 +77,9 @@ public class RssGetShuffleAssignmentsRequest {
         assignmentShuffleServerNumber,
         estimateTaskConcurrency,
         faultyServerIds,
-        0);
+        -1,
+        0,
+        false);
   }
 
   public RssGetShuffleAssignmentsRequest(
@@ -87,7 +92,9 @@ public class RssGetShuffleAssignmentsRequest {
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency,
       Set<String> faultyServerIds,
-      int stageAttemptNumber) {
+      int stageId,
+      int stageAttemptNumber,
+      boolean reassign) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionNum = partitionNum;
@@ -97,6 +104,9 @@ public class RssGetShuffleAssignmentsRequest {
     this.assignmentShuffleServerNumber = assignmentShuffleServerNumber;
     this.estimateTaskConcurrency = estimateTaskConcurrency;
     this.faultyServerIds = faultyServerIds;
+    this.stageId = stageId;
+    this.stageAttemptNumber = stageAttemptNumber;
+    this.reassign = reassign;
   }
 
   public String getAppId() {
@@ -134,4 +144,16 @@ public class RssGetShuffleAssignmentsRequest {
   public Set<String> getFaultyServerIds() {
     return faultyServerIds;
   }
+
+  public int getStageId() {
+    return stageId;
+  }
+
+  public int getStageAttemptNumber() {
+    return stageAttemptNumber;
+  }
+
+  public boolean isReassign() {
+    return reassign;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index dc5bdf230..97928bf20 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -416,6 +416,9 @@ message GetShuffleServerRequest {
   int32 assignmentShuffleServerNumber = 10;
   int32 estimateTaskConcurrency = 11;
   repeated string faultyServerIds = 12;
+  int32 stageId = 13;
+  int32 stageAttemptNumber = 14;
+  bool reassign = 15;
 }
 
 message PartitionRangeAssignment {

Reply via email to