This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b40c5091d [#2665] feat(spark): Reconstruct the shuffle handle from 
initial spark handle if it hasn't been updated (#2667)
b40c5091d is described below

commit b40c5091dc1d11d39df24047613468be1b17dde3
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Nov 13 20:23:19 2025 +0800

    [#2665] feat(spark): Reconstruct the shuffle handle from initial spark 
handle if it hasn't been updated (#2667)
    
    ### What changes were proposed in this pull request?
    
    Reconstruct the shuffle handle information from the static Spark handle if 
it hasn't been updated. In that case the latest shuffle handle data no longer 
needs to be transmitted through the RPC layer, reducing the load on the 
driver, especially when there are a large number of tasks.
    
    ### Why are the changes needed?
    
    For issue #2665: Once partition reassignment is enabled, the shuffle 
information is always retrieved when a task starts, which puts significant 
pressure on the Spark driver. Since in most cases the shuffle information 
remains unchanged, skipping its transmission is a natural point for optimization.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests could cover this PR's change.
---
 .../shuffle/handle/MutableShuffleHandleInfo.java   |  8 +++++
 .../shuffle/manager/RssShuffleManagerBase.java     | 21 ++++++++---
 .../shuffle/manager/ShuffleManagerGrpcService.java | 17 ++++-----
 .../apache/spark/shuffle/RssShuffleManager.java    | 30 +++++-----------
 .../uniffle/client/api/ShuffleManagerClient.java   |  3 +-
 .../client/impl/grpc/ShuffleManagerGrpcClient.java | 11 +++---
 .../RssGetAssignmentForBlockRetryResponse.java     | 42 ++++++++++++++++++++++
 proto/src/main/proto/Rss.proto                     |  8 ++++-
 8 files changed, 100 insertions(+), 40 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 35aaa50be..4460c39bc 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -69,6 +70,8 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
 
   private PartitionSplitMode partitionSplitMode = PartitionSplitMode.PIPELINE;
 
+  private AtomicBoolean isUpdated = new AtomicBoolean(false);
+
   public MutableShuffleHandleInfo(
       int shuffleId,
       Map<Integer, List<ShuffleServerInfo>> partitionToServers,
@@ -121,6 +124,10 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
     return partitionReplicaAssignedServers;
   }
 
+  public boolean isUpdated() {
+    return isUpdated.get();
+  }
+
   public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
     return excludedServerToReplacements.get(faultyServerId);
   }
@@ -171,6 +178,7 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
         }
       }
     }
+    isUpdated.set(true);
     return updatedServers;
   }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index be5e56a05..5f9c48bb4 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -79,7 +79,7 @@ import 
org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
-import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
+import 
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
 import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ClientType;
@@ -914,8 +914,21 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
       // In partition block Retry mode, Get the ShuffleServer list from the 
Driver based on the
       // shuffleId.
