This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fa6d1625c [#825][part-1] feat(spark): Add the RPC interface for 
reassigning ShuffleServer (#1137)
fa6d1625c is described below

commit fa6d1625c8426181f13c45a87607af4e31f49458
Author: yl09099 <[email protected]>
AuthorDate: Sun Oct 8 14:18:28 2023 +0800

    [#825][part-1] feat(spark): Add the RPC interface for reassigning 
ShuffleServer (#1137)
    
    ### What changes were proposed in this pull request?
    
    We need to provide an RPC interface for reassigning ShuffleServers, with 
support for excluding a given list of ShuffleServers.
    
    Ⅰ. Overall objective:
    
    1. During the shuffle write phase, faulty ShuffleServer nodes are reported 
and the ShuffleServer list is reallocated;
    2. A Spark stage-level retry is triggered. The faulty ShuffleServer nodes 
are excluded and the list is reallocated before the retry.
    
    Ⅱ. Implementation logic diagram:
    
    
![image](https://github.com/apache/incubator-uniffle/assets/33595968/866c8292-e0ff-4532-b519-02f424f4c2fc)
    
    Ⅲ. As shown in the picture above:
    
    1. During Shuffle registration, obtain the ShuffleServer list to be written 
through the RPC interface of a Coordinator Client by following the solid blue 
line step. The list is bound using ShuffleID.
    2. When a Task of the Stage starts, follow the solid green steps: through 
the ShuffleManager Client RPC interface, get the ShuffleServer list to be 
written from shuffleIdToShuffleHandleInfo.
    3. In the Stage, if a Task fails to write blocks to a ShuffleServer, follow 
the steps in red to report that ShuffleServer to FailedShuffleServerList in 
RSSShuffleManager through the RPC interface.
    4. FailedShuffleServerList records the number of ShuffleServer failures. 
After the number of failures reaches the maximum number of Task-level retries, 
follow the steps in dotted orange lines: through the RPC interface of a 
Coordinator Client, obtain the list of ShuffleServers to be written to (the 
ShuffleServers that failed to be written to are excluded). After obtaining the 
list, go to Step 5 of the dotted orange line. Throwing a FetchFailed Exception 
triggers a stage-level [...]
    5. The Spark stage-level retry generates attempt 1, which pulls the 
corresponding ShuffleServer list by following the green dotted line.
    
    ### Why are the changes needed?
    
    Such an interface is required when dynamically allocating a shuffleServer 
list.
    
    Fix: #825
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UT.
---
 .../uniffle/client/api/ShuffleWriteClient.java     | 14 +++++++++
 .../client/impl/ShuffleWriteClientImpl.java        | 24 ++++++++++++++-
 .../apache/uniffle/coordinator/ClusterManager.java |  9 ++++++
 .../coordinator/CoordinatorGrpcService.java        | 10 +++++--
 .../uniffle/coordinator/SimpleClusterManager.java  | 22 ++++++++++++++
 .../strategy/assignment/AssignmentStrategy.java    |  9 ++++++
 .../assignment/BasicAssignmentStrategy.java        | 34 ++++++++++++++++++++--
 .../PartitionBalanceAssignmentStrategy.java        | 22 +++++++++++++-
 .../uniffle/test/CoordinatorAssignmentTest.java    | 29 ++++++++++++++++++
 .../client/impl/grpc/CoordinatorGrpcClient.java    |  8 +++--
 .../request/RssGetShuffleAssignmentsRequest.java   | 21 +++++++++++--
 proto/src/main/proto/Rss.proto                     |  1 +
 12 files changed, 190 insertions(+), 13 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index ce7c53785..b28320ca2 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -69,6 +69,20 @@ public interface ShuffleWriteClient {
       Map<Integer, List<Long>> partitionToBlockIds,
       int bitmapNum);
 
+  default ShuffleAssignmentsInfo getShuffleAssignments(
+      String appId,
+      int shuffleId,
+      int partitionNum,
+      int partitionNumPerRange,
+      Set<String> requiredTags,
+      int assignmentShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName()
+            + " doesn't implement getShuffleAssignments with faultyServerIds");
+  }
+
   ShuffleAssignmentsInfo getShuffleAssignments(
       String appId,
       int shuffleId,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 71ee284ab..0c0b278c8 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -614,6 +614,27 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency) {
+    return getShuffleAssignments(
+        appId,
+        shuffleId,
+        partitionNum,
+        partitionNumPerRange,
+        requiredTags,
+        assignmentShuffleServerNumber,
+        estimateTaskConcurrency,
+        Sets.newConcurrentHashSet());
+  }
+
+  @Override
+  public ShuffleAssignmentsInfo getShuffleAssignments(
+      String appId,
+      int shuffleId,
+      int partitionNum,
+      int partitionNumPerRange,
+      Set<String> requiredTags,
+      int assignmentShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
     RssGetShuffleAssignmentsRequest request =
         new RssGetShuffleAssignmentsRequest(
             appId,
@@ -623,7 +644,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
             replica,
             requiredTags,
             assignmentShuffleServerNumber,
-            estimateTaskConcurrency);
+            estimateTaskConcurrency,
+            faultyServerIds);
 
     RssGetShuffleAssignmentsResponse response =
         new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 510312d4f..5cd76c7c1 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -40,6 +40,15 @@ public interface ClusterManager extends Closeable, 
Reconfigurable {
    */
   List<ServerNode> getServerList(Set<String> requiredTags);
 
+  /**
+   * Get available nodes from the cluster
+   *
+   * @param requiredTags tags for filter
+   * @param faultyServerIds servernode information that is excluded because of 
an exception
+   * @return list of available server nodes
+   */
+  List<ServerNode> getServerList(Set<String> requiredTags, Set<String> 
faultyServerIds);
+
   /**
    * Get lost nodes from the cluster
    *
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index b658c329b..dffc3ff16 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -111,17 +112,19 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     final Set<String> requiredTags = 
Sets.newHashSet(request.getRequireTagsList());
     final int requiredShuffleServerNumber = 
request.getAssignmentShuffleServerNumber();
     final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
+    final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
 
     LOG.info(
         "Request of getShuffleAssignments for appId[{}], shuffleId[{}], 
partitionNum[{}], "
-            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}]",
+            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}],faultyServerIds[{}]",
         appId,
         shuffleId,
         partitionNum,
         partitionNumPerRange,
         replica,
         requiredTags,
-        requiredShuffleServerNumber);
+        requiredShuffleServerNumber,
+        faultyServerIds.size());
 
     GetShuffleAssignmentsResponse response;
     try {
@@ -138,7 +141,8 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
                   replica,
                   requiredTags,
                   requiredShuffleServerNumber,
-                  estimateTaskConcurrency);
+                  estimateTaskConcurrency,
+                  faultyServerIds);
       response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
       logAssignmentResult(appId, shuffleId, pra);
       responseObserver.onNext(response);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 93b94024c..cc5c1f7d7 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -253,6 +253,28 @@ public class SimpleClusterManager implements 
ClusterManager {
     return availableNodes;
   }
 
+  @Override
+  public List<ServerNode> getServerList(Set<String> requiredTags, Set<String> 
faultyServerIds) {
+    List<ServerNode> availableNodes = Lists.newArrayList();
+    for (ServerNode node : servers.values()) {
+      if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
+        continue;
+      }
+      if (isNodeAvailable(requiredTags, faultyServerIds, node)) {
+        availableNodes.add(node);
+      }
+    }
+    return availableNodes;
+  }
+
+  private boolean isNodeAvailable(
+      Set<String> requiredTags, Set<String> faultyServerIds, ServerNode node) {
+    if (faultyServerIds != null && faultyServerIds.contains(node.getId())) {
+      return false;
+    }
+    return !excludeNodes.contains(node.getId()) && 
node.getTags().containsAll(requiredTags);
+  }
+
   @Override
   public List<ServerNode> getLostServerList() {
     return Lists.newArrayList(lostNodes);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
index 4ae08cc56..fc2545cb7 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
@@ -28,4 +28,13 @@ public interface AssignmentStrategy {
       Set<String> requiredTags,
       int requiredShuffleServerNumber,
       int estimateTaskConcurrency);
+
+  /**
+   * Variant of {@code assign} that additionally takes a set of server ids to leave out.
+   *
+   * @param faultyServerIds ids of server nodes that must not be picked; the implementations
+   *     pass this set to {@code ClusterManager#getServerList(Set, Set)} to filter candidates
+   * @return the partition range assignment computed from the remaining server nodes
+   */
+  PartitionRangeAssignment assign(
+      int totalPartitionNum,
+      int partitionNumPerRange,
+      int replica,
+      Set<String> requiredTags,
+      int requiredShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds);
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
index ee6d90ef5..e5b45c81d 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedMap;
 
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +50,32 @@ public class BasicAssignmentStrategy extends 
AbstractAssignmentStrategy {
       Set<String> requiredTags,
       int requiredShuffleServerNumber,
       int estimateTaskConcurrency) {
+    return assign(
+        totalPartitionNum,
+        partitionNumPerRange,
+        replica,
+        requiredTags,
+        requiredShuffleServerNumber,
+        estimateTaskConcurrency,
+        Sets.newConcurrentHashSet());
+  }
+
+  @Override
+  public PartitionRangeAssignment assign(
+      int totalPartitionNum,
+      int partitionNumPerRange,
+      int replica,
+      Set<String> requiredTags,
+      int requiredShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
     int shuffleNodesMax = clusterManager.getShuffleNodesMax();
     int expectedShuffleNodesNum = shuffleNodesMax;
     if (requiredShuffleServerNumber < shuffleNodesMax && 
requiredShuffleServerNumber > 0) {
       expectedShuffleNodesNum = requiredShuffleServerNumber;
     }
-    List<ServerNode> servers = getRequiredServers(requiredTags, 
expectedShuffleNodesNum);
+    List<ServerNode> servers =
+        getRequiredServers(requiredTags, expectedShuffleNodesNum, 
faultyServerIds);
     if (servers.isEmpty() || servers.size() < replica) {
       return new PartitionRangeAssignment(null);
     }
@@ -66,8 +87,15 @@ public class BasicAssignmentStrategy extends 
AbstractAssignmentStrategy {
     return new PartitionRangeAssignment(assignments);
   }
 
-  private List<ServerNode> getRequiredServers(Set<String> requiredTags, int 
expectedNum) {
-    List<ServerNode> servers = clusterManager.getServerList(requiredTags);
+  /**
+   * @param requiredTags filter of tags
+   * @param expectedNum the number of expect servernode
+   * @param faultyServerIds servernode information that is excluded because of 
an exception
+   * @return
+   */
+  private List<ServerNode> getRequiredServers(
+      Set<String> requiredTags, int expectedNum, Set<String> faultyServerIds) {
+    List<ServerNode> servers = clusterManager.getServerList(requiredTags, 
faultyServerIds);
     // shuffle server update the status according to heartbeat, if every 
server is in initial
     // status,
     // random the order of list to avoid always pick same nodes
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index d155681b0..402df7799 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.SortedMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +68,25 @@ public class PartitionBalanceAssignmentStrategy extends 
AbstractAssignmentStrate
       Set<String> requiredTags,
       int requiredShuffleServerNumber,
       int estimateTaskConcurrency) {
+    return assign(
+        totalPartitionNum,
+        partitionNumPerRange,
+        replica,
+        requiredTags,
+        requiredShuffleServerNumber,
+        estimateTaskConcurrency,
+        Sets.newConcurrentHashSet());
+  }
+
+  @Override
+  public PartitionRangeAssignment assign(
+      int totalPartitionNum,
+      int partitionNumPerRange,
+      int replica,
+      Set<String> requiredTags,
+      int requiredShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> excludeServerNodes) {
 
     if (partitionNumPerRange != 1) {
       throw new RssException("PartitionNumPerRange must be one");
@@ -74,7 +94,7 @@ public class PartitionBalanceAssignmentStrategy extends 
AbstractAssignmentStrate
 
     SortedMap<PartitionRange, List<ServerNode>> assignments;
     synchronized (this) {
-      List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
+      List<ServerNode> nodes = clusterManager.getServerList(requiredTags, 
excludeServerNodes);
       Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = 
JavaUtils.newConcurrentMap();
       for (ServerNode node : nodes) {
         newPartitionInfos.computeIfAbsent(
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index aed654d14..4b892ede8 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -22,8 +22,12 @@ import java.io.FileWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.BeforeAll;
@@ -35,10 +39,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.ReconfigurableBase;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.SimpleClusterManager;
+import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -185,4 +191,27 @@ public class CoordinatorAssignmentTest extends 
CoordinatorTestBase {
     }
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
   }
+
+  @Test
+  public void testGetReShuffleAssignments() {
+    ShuffleWriteClientImpl shuffleWriteClient =
+        new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1, 1, 1, 1, true, 1, 1, 10, 10);
+    shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+    // forEach is a terminal op; a bare peek() is never evaluated and would leave the set empty.
+    Set<String> excludeServer = Sets.newConcurrentHashSet();
+    shuffleServers.stream().limit(3).map(ShuffleServer::getId).forEach(excludeServer::add);
+    assertEquals(false, excludeServer.isEmpty());
+    ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+        shuffleWriteClient.getShuffleAssignments(
+            "app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1, excludeServer);
+    List<ShuffleServerInfo> resultShuffle = Lists.newArrayList();
+    for (List<ShuffleServerInfo> ssis : shuffleAssignmentsInfo.getPartitionToServers().values()) {
+      resultShuffle.addAll(ssis);
+    }
+    // retainAll keeps only excluded ids: it must drop something (true) and leave nothing behind.
+    List<String> resultShuffleServerId =
+        resultShuffle.stream().map(a -> a.getId()).collect(Collectors.toList());
+    assertEquals(true, resultShuffleServerId.retainAll(excludeServer));
+    assertEquals(true, resultShuffleServerId.isEmpty());
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index bf2209b18..c3da5f484 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -171,8 +171,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       int dataReplica,
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
-      int estimateTaskConcurrency) {
-
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
     RssProtos.GetShuffleServerRequest getServerRequest =
         RssProtos.GetShuffleServerRequest.newBuilder()
             .setApplicationId(appId)
@@ -183,6 +183,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .addAllRequireTags(requiredTags)
             .setAssignmentShuffleServerNumber(assignmentShuffleServerNumber)
             .setEstimateTaskConcurrency(estimateTaskConcurrency)
+            .addAllFaultyServerIds(faultyServerIds)
             .build();
 
     return blockingStub.getShuffleAssignments(getServerRequest);
@@ -275,7 +276,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             request.getDataReplica(),
             request.getRequiredTags(),
             request.getAssignmentShuffleServerNumber(),
-            request.getEstimateTaskConcurrency());
+            request.getEstimateTaskConcurrency(),
+            request.getFaultyServerIds());
 
     RssGetShuffleAssignmentsResponse response;
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 0cbf55404..5b1fa4722 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.client.request;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 public class RssGetShuffleAssignmentsRequest {
 
@@ -31,6 +32,7 @@ public class RssGetShuffleAssignmentsRequest {
   private Set<String> requiredTags;
   private int assignmentShuffleServerNumber;
   private int estimateTaskConcurrency;
+  private Set<String> faultyServerIds;
 
   @VisibleForTesting
   public RssGetShuffleAssignmentsRequest(
@@ -40,7 +42,16 @@ public class RssGetShuffleAssignmentsRequest {
       int partitionNumPerRange,
       int dataReplica,
       Set<String> requiredTags) {
-    this(appId, shuffleId, partitionNum, partitionNumPerRange, dataReplica, 
requiredTags, -1, -1);
+    this(
+        appId,
+        shuffleId,
+        partitionNum,
+        partitionNumPerRange,
+        dataReplica,
+        requiredTags,
+        -1,
+        -1,
+        Sets.newHashSet());
   }
 
   public RssGetShuffleAssignmentsRequest(
@@ -51,7 +62,8 @@ public class RssGetShuffleAssignmentsRequest {
       int dataReplica,
       Set<String> requiredTags,
       int assignmentShuffleServerNumber,
-      int estimateTaskConcurrency) {
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionNum = partitionNum;
@@ -60,6 +72,7 @@ public class RssGetShuffleAssignmentsRequest {
     this.requiredTags = requiredTags;
     this.assignmentShuffleServerNumber = assignmentShuffleServerNumber;
     this.estimateTaskConcurrency = estimateTaskConcurrency;
+    this.faultyServerIds = faultyServerIds;
   }
 
   public String getAppId() {
@@ -93,4 +106,8 @@ public class RssGetShuffleAssignmentsRequest {
   public int getEstimateTaskConcurrency() {
     return estimateTaskConcurrency;
   }
+
+  public Set<String> getFaultyServerIds() {
+    return faultyServerIds;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d1e664645..1c8dfdd24 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -391,6 +391,7 @@ message GetShuffleServerRequest {
   repeated string requireTags = 9;
   int32 assignmentShuffleServerNumber = 10;
   int32 estimateTaskConcurrency = 11;
+  repeated string faultyServerIds = 12;
 }
 
 message PartitionRangeAssignment {

Reply via email to