This is an automated email from the ASF dual-hosted git repository.
jerrylei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new adafac4da [#825][part-5] feat(spark): Adds the RPC interface to
reassign the ShuffleServer list. (#1146)
adafac4da is described below
commit adafac4dad84da6470c7707576ba3b85b1f301a8
Author: yl09099 <[email protected]>
AuthorDate: Fri Nov 3 14:12:04 2023 +0800
[#825][part-5] feat(spark): Adds the RPC interface to reassign the
ShuffleServer list. (#1146)
### What changes were proposed in this pull request?
Adds the RPC interface to reassign the ShuffleServer list.
### Why are the changes needed?
Fix: #825
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UT.
---
.../manager/RssShuffleManagerInterface.java | 2 +
.../shuffle/manager/ShuffleManagerGrpcService.java | 21 ++++++
.../shuffle/manager/DummyRssShuffleManager.java | 6 ++
.../apache/spark/shuffle/RssShuffleManager.java | 82 +++++++++++++++++++++
.../apache/spark/shuffle/RssShuffleManager.java | 83 ++++++++++++++++++++++
.../uniffle/client/api/ShuffleManagerClient.java | 4 ++
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 10 +++
.../client/request/RssReassignServersRequest.java | 45 ++++++++++++
.../client/response/RssReassignServersReponse.java | 43 +++++++++++
proto/src/main/proto/Rss.proto | 15 ++++
10 files changed, 311 insertions(+)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 4940277c8..73fdbdb07 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -70,4 +70,6 @@ public interface RssShuffleManagerInterface {
* @param shuffleServerId
*/
void addFailuresShuffleServerInfos(String shuffleServerId);
+
+ boolean reassignShuffleServers(int stageId, int stageAttemptNumber, int
shuffleId, int numMaps);
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index a03c0439b..444e82cfb 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -230,6 +230,27 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
responseObserver.onCompleted();
}
+ @Override
+ public void reassignShuffleServers(
+ RssProtos.ReassignServersRequest request,
+ StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
+ int stageId = request.getStageId();
+ int stageAttemptNumber = request.getStageAttemptNumber();
+ int shuffleId = request.getShuffleId();
+ int numPartitions = request.getNumPartitions();
+ boolean needReassign =
+ shuffleManager.reassignShuffleServers(
+ stageId, stageAttemptNumber, shuffleId, numPartitions);
+ RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
+ RssProtos.ReassignServersReponse reply =
+ RssProtos.ReassignServersReponse.newBuilder()
+ .setStatus(code)
+ .setNeedReassign(needReassign)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
/**
* Remove the no longer used shuffle id's rss shuffle status. This is called
when ShuffleManager
* unregisters the corresponding shuffle id.
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index 24980afbf..0b9e4f858 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -57,4 +57,10 @@ public class DummyRssShuffleManager implements
RssShuffleManagerInterface {
@Override
public void addFailuresShuffleServerInfos(String shuffleServerId) {}
+
+ @Override
+ public boolean reassignShuffleServers(
+ int stageId, int stageAttemptNumber, int shuffleId, int numMaps) {
+ return false;
+ }
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 2cdf8636c..2f7e5105b 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
+import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -125,6 +126,11 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
private boolean rssResubmitStage;
/** A list of shuffleServer for Write failures */
private Set<String> failuresShuffleServerIds = Sets.newHashSet();
+ /**
+ * Prevent multiple tasks from reporting FetchFailed, resulting in multiple
ShuffleServer
+ * assignments, stageID, Attemptnumber Whether to reassign the combination
flag;
+ */
+ private Map<String, Boolean> serverAssignedInfos =
JavaUtils.newConcurrentMap();
public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
@@ -806,4 +812,80 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public void addFailuresShuffleServerInfos(String shuffleServerId) {
failuresShuffleServerIds.add(shuffleServerId);
}
+
+ /**
+ * Reassign the ShuffleServer list for ShuffleId
+ *
+ * @param shuffleId
+ * @param numPartitions
+ */
+ @Override
+ public synchronized boolean reassignShuffleServers(
+ int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+ String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+ Boolean needReassgin =
serverAssignedInfos.computeIfAbsent(stageIdAndAttempt, id -> false);
+ if (!needReassgin) {
+ String storageType =
sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
+ RemoteStorageInfo defaultRemoteStorage =
+ new
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(),
""));
+ RemoteStorageInfo remoteStorage =
+ ClientUtils.fetchRemoteStorage(
+ appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
+ Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+ ClientUtils.validateClientType(clientType);
+ assignmentTags.add(clientType);
+ int requiredShuffleServerNumber =
+ RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+ long retryInterval =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
+ int retryTimes =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
+ int estimateTaskConcurrency =
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+ /** Before reassigning ShuffleServer, clear the ShuffleServer list in
ShuffleWriteClient. */
+ shuffleWriteClient.unregisterShuffle(appId, shuffleId);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+ try {
+ partitionToServers =
+ RetryUtils.retry(
+ () -> {
+ ShuffleAssignmentsInfo response =
+ shuffleWriteClient.getShuffleAssignments(
+ appId,
+ shuffleId,
+ numPartitions,
+ 1,
+ assignmentTags,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ failuresShuffleServerIds);
+ registerShuffleServers(
+ appId, shuffleId, response.getServerToPartitionRanges(),
remoteStorage);
+ return response.getPartitionToServers();
+ },
+ retryInterval,
+ retryTimes);
+ } catch (Throwable throwable) {
+ throw new RssException("registerShuffle failed!", throwable);
+ }
+ /**
+ * we need to clear the metadata of the completed task, otherwise some
of the stage's data
+ * will be lost
+ */
+ try {
+ unregisterAllMapOutput(shuffleId);
+ } catch (SparkException e) {
+ LOG.error("Clear MapoutTracker Meta failed!");
+ throw new RssException("Clear MapoutTracker Meta failed!", e);
+ }
+ ShuffleHandleInfo handleInfo =
+ new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
+ shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
+ serverAssignedInfos.put(stageIdAndAttempt, true);
+ return true;
+ } else {
+ LOG.info(
+ "The Stage:{} has been reassigned in an Attempt{},Return without
performing any operation",
+ stageId,
+ stageAttemptNumber);
+ return false;
+ }
+ }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 3ee108fd3..ceac60063 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -43,6 +43,7 @@ import org.apache.spark.MapOutputTracker;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
+import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.ShuffleReadMetrics;
@@ -136,6 +137,11 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
private boolean rssResubmitStage;
/** A list of shuffleServer for Write failures */
private Set<String> failuresShuffleServerIds;
+ /**
+ * Prevent multiple tasks from reporting FetchFailed, resulting in multiple
ShuffleServer
+ * assignments, stageID, Attemptnumber Whether to reassign the combination
flag;
+ */
+ private Map<String, Boolean> serverAssignedInfos;
public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.sparkConf = conf;
@@ -263,6 +269,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
poolSize,
keepAliveTime);
this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
+ this.failuresShuffleServerIds = Sets.newHashSet();
+ this.serverAssignedInfos = JavaUtils.newConcurrentMap();
}
public CompletableFuture<Long> sendData(AddBlockEvent event) {
@@ -1104,4 +1112,79 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public void addFailuresShuffleServerInfos(String shuffleServerId) {
failuresShuffleServerIds.add(shuffleServerId);
}
+
+ /**
+ * Reassign the ShuffleServer list for ShuffleId
+ *
+ * @param shuffleId
+ * @param numPartitions
+ */
+ @Override
+ public synchronized boolean reassignShuffleServers(
+ int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+ String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
+ Boolean needReassgin =
serverAssignedInfos.computeIfAbsent(stageIdAndAttempt, id -> false);
+ if (!needReassgin) {
+ String storageType =
sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
+ RemoteStorageInfo defaultRemoteStorage =
+ new
RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(),
""));
+ RemoteStorageInfo remoteStorage =
+ ClientUtils.fetchRemoteStorage(
+ id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType,
shuffleWriteClient);
+ Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+ int requiredShuffleServerNumber =
+ RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+ long retryInterval =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
+ int retryTimes =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
+ int estimateTaskConcurrency =
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+ /** Before reassigning ShuffleServer, clear the ShuffleServer list in
ShuffleWriteClient. */
+ shuffleWriteClient.unregisterShuffle(id.get(), shuffleId);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+ try {
+ partitionToServers =
+ RetryUtils.retry(
+ () -> {
+ ShuffleAssignmentsInfo response =
+ shuffleWriteClient.getShuffleAssignments(
+ id.get(),
+ shuffleId,
+ numPartitions,
+ 1,
+ assignmentTags,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ failuresShuffleServerIds);
+ registerShuffleServers(
+ id.get(), shuffleId,
response.getServerToPartitionRanges(), remoteStorage);
+ return response.getPartitionToServers();
+ },
+ retryInterval,
+ retryTimes);
+
+ } catch (Throwable throwable) {
+ throw new RssException("registerShuffle failed!", throwable);
+ }
+ /**
+ * we need to clear the metadata of the completed task, otherwise some
of the stage's data
+ * will be lost
+ */
+ try {
+ unregisterAllMapOutput(shuffleId);
+ } catch (SparkException e) {
+ LOG.error("Clear MapoutTracker Meta failed!");
+ throw new RssException("Clear MapoutTracker Meta failed!", e);
+ }
+ ShuffleHandleInfo handleInfo =
+ new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
+ shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
+ serverAssignedInfos.put(stageIdAndAttempt, true);
+ return true;
+ } else {
+ LOG.info(
+ "The Stage:{} has been reassigned in an Attempt{},Return without
performing any operation",
+ stageId,
+ stageAttemptNumber);
+ return false;
+ }
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index cd2d79569..ddba67be0 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -20,9 +20,11 @@ package org.apache.uniffle.client.api;
import java.io.Closeable;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -41,4 +43,6 @@ public interface ShuffleManagerClient extends Closeable {
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);
+
+ RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest
req);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 8708a9b42..b38113c2e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -24,9 +24,11 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.config.RssBaseConf;
@@ -105,4 +107,12 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
throw new RssException(msg, e);
}
}
+
+ @Override
+ public RssReassignServersReponse
reassignShuffleServers(RssReassignServersRequest req) {
+ RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
+ RssProtos.ReassignServersReponse reassignServersReponse =
+ getBlockingStub().reassignShuffleServers(reassignServersRequest);
+ return RssReassignServersReponse.fromProto(reassignServersReponse);
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
new file mode 100644
index 000000000..5ce505ab1
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReassignServersRequest {
+ private int stageId;
+ private int stageAttemptNumber;
+ private int shuffleId;
+ private int numPartitions;
+
+ public RssReassignServersRequest(
+ int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
+ this.stageId = stageId;
+ this.stageAttemptNumber = stageAttemptNumber;
+ this.shuffleId = shuffleId;
+ this.numPartitions = numPartitions;
+ }
+
+ public RssProtos.ReassignServersRequest toProto() {
+ RssProtos.ReassignServersRequest.Builder builder =
+ RssProtos.ReassignServersRequest.newBuilder()
+ .setStageId(stageId)
+ .setStageAttemptNumber(stageAttemptNumber)
+ .setShuffleId(shuffleId)
+ .setNumPartitions(numPartitions);
+ return builder.build();
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
new file mode 100644
index 000000000..4e332da32
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReassignServersReponse extends ClientResponse {
+
+ private boolean needReassign;
+
+ public RssReassignServersReponse(StatusCode statusCode, String message,
boolean needReassign) {
+ super(statusCode, message);
+ this.needReassign = needReassign;
+ }
+
+ public boolean isNeedReassign() {
+ return needReassign;
+ }
+
+ public static RssReassignServersReponse
fromProto(RssProtos.ReassignServersReponse response) {
+ return new RssReassignServersReponse(
+ // todo: [issue#780] add fromProto for StatusCode issue
+ StatusCode.valueOf(response.getStatus().name()),
+ response.getMsg(),
+ response.getNeedReassign());
+ }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 93bd2f67b..3b3ad8e59 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -501,6 +501,8 @@ service ShuffleManager {
rpc getPartitionToShufflerServer(PartitionToShuffleServerRequest) returns
(PartitionToShuffleServerResponse);
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
+ // Reassign the RPC interface of the ShuffleServer list
+ rpc reassignShuffleServers(ReassignServersRequest) returns
(ReassignServersReponse);
}
message ReportShuffleFetchFailureRequest {
@@ -550,3 +552,16 @@ message ReportShuffleWriteFailureResponse {
bool reSubmitWholeStage = 2;
string msg = 3;
}
+
+message ReassignServersRequest{
+ int32 stageId = 1;
+ int32 stageAttemptNumber = 2;
+ int32 shuffleId = 3;
+ int32 numPartitions = 4;
+}
+
+message ReassignServersReponse{
+ StatusCode status = 1;
+ bool needReassign = 2;
+ string msg = 3;
+}