This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7794f0f91 [#1791] feat(spark)(coordinator): Take more infos on getting
assignment to track app reassign/stageRetry (#1792)
7794f0f91 is described below
commit 7794f0f913e221361a6d903e1e0fedbf27ef32b5
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 18 21:07:39 2024 +0800
[#1791] feat(spark)(coordinator): Take more infos on getting assignment to
track app reassign/stageRetry (#1792)
### What changes were proposed in this pull request?
Take more infos on getting assignment to track app reassign/stageRetry
### Why are the changes needed?
For #1791
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../hadoop/mapred/SortWriteBufferManagerTest.java | 6 +-
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 6 +-
.../shuffle/manager/RssShuffleManagerBase.java | 66 +++++++++++++++++++---
.../manager/RssShuffleManagerInterface.java | 5 +-
.../shuffle/manager/ShuffleManagerGrpcService.java | 2 +
.../shuffle/manager/DummyRssShuffleManager.java | 5 +-
.../common/sort/buffer/WriteBufferManagerTest.java | 6 +-
.../uniffle/client/api/ShuffleWriteClient.java | 43 ++++++++++++--
.../client/impl/ShuffleWriteClientImpl.java | 30 +++-------
.../coordinator/CoordinatorGrpcService.java | 10 +++-
.../client/impl/grpc/CoordinatorGrpcClient.java | 13 ++++-
.../request/RssGetShuffleAssignmentsRequest.java | 26 ++++++++-
proto/src/main/proto/Rss.proto | 3 +
13 files changed, 173 insertions(+), 48 deletions(-)
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 430e2ff58..3cb017239 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -574,7 +574,11 @@ public class SortWriteBufferManagerTest {
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
return null;
}
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 7664b47d6..6c5f6776d 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -542,7 +542,11 @@ public class FetcherTest {
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
return null;
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 912bf998f..197035e3a 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -682,7 +682,9 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
requiredShuffleServerNumber,
estimateTaskConcurrency,
rssStageResubmitManager.getServerIdBlackList(),
- stageAttemptNumber);
+ stageId,
+ stageAttemptNumber,
+ false);
/**
* we need to clear the metadata of the completed task, otherwise some
of the stage's data
* will be lost
@@ -713,7 +715,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
/** this is only valid on driver side that exposed to being invoked by grpc
server */
@Override
public MutableShuffleHandleInfo reassignOnBlockSendFailure(
- int shuffleId, Map<Integer, List<ReceivingFailureServer>>
partitionToFailureServers) {
+ int stageId,
+ int stageAttemptNumber,
+ int shuffleId,
+ Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
long startTime = System.currentTimeMillis();
MutableShuffleHandleInfo handleInfo =
(MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
@@ -742,7 +747,13 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
excludedServers.add(serverId);
replacements =
reassignServerForTask(
- shuffleId, Sets.newHashSet(partitionId), excludedServers,
requiredServerNum);
+ stageId,
+ stageAttemptNumber,
+ shuffleId,
+ Sets.newHashSet(partitionId),
+ excludedServers,
+ requiredServerNum,
+ true);
} else {
serverHasReplaced = true;
}
@@ -805,10 +816,13 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
/** Request the new shuffle-servers to replace faulty server. */
private Set<ShuffleServerInfo> reassignServerForTask(
+ int stageId,
+ int stageAttemptNumber,
int shuffleId,
Set<Integer> partitionIds,
Set<String> excludedServers,
- int requiredServerNum) {
+ int requiredServerNum,
+ boolean reassign) {
AtomicReference<Set<ShuffleServerInfo>> replacementsRef =
new AtomicReference<>(new HashSet<>());
requestShuffleAssignment(
@@ -828,7 +842,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
.collect(Collectors.toSet());
replacementsRef.set(replacements);
return createShuffleAssignmentsInfo(replacements, partitionIds);
- });
+ },
+ stageId,
+ stageAttemptNumber,
+ reassign);
return replacementsRef.get();
}
@@ -839,7 +856,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
- Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo>
reassignmentHandler) {
+ Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo>
reassignmentHandler,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
@@ -858,7 +878,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
assignmentTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
- faultyServerIds);
+ faultyServerIds,
+ stageId,
+ stageAttemptNumber,
+ reassign);
LOG.info("Finished reassign");
if (reassignmentHandler != null) {
response = reassignmentHandler.apply(response);
@@ -881,7 +904,9 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
- int stageAttemptNumber) {
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
@@ -901,7 +926,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
assignmentTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
- faultyServerIds);
+ faultyServerIds,
+ stageId,
+ stageAttemptNumber,
+ reassign);
registerShuffleServers(
appId,
shuffleId,
@@ -917,6 +945,26 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
}
}
+ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
+ int shuffleId,
+ int partitionNum,
+ int partitionNumPerRange,
+ int assignmentShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds,
+ int stageAttemptNumber) {
+ return requestShuffleAssignment(
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ assignmentShuffleServerNumber,
+ estimateTaskConcurrency,
+ faultyServerIds,
+ -1,
+ stageAttemptNumber,
+ false);
+ }
+
protected void registerShuffleServers(
String appId,
int shuffleId,
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 52ded5db7..77379efb5 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -83,5 +83,8 @@ public interface RssShuffleManagerInterface {
boolean reassignOnStageResubmit(int stageId, int stageAttemptNumber, int
shuffleId, int numMaps);
MutableShuffleHandleInfo reassignOnBlockSendFailure(
- int shuffleId, Map<Integer, List<ReceivingFailureServer>>
partitionToFailureServers);
+ int stageId,
+ int stageAttemptNumber,
+ int shuffleId,
+ Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 3a2f58cca..b9828408c 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -275,6 +275,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
request.getExecutorId());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
+ request.getStageId(),
+ request.getStageAttemptNumber(),
request.getShuffleId(),
request.getFailurePartitionToServerIdsMap().entrySet().stream()
.collect(
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index df40e9d16..317d0cd9e 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -77,7 +77,10 @@ public class DummyRssShuffleManager implements
RssShuffleManagerInterface {
@Override
public MutableShuffleHandleInfo reassignOnBlockSendFailure(
- int shuffleId, Map<Integer, List<ReceivingFailureServer>>
partitionToFailureServers) {
+ int stageId,
+ int stageAttemptNumber,
+ int shuffleId,
+ Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
return null;
}
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index d0eab5ae1..70c1a8aed 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -645,7 +645,11 @@ public class WriteBufferManagerTest {
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
return null;
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index efd39e35a..db0c91484 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.client.api;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -99,6 +100,19 @@ public interface ShuffleWriteClient {
long taskAttemptId,
int bitmapNum);
+ ShuffleAssignmentsInfo getShuffleAssignments(
+ String appId,
+ int shuffleId,
+ int partitionNum,
+ int partitionNumPerRange,
+ Set<String> requiredTags,
+ int assignmentShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign);
+
default ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
@@ -108,19 +122,38 @@ public interface ShuffleWriteClient {
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds) {
- throw new UnsupportedOperationException(
- this.getClass().getName()
- + " doesn't implement getShuffleAssignments with faultyServerIds");
+ return getShuffleAssignments(
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ requiredTags,
+ assignmentShuffleServerNumber,
+ estimateTaskConcurrency,
+ faultyServerIds,
+ -1,
+ 0,
+ false);
}
- ShuffleAssignmentsInfo getShuffleAssignments(
+ default ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
int partitionNum,
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
- int estimateTaskConcurrency);
+ int estimateTaskConcurrency) {
+ return getShuffleAssignments(
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ requiredTags,
+ assignmentShuffleServerNumber,
+ estimateTaskConcurrency,
+ Collections.emptySet());
+ }
Roaring64NavigableMap getShuffleResult(
String clientType,
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index a79a8557a..b16b88168 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -635,26 +635,6 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
return remoteStorage;
}
- @Override
- public ShuffleAssignmentsInfo getShuffleAssignments(
- String appId,
- int shuffleId,
- int partitionNum,
- int partitionNumPerRange,
- Set<String> requiredTags,
- int assignmentShuffleServerNumber,
- int estimateTaskConcurrency) {
- return getShuffleAssignments(
- appId,
- shuffleId,
- partitionNum,
- partitionNumPerRange,
- requiredTags,
- assignmentShuffleServerNumber,
- estimateTaskConcurrency,
- Sets.newConcurrentHashSet());
- }
-
@Override
public ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
@@ -664,7 +644,10 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
- Set<String> faultyServerIds) {
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
RssGetShuffleAssignmentsRequest request =
new RssGetShuffleAssignmentsRequest(
appId,
@@ -675,7 +658,10 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
requiredTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
- faultyServerIds);
+ faultyServerIds,
+ stageId,
+ stageAttemptNumber,
+ reassign);
RssGetShuffleAssignmentsResponse response =
new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 0b4b9d69e..ddca8a1e3 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -117,8 +117,9 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
LOG.info(
- "Request of getShuffleAssignments for appId[{}], shuffleId[{}],
partitionNum[{}], "
- + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}],faultyServerIds[{}]",
+ "Request of getShuffleAssignments for appId[{}], shuffleId[{}],
partitionNum[{}],"
+ + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}],"
+ + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}],
isReassign[{}]",
appId,
shuffleId,
partitionNum,
@@ -126,7 +127,10 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
replica,
requiredTags,
requiredShuffleServerNumber,
- faultyServerIds.size());
+ faultyServerIds.size(),
+ request.getStageId(),
+ request.getStageAttemptNumber(),
+ request.getReassign());
GetShuffleAssignmentsResponse response;
try {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index be6e70df3..484bb43d7 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -178,7 +178,10 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
- Set<String> faultyServerIds) {
+ Set<String> faultyServerIds,
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
RssProtos.GetShuffleServerRequest getServerRequest =
RssProtos.GetShuffleServerRequest.newBuilder()
.setApplicationId(appId)
@@ -190,6 +193,9 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
.setAssignmentShuffleServerNumber(assignmentShuffleServerNumber)
.setEstimateTaskConcurrency(estimateTaskConcurrency)
.addAllFaultyServerIds(faultyServerIds)
+ .setStageId(stageId)
+ .setStageAttemptNumber(stageAttemptNumber)
+ .setReassign(reassign)
.build();
return blockingStub.getShuffleAssignments(getServerRequest);
@@ -283,7 +289,10 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
request.getRequiredTags(),
request.getAssignmentShuffleServerNumber(),
request.getEstimateTaskConcurrency(),
- request.getFaultyServerIds());
+ request.getFaultyServerIds(),
+ request.getStageId(),
+ request.getStageAttemptNumber(),
+ request.isReassign());
RssGetShuffleAssignmentsResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 4cbdc4448..b80b1f9c0 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -33,6 +33,9 @@ public class RssGetShuffleAssignmentsRequest {
private int assignmentShuffleServerNumber;
private int estimateTaskConcurrency;
private Set<String> faultyServerIds;
+ private int stageId = -1;
+ private int stageAttemptNumber = 0;
+ private boolean reassign = false;
@VisibleForTesting
public RssGetShuffleAssignmentsRequest(
@@ -74,7 +77,9 @@ public class RssGetShuffleAssignmentsRequest {
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds,
- 0);
+ -1,
+ 0,
+ false);
}
public RssGetShuffleAssignmentsRequest(
@@ -87,7 +92,9 @@ public class RssGetShuffleAssignmentsRequest {
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
- int stageAttemptNumber) {
+ int stageId,
+ int stageAttemptNumber,
+ boolean reassign) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionNum = partitionNum;
@@ -97,6 +104,9 @@ public class RssGetShuffleAssignmentsRequest {
this.assignmentShuffleServerNumber = assignmentShuffleServerNumber;
this.estimateTaskConcurrency = estimateTaskConcurrency;
this.faultyServerIds = faultyServerIds;
+ this.stageId = stageId;
+ this.stageAttemptNumber = stageAttemptNumber;
+ this.reassign = reassign;
}
public String getAppId() {
@@ -134,4 +144,16 @@ public class RssGetShuffleAssignmentsRequest {
public Set<String> getFaultyServerIds() {
return faultyServerIds;
}
+
+ public int getStageId() {
+ return stageId;
+ }
+
+ public int getStageAttemptNumber() {
+ return stageAttemptNumber;
+ }
+
+ public boolean isReassign() {
+ return reassign;
+ }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index dc5bdf230..97928bf20 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -416,6 +416,9 @@ message GetShuffleServerRequest {
int32 assignmentShuffleServerNumber = 10;
int32 estimateTaskConcurrency = 11;
repeated string faultyServerIds = 12;
+ int32 stageId = 13;
+ int32 stageAttemptNumber = 14;
+ bool reassign = 15;
}
message PartitionRangeAssignment {