This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fa6d1625c [#825][part-1] feat(spark): Add the RPC interface for
reassigning ShuffleServer (#1137)
fa6d1625c is described below
commit fa6d1625c8426181f13c45a87607af4e31f49458
Author: yl09099 <[email protected]>
AuthorDate: Sun Oct 8 14:18:28 2023 +0800
[#825][part-1] feat(spark): Add the RPC interface for reassigning
ShuffleServer (#1137)
### What changes were proposed in this pull request?
We need to provide an RPC interface for reassigning ShuffleServers, with
support for excluding a given list of ShuffleServers.
Ⅰ. Overall objective:
1. During the shuffle write phase, faulty ShuffleServer nodes are reported
and the ShuffleServer list is reallocated;
2. Trigger a stage-level retry in Spark; the faulty ShuffleServer nodes are
excluded and servers are reallocated before the retry.
Ⅱ. Implementation logic diagram (the image is not included in this email):

Ⅲ. As shown in the picture above:
1. During shuffle registration, the list of ShuffleServers to write to is
obtained through the RPC interface of the Coordinator Client, following the
solid blue line. The list is bound to the shuffle ID.
2. When a Task of the Stage starts, it follows the solid green line and
obtains, through the RPC interface of the ShuffleManager Client, the list of
ShuffleServers to write to from shuffleIdToShuffleHandleInfo;
3. Within the Stage, if a Task fails to write blocks to a ShuffleServer, it
follows the red steps and reports that ShuffleServer, through the RPC
interface, to the FailedShuffleServerList in RssShuffleManager.
4. FailedShuffleServerList records the number of failures of each ShuffleServer.
Once the number of failures reaches the maximum number of task-level retries,
the dotted orange steps are followed: through the RPC interface of the
Coordinator Client, a new list of ShuffleServers to write to is obtained (the
ShuffleServers that could not be written to are excluded). After the list is
obtained, Step 5 of the dotted orange line follows: throwing a FetchFailed
exception triggers a stage-level [...]
5. Spark re-runs the Stage as attempt 1, which pulls the corresponding
ShuffleServer list following the dotted green line.
### Why are the changes needed?
This interface is required to dynamically reallocate the ShuffleServer
list.
Fix: #825
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added an integration test (CoordinatorAssignmentTest#testGetReShuffleAssignments).
---
.../uniffle/client/api/ShuffleWriteClient.java | 14 +++++++++
.../client/impl/ShuffleWriteClientImpl.java | 24 ++++++++++++++-
.../apache/uniffle/coordinator/ClusterManager.java | 9 ++++++
.../coordinator/CoordinatorGrpcService.java | 10 +++++--
.../uniffle/coordinator/SimpleClusterManager.java | 22 ++++++++++++++
.../strategy/assignment/AssignmentStrategy.java | 9 ++++++
.../assignment/BasicAssignmentStrategy.java | 34 ++++++++++++++++++++--
.../PartitionBalanceAssignmentStrategy.java | 22 +++++++++++++-
.../uniffle/test/CoordinatorAssignmentTest.java | 29 ++++++++++++++++++
.../client/impl/grpc/CoordinatorGrpcClient.java | 8 +++--
.../request/RssGetShuffleAssignmentsRequest.java | 21 +++++++++++--
proto/src/main/proto/Rss.proto | 1 +
12 files changed, 190 insertions(+), 13 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index ce7c53785..b28320ca2 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -69,6 +69,20 @@ public interface ShuffleWriteClient {
Map<Integer, List<Long>> partitionToBlockIds,
int bitmapNum);
+  /**
+   * Requests a partition-to-server assignment for a shuffle while leaving out the servers whose
+   * IDs are listed in {@code faultyServerIds}.
+   *
+   * <p>This default implementation cannot honor the exclusion list and always throws; clients
+   * that support excluding servers must override it.
+   *
+   * @throws UnsupportedOperationException always, unless overridden
+   */
+  default ShuffleAssignmentsInfo getShuffleAssignments(
+      String appId,
+      int shuffleId,
+      int partitionNum,
+      int partitionNumPerRange,
+      Set<String> requiredTags,
+      int assignmentShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds) {
+    String implName = this.getClass().getName();
+    throw new UnsupportedOperationException(
+        implName + " doesn't implement getShuffleAssignments with faultyServerIds");
+  }
+
ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 71ee284ab..0c0b278c8 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -614,6 +614,27 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency) {
+ return getShuffleAssignments(
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ requiredTags,
+ assignmentShuffleServerNumber,
+ estimateTaskConcurrency,
+ Sets.newConcurrentHashSet());
+ }
+
+ @Override
+ public ShuffleAssignmentsInfo getShuffleAssignments(
+ String appId,
+ int shuffleId,
+ int partitionNum,
+ int partitionNumPerRange,
+ Set<String> requiredTags,
+ int assignmentShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds) {
RssGetShuffleAssignmentsRequest request =
new RssGetShuffleAssignmentsRequest(
appId,
@@ -623,7 +644,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
replica,
requiredTags,
assignmentShuffleServerNumber,
- estimateTaskConcurrency);
+ estimateTaskConcurrency,
+ faultyServerIds);
RssGetShuffleAssignmentsResponse response =
new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 510312d4f..5cd76c7c1 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -40,6 +40,15 @@ public interface ClusterManager extends Closeable,
Reconfigurable {
*/
List<ServerNode> getServerList(Set<String> requiredTags);
+  /**
+   * Get available nodes from the cluster, leaving out the given faulty servers.
+   *
+   * @param requiredTags tags that a returned node must carry
+   * @param faultyServerIds IDs of server nodes to leave out because they were reported as faulty
+   * @return list of available server nodes
+   */
+  List<ServerNode> getServerList(Set<String> requiredTags, Set<String> faultyServerIds);
+
/**
* Get lost nodes from the cluster
*
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index b658c329b..dffc3ff16 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.coordinator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -111,17 +112,19 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
final Set<String> requiredTags =
Sets.newHashSet(request.getRequireTagsList());
final int requiredShuffleServerNumber =
request.getAssignmentShuffleServerNumber();
final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
+ final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
LOG.info(
"Request of getShuffleAssignments for appId[{}], shuffleId[{}],
partitionNum[{}], "
- + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}]",
+ + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}],faultyServerIds[{}]",
appId,
shuffleId,
partitionNum,
partitionNumPerRange,
replica,
requiredTags,
- requiredShuffleServerNumber);
+ requiredShuffleServerNumber,
+ faultyServerIds.size());
GetShuffleAssignmentsResponse response;
try {
@@ -138,7 +141,8 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
replica,
requiredTags,
requiredShuffleServerNumber,
- estimateTaskConcurrency);
+ estimateTaskConcurrency,
+ faultyServerIds);
response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
logAssignmentResult(appId, shuffleId, pra);
responseObserver.onNext(response);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 93b94024c..cc5c1f7d7 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -253,6 +253,28 @@ public class SimpleClusterManager implements
ClusterManager {
return availableNodes;
}
+  @Override
+  public List<ServerNode> getServerList(Set<String> requiredTags, Set<String> faultyServerIds) {
+    // Only ACTIVE nodes are considered; the remaining filters live in isNodeAvailable.
+    List<ServerNode> result = Lists.newArrayList();
+    for (ServerNode candidate : servers.values()) {
+      boolean active = ServerStatus.ACTIVE.equals(candidate.getStatus());
+      if (active && isNodeAvailable(requiredTags, faultyServerIds, candidate)) {
+        result.add(candidate);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns whether {@code node} may be handed out: it must not appear in {@code faultyServerIds}
+   * (a null set excludes nothing), must not be in {@code excludeNodes}, and must carry every tag
+   * in {@code requiredTags}.
+   */
+  private boolean isNodeAvailable(
+      Set<String> requiredTags, Set<String> faultyServerIds, ServerNode node) {
+    String nodeId = node.getId();
+    boolean reportedFaulty = faultyServerIds != null && faultyServerIds.contains(nodeId);
+    if (reportedFaulty || excludeNodes.contains(nodeId)) {
+      return false;
+    }
+    return node.getTags().containsAll(requiredTags);
+  }
+
@Override
public List<ServerNode> getLostServerList() {
return Lists.newArrayList(lostNodes);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
index 4ae08cc56..fc2545cb7 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
@@ -28,4 +28,13 @@ public interface AssignmentStrategy {
Set<String> requiredTags,
int requiredShuffleServerNumber,
int estimateTaskConcurrency);
+
+  /**
+   * Same as the six-argument {@code assign}, but never assigns partitions to a server whose ID is
+   * contained in {@code faultyServerIds} (servers reported as faulty by clients).
+   */
+  PartitionRangeAssignment assign(
+      int totalPartitionNum,
+      int partitionNumPerRange,
+      int replica,
+      Set<String> requiredTags,
+      int requiredShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> faultyServerIds);
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
index ee6d90ef5..e5b45c81d 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import java.util.SortedMap;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +50,32 @@ public class BasicAssignmentStrategy extends
AbstractAssignmentStrategy {
Set<String> requiredTags,
int requiredShuffleServerNumber,
int estimateTaskConcurrency) {
+ return assign(
+ totalPartitionNum,
+ partitionNumPerRange,
+ replica,
+ requiredTags,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ Sets.newConcurrentHashSet());
+ }
+
+ @Override
+ public PartitionRangeAssignment assign(
+ int totalPartitionNum,
+ int partitionNumPerRange,
+ int replica,
+ Set<String> requiredTags,
+ int requiredShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds) {
int shuffleNodesMax = clusterManager.getShuffleNodesMax();
int expectedShuffleNodesNum = shuffleNodesMax;
if (requiredShuffleServerNumber < shuffleNodesMax &&
requiredShuffleServerNumber > 0) {
expectedShuffleNodesNum = requiredShuffleServerNumber;
}
- List<ServerNode> servers = getRequiredServers(requiredTags,
expectedShuffleNodesNum);
+ List<ServerNode> servers =
+ getRequiredServers(requiredTags, expectedShuffleNodesNum,
faultyServerIds);
if (servers.isEmpty() || servers.size() < replica) {
return new PartitionRangeAssignment(null);
}
@@ -66,8 +87,15 @@ public class BasicAssignmentStrategy extends
AbstractAssignmentStrategy {
return new PartitionRangeAssignment(assignments);
}
- private List<ServerNode> getRequiredServers(Set<String> requiredTags, int
expectedNum) {
- List<ServerNode> servers = clusterManager.getServerList(requiredTags);
+ /**
+ * @param requiredTags filter of tags
+ * @param expectedNum the number of expect servernode
+ * @param faultyServerIds servernode information that is excluded because of
an exception
+ * @return
+ */
+ private List<ServerNode> getRequiredServers(
+ Set<String> requiredTags, int expectedNum, Set<String> faultyServerIds) {
+ List<ServerNode> servers = clusterManager.getServerList(requiredTags,
faultyServerIds);
// shuffle server update the status according to heartbeat, if every
server is in initial
// status,
// random the order of list to avoid always pick same nodes
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index d155681b0..402df7799 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.SortedMap;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,25 @@ public class PartitionBalanceAssignmentStrategy extends
AbstractAssignmentStrate
Set<String> requiredTags,
int requiredShuffleServerNumber,
int estimateTaskConcurrency) {
+ return assign(
+ totalPartitionNum,
+ partitionNumPerRange,
+ replica,
+ requiredTags,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ Sets.newConcurrentHashSet());
+ }
+
+ @Override
+ public PartitionRangeAssignment assign(
+ int totalPartitionNum,
+ int partitionNumPerRange,
+ int replica,
+ Set<String> requiredTags,
+ int requiredShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> excludeServerNodes) {
if (partitionNumPerRange != 1) {
throw new RssException("PartitionNumPerRange must be one");
@@ -74,7 +94,7 @@ public class PartitionBalanceAssignmentStrategy extends
AbstractAssignmentStrate
SortedMap<PartitionRange, List<ServerNode>> assignments;
synchronized (this) {
- List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
+ List<ServerNode> nodes = clusterManager.getServerList(requiredTags,
excludeServerNodes);
Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos =
JavaUtils.newConcurrentMap();
for (ServerNode node : nodes) {
newPartitionInfos.computeIfAbsent(
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index aed654d14..4b892ede8 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -22,8 +22,12 @@ import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeAll;
@@ -35,10 +39,12 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.SimpleClusterManager;
+import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -185,4 +191,27 @@ public class CoordinatorAssignmentTest extends
CoordinatorTestBase {
}
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
}
+
+  @Test
+  public void testGetReShuffleAssignments() {
+    ShuffleWriteClientImpl shuffleWriteClient =
+        new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1, 1, 1, 1, true, 1, 1, 10, 10);
+    shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+    // Exclude the first three shuffle servers. The IDs must be gathered with a terminal
+    // operation: a bare Stream.peek(...) is lazy and never runs, which previously left the set
+    // empty and made the assertions below pass vacuously.
+    Set<String> excludeServer =
+        shuffleServers.stream().limit(3).map(ShuffleServer::getId).collect(Collectors.toSet());
+    assertEquals(false, excludeServer.isEmpty());
+    ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+        shuffleWriteClient.getShuffleAssignments(
+            "app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1, excludeServer);
+    List<String> resultShuffleServerId = Lists.newArrayList();
+    for (List<ShuffleServerInfo> ssis : shuffleAssignmentsInfo.getPartitionToServers().values()) {
+      for (ShuffleServerInfo ssi : ssis) {
+        resultShuffleServerId.add(ssi.getId());
+      }
+    }
+    // Partitions must still be assigned, and none of them to an excluded server.
+    assertEquals(false, resultShuffleServerId.isEmpty());
+    assertEquals(true, resultShuffleServerId.stream().noneMatch(excludeServer::contains));
+  }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index bf2209b18..c3da5f484 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -171,8 +171,8 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
int dataReplica,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
-
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds) {
RssProtos.GetShuffleServerRequest getServerRequest =
RssProtos.GetShuffleServerRequest.newBuilder()
.setApplicationId(appId)
@@ -183,6 +183,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
.addAllRequireTags(requiredTags)
.setAssignmentShuffleServerNumber(assignmentShuffleServerNumber)
.setEstimateTaskConcurrency(estimateTaskConcurrency)
+ .addAllFaultyServerIds(faultyServerIds)
.build();
return blockingStub.getShuffleAssignments(getServerRequest);
@@ -275,7 +276,8 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
request.getDataReplica(),
request.getRequiredTags(),
request.getAssignmentShuffleServerNumber(),
- request.getEstimateTaskConcurrency());
+ request.getEstimateTaskConcurrency(),
+ request.getFaultyServerIds());
RssGetShuffleAssignmentsResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 0cbf55404..5b1fa4722 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.client.request;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
public class RssGetShuffleAssignmentsRequest {
@@ -31,6 +32,7 @@ public class RssGetShuffleAssignmentsRequest {
private Set<String> requiredTags;
private int assignmentShuffleServerNumber;
private int estimateTaskConcurrency;
+ private Set<String> faultyServerIds;
@VisibleForTesting
public RssGetShuffleAssignmentsRequest(
@@ -40,7 +42,16 @@ public class RssGetShuffleAssignmentsRequest {
int partitionNumPerRange,
int dataReplica,
Set<String> requiredTags) {
- this(appId, shuffleId, partitionNum, partitionNumPerRange, dataReplica,
requiredTags, -1, -1);
+ this(
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ dataReplica,
+ requiredTags,
+ -1,
+ -1,
+ Sets.newHashSet());
}
public RssGetShuffleAssignmentsRequest(
@@ -51,7 +62,8 @@ public class RssGetShuffleAssignmentsRequest {
int dataReplica,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionNum = partitionNum;
@@ -60,6 +72,7 @@ public class RssGetShuffleAssignmentsRequest {
this.requiredTags = requiredTags;
this.assignmentShuffleServerNumber = assignmentShuffleServerNumber;
this.estimateTaskConcurrency = estimateTaskConcurrency;
+ this.faultyServerIds = faultyServerIds;
}
public String getAppId() {
@@ -93,4 +106,8 @@ public class RssGetShuffleAssignmentsRequest {
public int getEstimateTaskConcurrency() {
return estimateTaskConcurrency;
}
+
+  /**
+   * Returns the IDs of shuffle servers to exclude from the assignment. Never returns null: a null
+   * set passed to the constructor is reported as an empty set, because the gRPC client forwards
+   * this value to the protobuf {@code addAllFaultyServerIds} builder call, which rejects null.
+   */
+  public Set<String> getFaultyServerIds() {
+    return faultyServerIds == null ? Sets.newHashSet() : faultyServerIds;
+  }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d1e664645..1c8dfdd24 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -391,6 +391,7 @@ message GetShuffleServerRequest {
repeated string requireTags = 9;
int32 assignmentShuffleServerNumber = 10;
int32 estimateTaskConcurrency = 11;
+ repeated string faultyServerIds = 12;
}
message PartitionRangeAssignment {