-      return getRemoteShuffleHandleInfoWithBlockRetry(
-          stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
+      ShuffleHandleInfo handle =
+          getRemoteShuffleHandleInfoWithBlockRetry(
+              stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
+      if (handle == null) {
+        // if the handle is null, it means the shuffleHandle haven't been 
updated.
+        // we could re-construct from the spark's handle, that is to reduce 
the rpc data bytes
+        // transmit.
+        handle =
+            new MutableShuffleHandleInfo(
+                shuffleId,
+                rssHandle.getPartitionToServers(),
+                rssHandle.getRemoteStorage(),
+                partitionSplitMode);
+      }
+      return handle;
     } else {
       return new SimpleShuffleHandleInfo(
           shuffleId, rssHandle.getPartitionToServers(), 
rssHandle.getRemoteStorage());
@@ -954,7 +967,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
         new RssPartitionToShuffleServerRequest(
             stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
-    RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
+    RssGetAssignmentForBlockRetryResponse rpcPartitionToShufflerServer =
         getOrCreateShuffleManagerClientSupplier()
             .get()
             
.getPartitionToShufflerServerWithBlockRetry(rssPartitionToShuffleServerRequest);
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 6c14fe929..1f3d4c8b2 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -287,22 +287,23 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
   @Override
   public void getPartitionToShufflerServerWithBlockRetry(
       RssProtos.PartitionToShuffleServerRequest request,
-      StreamObserver<RssProtos.ReassignOnBlockSendFailureResponse> 
responseObserver) {
-    RssProtos.ReassignOnBlockSendFailureResponse reply;
+      StreamObserver<RssProtos.GetAssignmentForBlockRetryResponse> 
responseObserver) {
+    RssProtos.GetAssignmentForBlockRetryResponse reply;
     RssProtos.StatusCode code;
     int shuffleId = request.getShuffleId();
     MutableShuffleHandleInfo shuffleHandle =
         (MutableShuffleHandleInfo) 
shuffleManager.getShuffleHandleInfoByShuffleId(shuffleId);
     if (shuffleHandle != null) {
       code = RssProtos.StatusCode.SUCCESS;
-      reply =
-          RssProtos.ReassignOnBlockSendFailureResponse.newBuilder()
-              .setStatus(code)
-              .setHandle(MutableShuffleHandleInfo.toProto(shuffleHandle))
-              .build();
+      RssProtos.GetAssignmentForBlockRetryResponse.Builder builder =
+          
RssProtos.GetAssignmentForBlockRetryResponse.newBuilder().setStatus(code);
+      if (shuffleHandle.isUpdated()) {
+        builder.setHandle(MutableShuffleHandleInfo.toProto(shuffleHandle));
+      }
+      reply = builder.build();
     } else {
       code = RssProtos.StatusCode.INVALID_REQUEST;
-      reply = 
RssProtos.ReassignOnBlockSendFailureResponse.newBuilder().setStatus(code).build();
+      reply = 
RssProtos.GetAssignmentForBlockRetryResponse.newBuilder().setStatus(code).build();
     }
     responseObserver.onNext(reply);
     responseObserver.onCompleted();
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 9a5cec0ac..650ece88e 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -370,30 +370,18 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>) 
handle;
     final int partitionNum = 
rssShuffleHandle.getDependency().partitioner().numPartitions();
     int shuffleId = rssShuffleHandle.getShuffleId();
-    ShuffleHandleInfo shuffleHandleInfo;
 
-    if (shuffleManagerRpcServiceEnabled
-        && (rssStageRetryForWriteFailureEnabled || partitionReassignEnabled)) {
-      Supplier<ShuffleHandleInfo> func =
-          rssStageRetryForWriteFailureEnabled
-              ? () ->
-                  getRemoteShuffleHandleInfoWithStageRetry(
-                      context.stageId(), context.stageAttemptNumber(), 
shuffleId, false)
-              : () ->
-                  getRemoteShuffleHandleInfoWithBlockRetry(
-                      context.stageId(), context.stageAttemptNumber(), 
shuffleId, false);
-      if (readShuffleHandleCacheEnabled) {
-        shuffleHandleInfo = super.getOrFetchShuffleHandle(shuffleId, func);
-      } else {
-        shuffleHandleInfo = func.get();
-      }
+    ShuffleHandleInfo shuffleHandleInfo;
+    Supplier<ShuffleHandleInfo> func =
+        () ->
+            getShuffleHandleInfo(
+                context.stageId(), context.stageAttemptNumber(), 
rssShuffleHandle, false);
+    if (readShuffleHandleCacheEnabled) {
+      shuffleHandleInfo = super.getOrFetchShuffleHandle(shuffleId, func);
     } else {
-      shuffleHandleInfo =
-          new SimpleShuffleHandleInfo(
-              shuffleId,
-              rssShuffleHandle.getPartitionToServers(),
-              rssShuffleHandle.getRemoteStorage());
+      shuffleHandleInfo = func.get();
     }
+
     Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
         getPartitionDataServers(shuffleHandleInfo, startPartition, 
endPartition);
     long start = System.currentTimeMillis();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 381ad706f..98e3cd74e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -26,6 +26,7 @@ import 
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+import 
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
 import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
@@ -57,7 +58,7 @@ public interface ShuffleManagerClient extends 
StatefulCloseable {
    * @param req request
    * @return RssPartitionToShuffleServerResponse
    */
-  RssReassignOnBlockSendFailureResponse 
getPartitionToShufflerServerWithBlockRetry(
+  RssGetAssignmentForBlockRetryResponse 
getPartitionToShufflerServerWithBlockRetry(
       RssPartitionToShuffleServerRequest req);
 
   RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 0040e56ef..9b3f29219 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -32,6 +32,7 @@ import 
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+import 
org.apache.uniffle.client.response.RssGetAssignmentForBlockRetryResponse;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
 import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
@@ -102,14 +103,14 @@ public class ShuffleManagerGrpcClient extends GrpcClient 
implements ShuffleManag
   }
 
   @Override
-  public RssReassignOnBlockSendFailureResponse 
getPartitionToShufflerServerWithBlockRetry(
+  public RssGetAssignmentForBlockRetryResponse 
getPartitionToShufflerServerWithBlockRetry(
       RssPartitionToShuffleServerRequest req) {
     RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
-    RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer =
+    RssProtos.GetAssignmentForBlockRetryResponse partitionToShufflerServer =
         
getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest);
-    RssReassignOnBlockSendFailureResponse 
rssReassignOnBlockSendFailureResponse =
-        
RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer);
-    return rssReassignOnBlockSendFailureResponse;
+    RssGetAssignmentForBlockRetryResponse response =
+        
RssGetAssignmentForBlockRetryResponse.fromProto(partitionToShufflerServer);
+    return response;
   }
 
   @Override
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
new file mode 100644
index 000000000..8b89e7905
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetAssignmentForBlockRetryResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssGetAssignmentForBlockRetryResponse extends ClientResponse {
+  private RssProtos.MutableShuffleHandleInfo handle;
+
+  public RssGetAssignmentForBlockRetryResponse(
+      StatusCode statusCode, String message, 
RssProtos.MutableShuffleHandleInfo handle) {
+    super(statusCode, message);
+    this.handle = handle;
+  }
+
+  public RssProtos.MutableShuffleHandleInfo getHandle() {
+    return handle;
+  }
+
+  public static RssGetAssignmentForBlockRetryResponse fromProto(
+      RssProtos.GetAssignmentForBlockRetryResponse response) {
+    RssProtos.MutableShuffleHandleInfo handle = response.hasHandle() ? 
response.getHandle() : null;
+    return new RssGetAssignmentForBlockRetryResponse(
+        StatusCode.valueOf(response.getStatus().name()), response.getMsg(), 
handle);
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index fa869ac48..c48fe5c38 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -578,7 +578,7 @@ service ShuffleManager {
   // Gets the mapping between partitions and ShuffleServer from the 
ShuffleManager server on Stage Retry.
   rpc 
getPartitionToShufflerServerWithStageRetry(PartitionToShuffleServerRequest) 
returns (ReassignOnStageRetryResponse);
   // Gets the mapping between partitions and ShuffleServer from the 
ShuffleManager server on Block Retry.
-  rpc 
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest) 
returns (ReassignOnBlockSendFailureResponse);
+  rpc 
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest) 
returns (GetAssignmentForBlockRetryResponse);
   // Report write failures to ShuffleManager
   rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns 
(ReportShuffleWriteFailureResponse);
   // Reassign on block send failure that occurs in writer
@@ -787,6 +787,12 @@ message ReassignOnBlockSendFailureResponse {
   MutableShuffleHandleInfo handle = 3;
 }
 
+message GetAssignmentForBlockRetryResponse {
+  StatusCode status = 1;
+  string msg = 2;
+  optional MutableShuffleHandleInfo handle = 3;
+}
+
 message StartSortMergeRequest {
   string appId = 1;
   int32 shuffleId = 2;

Reply via email to