This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1c72cb746 [#1608] improvement(spark3): Output more task level infos in
driver side when reassigning on block sent failure (#1771)
1c72cb746 is described below
commit 1c72cb746753ef8ed3c193f458002a30ff16594c
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jun 11 15:22:11 2024 +0800
[#1608] improvement(spark3): Output more task level infos in driver side
when reassigning on block sent failure (#1771)
### What changes were proposed in this pull request?
1. Output more task-level logs (such as taskId, executorId, and so on) on
the driver side
2. Fix the typo `ReassignServersReponse` by renaming it to `ReassignServersResponse`
### Why are the changes needed?
To provide more context in the logs when a reassignment happens.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../shuffle/manager/ShuffleManagerGrpcService.java | 13 ++++++++++---
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 6 +++---
.../org/apache/spark/shuffle/RssShuffleManager.java | 7 +++++--
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 19 +++++++++++++++----
.../uniffle/client/api/ShuffleManagerClient.java | 4 ++--
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 8 ++++----
.../request/RssReassignOnBlockSendFailureRequest.java | 19 ++++++++++++++++++-
...rsReponse.java => RssReassignServersResponse.java} | 8 ++++----
proto/src/main/proto/Rss.proto | 10 +++++++---
9 files changed, 68 insertions(+), 26 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 5aaf23a71..a4ff727b5 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -214,7 +214,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
@Override
public void reassignShuffleServers(
RssProtos.ReassignServersRequest request,
- StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
+ StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
int stageId = request.getStageId();
int stageAttemptNumber = request.getStageAttemptNumber();
int shuffleId = request.getShuffleId();
@@ -223,8 +223,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
shuffleManager.reassignAllShuffleServersForWholeStage(
stageId, stageAttemptNumber, shuffleId, numPartitions);
RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
- RssProtos.ReassignServersReponse reply =
- RssProtos.ReassignServersReponse.newBuilder()
+ RssProtos.ReassignServersResponse reply =
+ RssProtos.ReassignServersResponse.newBuilder()
.setStatus(code)
.setNeedReassign(needReassign)
.build();
@@ -241,6 +241,13 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
RssProtos.StatusCode code = RssProtos.StatusCode.INTERNAL_ERROR;
RssProtos.RssReassignOnBlockSendFailureResponse reply;
try {
+ LOG.info(
+ "Accepted reassign request on block sent failure for shuffleId: {},
stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
+ request.getShuffleId(),
+ request.getStageId(),
+ request.getStageAttemptNumber(),
+ request.getTaskAttemptId(),
+ request.getExecutorId());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getShuffleId(),
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index e06896a53..969658373 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -68,7 +68,7 @@ import
org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -548,11 +548,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
- RssReassignServersReponse rssReassignServersReponse =
+ RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
- rssReassignServersReponse.isNeedReassign());
+ rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex
shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f70e38a7d..802d4d5ad 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -1380,10 +1380,13 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
- LOG.info("Finished reassign");
+ LOG.info("Finished the shuffle assignment request to
coordinator.");
if (reassignmentHandler != null) {
response = reassignmentHandler.apply(response);
}
+ LOG.info(
+ "Register the partition->servers assignment. {}",
+ response.getServerToPartitionRanges());
registerShuffleServers(
id.get(), shuffleId, response.getServerToPartitionRanges(),
getRemoteStorageInfo());
return response.getPartitionToServers();
@@ -1391,7 +1394,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
retryInterval,
retryTimes);
} catch (Throwable throwable) {
- throw new RssException("registerShuffle failed!", throwable);
+ throw new RssException("Errors on requesting shuffle assignment!",
throwable);
}
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 78975b6a8..9d0e734fa 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -52,6 +52,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
@@ -76,7 +77,7 @@ import
org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ReceivingFailureServer;
@@ -582,8 +583,18 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
try (ShuffleManagerClient shuffleManagerClient =
createShuffleManagerClient(driver, port)) {
+ String executorId = SparkEnv.get().executorId();
+ long taskAttemptId = taskContext.taskAttemptId();
+ int stageId = taskContext.stageId();
+ int stageAttemptNum = taskContext.stageAttemptNumber();
RssReassignOnBlockSendFailureRequest request =
- new RssReassignOnBlockSendFailureRequest(shuffleId,
failurePartitionToServers);
+ new RssReassignOnBlockSendFailureRequest(
+ shuffleId,
+ failurePartitionToServers,
+ executorId,
+ taskAttemptId,
+ stageId,
+ stageAttemptNum);
RssReassignOnBlockSendFailureResponse response =
shuffleManagerClient.reassignOnBlockSendFailure(request);
if (response.getStatusCode() != StatusCode.SUCCESS) {
@@ -815,11 +826,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
- RssReassignServersReponse rssReassignServersReponse =
+ RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
- rssReassignServersReponse.isNeedReassign());
+ rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex
shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 45d570e77..6b6ee1ece 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -30,7 +30,7 @@ import
org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -51,7 +51,7 @@ public interface ShuffleManagerClient extends Closeable {
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);
- RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest
req);
+ RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest
req);
RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index bebee8911..997778bc9 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -34,7 +34,7 @@ import
org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -116,11 +116,11 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
}
@Override
- public RssReassignServersReponse
reassignShuffleServers(RssReassignServersRequest req) {
+ public RssReassignServersResponse
reassignShuffleServers(RssReassignServersRequest req) {
RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
- RssProtos.ReassignServersReponse reassignServersReponse =
+ RssProtos.ReassignServersResponse reassignServersResponse =
getBlockingStub().reassignShuffleServers(reassignServersRequest);
- return RssReassignServersReponse.fromProto(reassignServersReponse);
+ return RssReassignServersResponse.fromProto(reassignServersResponse);
}
@Override
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
index 7a28493f7..303499fb4 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignOnBlockSendFailureRequest.java
@@ -27,11 +27,24 @@ import org.apache.uniffle.proto.RssProtos;
public class RssReassignOnBlockSendFailureRequest {
private int shuffleId;
private Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers;
+ private String executorId;
+ private long taskAttemptId;
+ private int stageId;
+ private int stageAttemptNumber;
public RssReassignOnBlockSendFailureRequest(
- int shuffleId, Map<Integer, List<ReceivingFailureServer>>
failurePartitionToServers) {
+ int shuffleId,
+ Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers,
+ String executorId,
+ long taskAttemptId,
+ int stageId,
+ int stageAttemptNum) {
this.shuffleId = shuffleId;
this.failurePartitionToServers = failurePartitionToServers;
+ this.executorId = executorId;
+ this.taskAttemptId = taskAttemptId;
+ this.stageId = stageId;
+ this.stageAttemptNumber = stageAttemptNum;
}
public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
@@ -43,6 +56,10 @@ public class RssReassignOnBlockSendFailureRequest {
.collect(
Collectors.toMap(
Map.Entry::getKey, x ->
ReceivingFailureServer.toProto(x.getValue()))))
+ .setExecutorId(request.executorId)
+ .setStageId(request.stageId)
+ .setStageAttemptNumber(request.stageAttemptNumber)
+ .setTaskAttemptId(request.taskAttemptId)
.build();
}
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
similarity index 81%
rename from
internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
rename to
internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
index 4e332da32..d85351d98 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersReponse.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
@@ -20,11 +20,11 @@ package org.apache.uniffle.client.response;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.proto.RssProtos;
-public class RssReassignServersReponse extends ClientResponse {
+public class RssReassignServersResponse extends ClientResponse {
private boolean needReassign;
- public RssReassignServersReponse(StatusCode statusCode, String message,
boolean needReassign) {
+ public RssReassignServersResponse(StatusCode statusCode, String message,
boolean needReassign) {
super(statusCode, message);
this.needReassign = needReassign;
}
@@ -33,8 +33,8 @@ public class RssReassignServersReponse extends ClientResponse
{
return needReassign;
}
- public static RssReassignServersReponse
fromProto(RssProtos.ReassignServersReponse response) {
- return new RssReassignServersReponse(
+ public static RssReassignServersResponse
fromProto(RssProtos.ReassignServersResponse response) {
+ return new RssReassignServersResponse(
// todo: [issue#780] add fromProto for StatusCode issue
StatusCode.valueOf(response.getStatus().name()),
response.getMsg(),
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 5b3d9e133..a63e6d23a 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -533,7 +533,7 @@ service ShuffleManager {
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
// Reassign the RPC interface of the ShuffleServer list
- rpc reassignShuffleServers(ReassignServersRequest) returns
(ReassignServersReponse);
+ rpc reassignShuffleServers(ReassignServersRequest) returns
(ReassignServersResponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns
(RssReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns
(ReportShuffleResultResponse);
@@ -609,15 +609,19 @@ message ReassignServersRequest{
int32 numPartitions = 4;
}
-message ReassignServersReponse{
+message ReassignServersResponse {
StatusCode status = 1;
bool needReassign = 2;
string msg = 3;
}
-message RssReassignOnBlockSendFailureRequest{
+message RssReassignOnBlockSendFailureRequest {
int32 shuffleId = 1;
map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
+ int64 taskAttemptId = 3;
+ int32 stageId = 4;
+ int32 stageAttemptNumber = 5;
+ string executorId = 6;
}
message ReceivingFailureServers {