This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b40c5091d [#2665] feat(spark): Reconstruct the shuffle handle from
initial spark handle if it hasn't been updated (#2667)
b40c5091d is described below
commit b40c5091dc1d11d39df24047613468be1b17dde3
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Nov 13 20:23:19 2025 +0800
[#2665] feat(spark): Reconstruct the shuffle handle from the initial spark
handle if it hasn't been updated (#2667)
### What changes were proposed in this pull request?
Reconstruct the shuffle handle information from the static Spark handle if it
hasn’t been updated. This means unchanged shuffle handle data no longer
needs to be transmitted through the RPC layer, which reduces the load on the
driver, especially when there are a large number of tasks.
### Why are the changes needed?
For issue #2665: Once partition reassignment is enabled, the shuffle
information is always retrieved when a task starts, which puts significant
pressure on the Spark driver. Since the shuffle information remains unchanged
in most cases, this provides a natural point for optimization.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests could cover this PR's change.
---
.../shuffle/handle/MutableShuffleHandleInfo.java | 8 +++++
.../shuffle/manager/RssShuffleManagerBase.java | 21 ++++++++---
.../shuffle/manager/ShuffleManagerGrpcService.java | 17 ++++-----
.../apache/spark/shuffle/RssShuffleManager.java | 30 +++++-----------
.../uniffle/client/api/ShuffleManagerClient.java | 3 +-
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 11 +++---
.../RssGetAssignmentForBlockRetryResponse.java | 42 ++++++++++++++++++++++
proto/src/main/proto/Rss.proto | 8 ++++-
8 files changed, 100 insertions(+), 40 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 35aaa50be..4460c39bc 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -69,6 +70,8 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
private PartitionSplitMode partitionSplitMode = PartitionSplitMode.PIPELINE;
+ private AtomicBoolean isUpdated = new AtomicBoolean(false);
+
public MutableShuffleHandleInfo(
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
@@ -121,6 +124,10 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
return partitionReplicaAssignedServers;
}
+ public boolean isUpdated() {
+ return isUpdated.get();
+ }
+
public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
return excludedServerToReplacements.get(faultyServerId);
}
@@ -171,6 +178,7 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
}
}
}
+ isUpdated.set(true);
return updatedServers;
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index be5e56a05..5f9c48bb4 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -79,7 +79,7 @@ import
org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
-import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
+import
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
@@ -914,8 +914,21 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In partition block Retry mode, Get the ShuffleServer list from the
Driver based on the
// shuffleId.
- return getRemoteShuffleHandleInfoWithBlockRetry(
- stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
+ ShuffleHandleInfo handle =
+ getRemoteShuffleHandleInfoWithBlockRetry(
+ stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
+ if (handle == null) {
+ // if the handle is null, it means the shuffleHandle haven't been
updated.
+ // we could re-construct from the spark's handle, that is to reduce
the rpc data bytes
+ // transmit.
+ handle =
+ new MutableShuffleHandleInfo(
+ shuffleId,
+ rssHandle.getPartitionToServers(),
+ rssHandle.getRemoteStorage(),
+ partitionSplitMode);
+ }
+ return handle;
} else {
return new SimpleShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(),
rssHandle.getRemoteStorage());
@@ -954,7 +967,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
new RssPartitionToShuffleServerRequest(
stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
- RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
+ RssGetAssignmentForBlockRetryResponse rpcPartitionToShufflerServer =
getOrCreateShuffleManagerClientSupplier()
.get()
.getPartitionToShufflerServerWithBlockRetry(rssPartitionToShuffleServerRequest);
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 6c14fe929..1f3d4c8b2 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -287,22 +287,23 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
@Override
public void getPartitionToShufflerServerWithBlockRetry(
RssProtos.PartitionToShuffleServerRequest request,
- StreamObserver<RssProtos.ReassignOnBlockSendFailureResponse>
responseObserver) {
- RssProtos.ReassignOnBlockSendFailureResponse reply;
+ StreamObserver<RssProtos.GetAssignmentForBlockRetryResponse>
responseObserver) {
+ RssProtos.GetAssignmentForBlockRetryResponse reply;
RssProtos.StatusCode code;
int shuffleId = request.getShuffleId();
MutableShuffleHandleInfo shuffleHandle =
(MutableShuffleHandleInfo)
shuffleManager.getShuffleHandleInfoByShuffleId(shuffleId);
if (shuffleHandle != null) {
code = RssProtos.StatusCode.SUCCESS;
- reply =
- RssProtos.ReassignOnBlockSendFailureResponse.newBuilder()
- .setStatus(code)
- .setHandle(MutableShuffleHandleInfo.toProto(shuffleHandle))
- .build();
+ RssProtos.GetAssignmentForBlockRetryResponse.Builder builder =
+
RssProtos.GetAssignmentForBlockRetryResponse.newBuilder().setStatus(code);
+ if (shuffleHandle.isUpdated()) {
+ builder.setHandle(MutableShuffleHandleInfo.toProto(shuffleHandle));
+ }
+ reply = builder.build();
} else {
code = RssProtos.StatusCode.INVALID_REQUEST;
- reply =
RssProtos.ReassignOnBlockSendFailureResponse.newBuilder().setStatus(code).build();
+ reply =
RssProtos.GetAssignmentForBlockRetryResponse.newBuilder().setStatus(code).build();
}
responseObserver.onNext(reply);
responseObserver.onCompleted();
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 9a5cec0ac..650ece88e 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -370,30 +370,18 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>)
handle;
final int partitionNum =
rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
- ShuffleHandleInfo shuffleHandleInfo;
- if (shuffleManagerRpcServiceEnabled
- && (rssStageRetryForWriteFailureEnabled || partitionReassignEnabled)) {
- Supplier<ShuffleHandleInfo> func =
- rssStageRetryForWriteFailureEnabled
- ? () ->
- getRemoteShuffleHandleInfoWithStageRetry(
- context.stageId(), context.stageAttemptNumber(),
shuffleId, false)
- : () ->
- getRemoteShuffleHandleInfoWithBlockRetry(
- context.stageId(), context.stageAttemptNumber(),
shuffleId, false);
- if (readShuffleHandleCacheEnabled) {
- shuffleHandleInfo = super.getOrFetchShuffleHandle(shuffleId, func);
- } else {
- shuffleHandleInfo = func.get();
- }
+ ShuffleHandleInfo shuffleHandleInfo;
+ Supplier<ShuffleHandleInfo> func =
+ () ->
+ getShuffleHandleInfo(
+ context.stageId(), context.stageAttemptNumber(),
rssShuffleHandle, false);
+ if (readShuffleHandleCacheEnabled) {
+ shuffleHandleInfo = super.getOrFetchShuffleHandle(shuffleId, func);
} else {
- shuffleHandleInfo =
- new SimpleShuffleHandleInfo(
- shuffleId,
- rssShuffleHandle.getPartitionToServers(),
- rssShuffleHandle.getRemoteStorage());
+ shuffleHandleInfo = func.get();
}
+
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
getPartitionDataServers(shuffleHandleInfo, startPartition,
endPartition);
long start = System.currentTimeMillis();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 381ad706f..98e3cd74e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -26,6 +26,7 @@ import
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+import
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
@@ -57,7 +58,7 @@ public interface ShuffleManagerClient extends
StatefulCloseable {
* @param req request
* @return RssPartitionToShuffleServerResponse
*/
- RssReassignOnBlockSendFailureResponse
getPartitionToShufflerServerWithBlockRetry(
+ RssGetAssignmentForBlockRetryResponse
getPartitionToShufflerServerWithBlockRetry(
RssPartitionToShuffleServerRequest req);
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 0040e56ef..9b3f29219 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -32,6 +32,7 @@ import
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+import
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
@@ -102,14 +103,14 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
}
@Override
- public RssReassignOnBlockSendFailureResponse
getPartitionToShufflerServerWithBlockRetry(
+ public RssGetAssignmentForBlockRetryResponse
getPartitionToShufflerServerWithBlockRetry(
RssPartitionToShuffleServerRequest req) {
RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
- RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer =
+ RssProtos.GetAssignmentForBlockRetryResponse partitionToShufflerServer =
getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest);
- RssReassignOnBlockSendFailureResponse
rssReassignOnBlockSendFailureResponse =
-
RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer);
- return rssReassignOnBlockSendFailureResponse;
+ RssGetAssignmentForBlockRetryResponse response =
+
RssGetAssignmentForBlockRetryResponse.fromProto(partitionToShufflerServer);
+ return response;
}
@Override
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
new file mode 100644
index 000000000..8b89e7905
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssGetAssignmentForBlockRetryResponse extends ClientResponse {
+ private RssProtos.MutableShuffleHandleInfo handle;
+
+ public RssGetAssignmentForBlockRetryResponse(
+ StatusCode statusCode, String message,
RssProtos.MutableShuffleHandleInfo handle) {
+ super(statusCode, message);
+ this.handle = handle;
+ }
+
+ public RssProtos.MutableShuffleHandleInfo getHandle() {
+ return handle;
+ }
+
+ public static RssGetAssignmentForBlockRetryResponse fromProto(
+ RssProtos.GetAssignmentForBlockRetryResponse response) {
+ RssProtos.MutableShuffleHandleInfo handle = response.hasHandle() ?
response.getHandle() : null;
+ return new RssGetAssignmentForBlockRetryResponse(
+ StatusCode.valueOf(response.getStatus().name()), response.getMsg(),
handle);
+ }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index fa869ac48..c48fe5c38 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -578,7 +578,7 @@ service ShuffleManager {
// Gets the mapping between partitions and ShuffleServer from the
ShuffleManager server on Stage Retry.
rpc
getPartitionToShufflerServerWithStageRetry(PartitionToShuffleServerRequest)
returns (ReassignOnStageRetryResponse);
// Gets the mapping between partitions and ShuffleServer from the
ShuffleManager server on Block Retry.
- rpc
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest)
returns (ReassignOnBlockSendFailureResponse);
+ rpc
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest)
returns (GetAssignmentForBlockRetryResponse);
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
// Reassign on block send failure that occurs in writer
@@ -787,6 +787,12 @@ message ReassignOnBlockSendFailureResponse {
MutableShuffleHandleInfo handle = 3;
}
+message GetAssignmentForBlockRetryResponse {
+ StatusCode status = 1;
+ string msg = 2;
+ optional MutableShuffleHandleInfo handle = 3;
+}
+
message StartSortMergeRequest {
string appId = 1;
int32 shuffleId = 2;