This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a22f95a50 [#1799] improvement(spark): Rename shuffleManager rpc to
reassignOnStageResubmit (#1800)
a22f95a50 is described below
commit a22f95a50fef4354292f3687dbe33bf4562ab0af
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 18 20:59:03 2024 +0800
[#1799] improvement(spark): Rename shuffleManager rpc to
reassignOnStageResubmit (#1800)
---
.../org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java | 2 +-
.../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 2 +-
.../main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 2 +-
.../main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java | 2 +-
.../org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java | 4 ++--
proto/src/main/proto/Rss.proto | 4 ++--
6 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 425f03e65..3a2f58cca 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -237,7 +237,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
}
@Override
- public void reassignShuffleServers(
+ public void reassignOnStageResubmit(
RssProtos.ReassignServersRequest request,
StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
int stageId = request.getStageId();
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index b40b55028..2689ee39c 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -559,7 +559,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
shuffleId,
partitioner.numPartitions());
RssReassignServersResponse rssReassignServersResponse =
-
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
+
shuffleManagerClient.reassignOnStageResubmit(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersResponse.isNeedReassign());
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1f6f0b81b..37948b6b8 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -845,7 +845,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
shuffleId,
partitioner.numPartitions());
RssReassignServersResponse rssReassignServersResponse =
-
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
+
shuffleManagerClient.reassignOnStageResubmit(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersResponse.isNeedReassign());
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 71dc66584..c5b412a9e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -62,7 +62,7 @@ public interface ShuffleManagerClient extends Closeable {
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);
- RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest
req);
+ RssReassignServersResponse reassignOnStageResubmit(RssReassignServersRequest
req);
RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 030afd03d..6dd9f4a1e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -127,10 +127,10 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
}
@Override
- public RssReassignServersResponse
reassignShuffleServers(RssReassignServersRequest req) {
+ public RssReassignServersResponse
reassignOnStageResubmit(RssReassignServersRequest req) {
RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
RssProtos.ReassignServersResponse reassignServersResponse =
- getBlockingStub().reassignShuffleServers(reassignServersRequest);
+ getBlockingStub().reassignOnStageResubmit(reassignServersRequest);
return RssReassignServersResponse.fromProto(reassignServersResponse);
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 61afad299..dc5bdf230 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -537,8 +537,8 @@ service ShuffleManager {
rpc
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest)
returns (ReassignOnBlockSendFailureResponse);
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
- // Reassign the RPC interface of the ShuffleServer list
- rpc reassignShuffleServers(ReassignServersRequest) returns
(ReassignServersResponse);
+ // Reassign on stage resubmit
+ rpc reassignOnStageResubmit(ReassignServersRequest) returns
(ReassignServersResponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns
(ReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns
(ReportShuffleResultResponse);