This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c72cb746 [#1608] improvement(spark3): Output more task level infos in 
driver side when reassigning on block sent failure (#1771)
1c72cb746 is described below

commit 1c72cb746753ef8ed3c193f458002a30ff16594c
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 11 15:22:11 2024 +0800

    [#1608] improvement(spark3): Output more task level infos in driver side 
when reassigning on block sent failure (#1771)
    
    ### What changes were proposed in this pull request?
    
    1. Output more task-level info (e.g. taskId, executorId, stageId and
    stageAttemptNumber) in driver-side logs
    2. Fix the typo `ReassignServersReponse` -> `ReassignServersResponse`
    
    ### Why are the changes needed?
    
    To give more context in the driver logs when a reassignment happens.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../shuffle/manager/ShuffleManagerGrpcService.java    | 13 ++++++++++---
 .../apache/spark/shuffle/writer/RssShuffleWriter.java |  6 +++---
 .../org/apache/spark/shuffle/RssShuffleManager.java   |  7 +++++--
 .../apache/spark/shuffle/writer/RssShuffleWriter.java | 19 +++++++++++++++----
 .../uniffle/client/api/ShuffleManagerClient.java      |  4 ++--
 .../client/impl/grpc/ShuffleManagerGrpcClient.java    |  8 ++++----
 .../request/RssReassignOnBlockSendFailureRequest.java | 19 ++++++++++++++++++-
 ...rsReponse.java => RssReassignServersResponse.java} |  8 ++++----
 proto/src/main/proto/Rss.proto                        | 10 +++++++---
 9 files changed, 68 insertions(+), 26 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 5aaf23a71..a4ff727b5 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -214,7 +214,7 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
   @Override
   public void reassignShuffleServers(
       RssProtos.ReassignServersRequest request,
-      StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
+      StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
     int stageId = request.getStageId();
     int stageAttemptNumber = request.getStageAttemptNumber();
     int shuffleId = request.getShuffleId();
@@ -223,8 +223,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
         shuffleManager.reassignAllShuffleServersForWholeStage(
             stageId, stageAttemptNumber, shuffleId, numPartitions);
     RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
-    RssProtos.ReassignServersReponse reply =
-        RssProtos.ReassignServersReponse.newBuilder()
+    RssProtos.ReassignServersResponse reply =
+        RssProtos.ReassignServersResponse.newBuilder()
             .setStatus(code)
             .setNeedReassign(needReassign)
             .build();
@@ -241,6 +241,13 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
     RssProtos.StatusCode code = RssProtos.StatusCode.INTERNAL_ERROR;
     RssProtos.RssReassignOnBlockSendFailureResponse reply;
     try {
+      LOG.info(
+          "Accepted reassign request on block sent failure for shuffleId: {}, 
stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
+          request.getShuffleId(),
+          request.getStageId(),
+          request.getStageAttemptNumber(),
+          request.getTaskAttemptId(),
+          request.getExecutorId());
       MutableShuffleHandleInfo handle =
           shuffleManager.reassignOnBlockSendFailure(
               request.getShuffleId(),
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index e06896a53..969658373 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -68,7 +68,7 @@ import 
org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
 import org.apache.uniffle.client.request.RssReassignServersRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -548,11 +548,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                   taskContext.stageAttemptNumber(),
                   shuffleId,
                   partitioner.numPartitions());
-          RssReassignServersReponse rssReassignServersReponse =
+          RssReassignServersResponse rssReassignServersResponse =
               
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
           LOG.info(
               "Whether the reassignment is successful: {}",
-              rssReassignServersReponse.isNeedReassign());
+              rssReassignServersResponse.isNeedReassign());
           // since we are going to roll out the whole stage, mapIndex 
shouldn't matter, hence -1 is
           // provided.
           FetchFailedException ffe =
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f70e38a7d..802d4d5ad 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -1380,10 +1380,13 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
                     assignmentShuffleServerNumber,
                     estimateTaskConcurrency,
                     faultyServerIds);
-            LOG.info("Finished reassign");
+            LOG.info("Finished the shuffle assignment request to 
coordinator.");
             if (reassignmentHandler != null) {
               response = reassignmentHandler.apply(response);
             }
+            LOG.info(
+                "Register the partition->servers assignment. {}",
+                response.getServerToPartitionRanges());
             registerShuffleServers(
                 id.get(), shuffleId, response.getServerToPartitionRanges(), 
getRemoteStorageInfo());
             return response.getPartitionToServers();
@@ -1391,7 +1394,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
           retryInterval,
           retryTimes);
     } catch (Throwable throwable) {
-      throw new RssException("registerShuffle failed!", throwable);
+      throw new RssException("Errors on requesting shuffle assignment!", 
throwable);
     }
   }
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 78975b6a8..9d0e734fa 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -52,6 +52,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.scheduler.MapStatus;
@@ -76,7 +77,7 @@ import 
org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
 import org.apache.uniffle.client.request.RssReassignServersRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ReceivingFailureServer;
@@ -582,8 +583,18 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     String driver = rssConf.getString("driver.host", "");
     int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
     try (ShuffleManagerClient shuffleManagerClient = 
createShuffleManagerClient(driver, port)) {
+      String executorId = SparkEnv.get().executorId();
+      long taskAttemptId = taskContext.taskAttemptId();
+      int stageId = taskContext.stageId();
+      int stageAttemptNum = taskContext.stageAttemptNumber();
       RssReassignOnBlockSendFailureRequest request =
-          new RssReassignOnBlockSendFailureRequest(shuffleId, 
failurePartitionToServers);
+          new RssReassignOnBlockSendFailureRequest(
+              shuffleId,
+              failurePartitionToServers,
+              executorId,
+              taskAttemptId,
+              stageId,
+              stageAttemptNum);
       RssReassignOnBlockSendFailureResponse response =
           shuffleManagerClient.reassignOnBlockSendFailure(request);
       if (response.getStatusCode() != StatusCode.SUCCESS) {
@@ -815,11 +826,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                   taskContext.stageAttemptNumber(),
                   shuffleId,
                   partitioner.numPartitions());
-          RssReassignServersReponse rssReassignServersReponse =
+          RssReassignServersResponse rssReassignServersResponse =
               
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
           LOG.info(
               "Whether the reassignment is successful: {}",
-              rssReassignServersReponse.isNeedReassign());
+              rssReassignServersResponse.isNeedReassign());
           // since we are going to roll out the whole stage, mapIndex 
shouldn't matter, hence -1 is
           // provided.
           FetchFailedException ffe =
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 45d570e77..6b6ee1ece 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -30,7 +30,7 @@ import 
org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -51,7 +51,7 @@ public interface ShuffleManagerClient extends Closeable {
   RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
       RssReportShuffleWriteFailureRequest req);
 
-  RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest 
req);
+  RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest 
req);
 
   RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
       RssReassignOnBlockSendFailureRequest request);
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index bebee8911..997778bc9 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -34,7 +34,7 @@ import 
org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -116,11 +116,11 @@ public class ShuffleManagerGrpcClient extends GrpcClient 
implements ShuffleManag
   }
 
   @Override
-  public RssReassignServersReponse 
reassignShuffleServers(RssReassignServersRequest req) {
+  public RssReassignServersResponse 
reassignShuffleServers(RssReassignServersRequest req) {
     RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
-    RssProtos.ReassignServersReponse reassignServersReponse =
+    RssProtos.ReassignServersResponse reassignServersResponse =
         getBlockingStub().reassignShuffleServers(reassignServersRequest);
-    return RssReassignServersReponse.fromProto(reassignServersReponse);
+    return RssReassignServersResponse.fromProto(reassignServersResponse);
   }
 
   @Override
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
index 7a28493f7..303499fb4 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
@@ -27,11 +27,24 @@ import org.apache.uniffle.proto.RssProtos;
 public class RssReassignOnBlockSendFailureRequest {
   private int shuffleId;
   private Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers;
+  private String executorId;
+  private long taskAttemptId;
+  private int stageId;
+  private int stageAttemptNumber;
 
   public RssReassignOnBlockSendFailureRequest(
-      int shuffleId, Map<Integer, List<ReceivingFailureServer>> 
failurePartitionToServers) {
+      int shuffleId,
+      Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers,
+      String executorId,
+      long taskAttemptId,
+      int stageId,
+      int stageAttemptNum) {
     this.shuffleId = shuffleId;
     this.failurePartitionToServers = failurePartitionToServers;
+    this.executorId = executorId;
+    this.taskAttemptId = taskAttemptId;
+    this.stageId = stageId;
+    this.stageAttemptNumber = stageAttemptNum;
   }
 
   public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
@@ -43,6 +56,10 @@ public class RssReassignOnBlockSendFailureRequest {
                 .collect(
                     Collectors.toMap(
                         Map.Entry::getKey, x -> 
ReceivingFailureServer.toProto(x.getValue()))))
+        .setExecutorId(request.executorId)
+        .setStageId(request.stageId)
+        .setStageAttemptNumber(request.stageAttemptNumber)
+        .setTaskAttemptId(request.taskAttemptId)
         .build();
   }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
similarity index 81%
rename from 
internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
rename to 
internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
index 4e332da32..d85351d98 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
@@ -20,11 +20,11 @@ package org.apache.uniffle.client.response;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.proto.RssProtos;
 
-public class RssReassignServersReponse extends ClientResponse {
+public class RssReassignServersResponse extends ClientResponse {
 
   private boolean needReassign;
 
-  public RssReassignServersReponse(StatusCode statusCode, String message, 
boolean needReassign) {
+  public RssReassignServersResponse(StatusCode statusCode, String message, 
boolean needReassign) {
     super(statusCode, message);
     this.needReassign = needReassign;
   }
@@ -33,8 +33,8 @@ public class RssReassignServersReponse extends ClientResponse 
{
     return needReassign;
   }
 
-  public static RssReassignServersReponse 
fromProto(RssProtos.ReassignServersReponse response) {
-    return new RssReassignServersReponse(
+  public static RssReassignServersResponse 
fromProto(RssProtos.ReassignServersResponse response) {
+    return new RssReassignServersResponse(
         // todo: [issue#780] add fromProto for StatusCode issue
         StatusCode.valueOf(response.getStatus().name()),
         response.getMsg(),
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 5b3d9e133..a63e6d23a 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -533,7 +533,7 @@ service ShuffleManager {
   // Report write failures to ShuffleManager
   rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns 
(ReportShuffleWriteFailureResponse);
   // Reassign the RPC interface of the ShuffleServer list
-  rpc reassignShuffleServers(ReassignServersRequest) returns 
(ReassignServersReponse);
+  rpc reassignShuffleServers(ReassignServersRequest) returns 
(ReassignServersResponse);
   // Reassign on block send failure that occurs in writer
   rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns 
(RssReassignOnBlockSendFailureResponse);
   rpc reportShuffleResult (ReportShuffleResultRequest) returns 
(ReportShuffleResultResponse);
@@ -609,15 +609,19 @@ message ReassignServersRequest{
   int32 numPartitions = 4;
 }
 
-message ReassignServersReponse{
+message ReassignServersResponse {
   StatusCode status = 1;
   bool needReassign = 2;
   string msg = 3;
 }
 
-message RssReassignOnBlockSendFailureRequest{
+message RssReassignOnBlockSendFailureRequest {
   int32 shuffleId  = 1;
   map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
+  int64 taskAttemptId = 3;
+  int32 stageId = 4;
+  int32 stageAttemptNumber = 5;
+  string executorId = 6;
 }
 
 message ReceivingFailureServers {

Reply via email to