This is an automated email from the ASF dual-hosted git repository.
jerrylei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1be07a916 [#825][part-4] feat(spark): Report write failures to
ShuffleManager. (#1258)
1be07a916 is described below
commit 1be07a9160300a2cabb3cec433e62b44b9907a94
Author: yl09099 <[email protected]>
AuthorDate: Tue Oct 31 17:28:37 2023 +0800
[#825][part-4] feat(spark): Report write failures to ShuffleManager. (#1258)
### What changes were proposed in this pull request?
Report failed writes to ShuffleServers to the ShuffleManager.
### Why are the changes needed?
Fix: #825
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UT.
---
.../manager/RssShuffleManagerInterface.java | 7 +
.../shuffle/manager/ShuffleManagerGrpcService.java | 158 +++++++++++++++++++++
.../shuffle/manager/DummyRssShuffleManager.java | 3 +
.../apache/spark/shuffle/RssShuffleManager.java | 12 ++
.../apache/spark/shuffle/RssShuffleManager.java | 12 ++
.../uniffle/client/api/ShuffleManagerClient.java | 5 +
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 17 +++
.../RssReportShuffleWriteFailureRequest.java | 70 +++++++++
.../RssReportShuffleWriteFailureResponse.java | 53 +++++++
proto/src/main/proto/Rss.proto | 16 +++
10 files changed, 353 insertions(+)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 34009cbad..4940277c8 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -63,4 +63,11 @@ public interface RssShuffleManagerInterface {
* @return ShuffleHandleInfo
*/
ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId);
+
+ /**
+ * Adds a shuffle server that failed to accept shuffle writes to this manager's list of failed
+ * shuffle servers.
+ *
+ * <p>Called by ShuffleManagerGrpcService while handling write-failure reports. NOTE(review):
+ * reports for different shuffles can be handled concurrently, so implementations should be
+ * thread-safe — confirm.
+ *
+ * @param shuffleServerId id of the failed shuffle server (the value of {@code
+ *     ShuffleServerInfo#getId()})
+ */
+ void addFailuresShuffleServerInfos(String shuffleServerId);
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 5c4b8795c..a03c0439b 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -17,9 +17,12 @@
package org.apache.uniffle.shuffle.manager;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
@@ -37,12 +40,86 @@ import
org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
private final Map<Integer, RssShuffleStatus> shuffleStatus =
JavaUtils.newConcurrentMap();
+  // Per-shuffle write-failure bookkeeping: shuffleId -> failure counts per ShuffleServer.
+  // NOTE(review): nothing in this change removes entries from this map (the visible cleanup only
+  // removes from shuffleStatus); confirm whether it should be cleared when a shuffle goes away.
+  private final Map<Integer, ShuffleServerFailureRecord> shuffleWriteStatus =
+      JavaUtils.newConcurrentMap();
private final RssShuffleManagerInterface shuffleManager;
public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
this.shuffleManager = shuffleManager;
}
+  /**
+   * Handles a report that shuffle writes failed against one or more shuffle servers.
+   *
+   * <p>Reports from a different appId, or for a stage attempt older than the one already
+   * recorded for the shuffle, are answered with {@code INVALID_REQUEST}. Otherwise the failure
+   * count of every reported server is incremented, and once a server's count exceeds {@code
+   * getMaxFetchFailures()} the reply asks the client to resubmit the whole stage.
+   */
+  @Override
+  public void reportShuffleWriteFailure(
+      RssProtos.ReportShuffleWriteFailureRequest request,
+      StreamObserver<RssProtos.ReportShuffleWriteFailureResponse> responseObserver) {
+    String appId = request.getAppId();
+    int shuffleId = request.getShuffleId();
+    int stageAttemptNumber = request.getStageAttemptNumber();
+    List<RssProtos.ShuffleServerId> shuffleServerIdsList = request.getShuffleServerIdsList();
+    RssProtos.StatusCode code;
+    boolean reSubmitWholeStage;
+    String msg;
+    if (!appId.equals(shuffleManager.getAppId())) {
+      msg =
+          String.format(
+              "got a wrong shuffle write failure report from appId: %s, expected appId: %s",
+              appId, shuffleManager.getAppId());
+      LOG.warn(msg);
+      code = RssProtos.StatusCode.INVALID_REQUEST;
+      reSubmitWholeStage = false;
+    } else {
+      List<ShuffleServerInfo> shuffleServerInfos =
+          ShuffleServerInfo.fromProto(shuffleServerIdsList);
+      // Only build the initial zeroed counters when this shuffle has no record yet, rather than
+      // allocating and filling a map on every report that is then thrown away.
+      ShuffleServerFailureRecord shuffleServerFailureRecord =
+          shuffleWriteStatus.computeIfAbsent(
+              shuffleId,
+              key -> {
+                Map<String, AtomicInteger> initialCounts = JavaUtils.newConcurrentMap();
+                shuffleServerInfos.forEach(
+                    shuffleServerInfo ->
+                        initialCounts.put(shuffleServerInfo.getId(), new AtomicInteger(0)));
+                return new ShuffleServerFailureRecord(initialCounts, stageAttemptNumber);
+              });
+      // true means the report belongs to an older stage attempt than the recorded one.
+      boolean isStaleAttempt =
+          shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
+      if (isStaleAttempt) {
+        msg =
+            String.format(
+                "got an old stage(%d vs %d) shuffle write failure report, which should be impossible.",
+                shuffleServerFailureRecord.getStageAttempt(), stageAttemptNumber);
+        LOG.warn(msg);
+        code = RssProtos.StatusCode.INVALID_REQUEST;
+        reSubmitWholeStage = false;
+      } else {
+        code = RssProtos.StatusCode.SUCCESS;
+        // update the stage shuffleServer write failed count
+        boolean thresholdExceeded =
+            shuffleServerFailureRecord.incPartitionWriteFailure(
+                stageAttemptNumber, shuffleServerInfos, shuffleManager);
+        if (thresholdExceeded) {
+          reSubmitWholeStage = true;
+          msg =
+              String.format(
+                  "report shuffle write failure as maximum number(%d) of shuffle write is occurred",
+                  shuffleManager.getMaxFetchFailures());
+        } else {
+          reSubmitWholeStage = false;
+          msg = "don't report shuffle write failure";
+        }
+      }
+    }
+
+    RssProtos.ReportShuffleWriteFailureResponse reply =
+        RssProtos.ReportShuffleWriteFailureResponse.newBuilder()
+            .setStatus(code)
+            .setReSubmitWholeStage(reSubmitWholeStage)
+            .setMsg(msg)
+            .build();
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
+
@Override
public void reportShuffleFetchFailure(
RssProtos.ReportShuffleFetchFailureRequest request,
@@ -163,6 +240,87 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
shuffleStatus.remove(shuffleId);
}
+  /**
+   * Write-failure counters for one shuffle, keyed by shuffle server id and scoped to a single
+   * stage attempt. All state is accessed under a read/write lock.
+   */
+  private static class ShuffleServerFailureRecord {
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+    private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
+    private int stageAttemptNumber;
+
+    private ShuffleServerFailureRecord(
+        Map<String, AtomicInteger> shuffleServerFailureRecordCount, int stageAttemptNumber) {
+      this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+      this.stageAttemptNumber = stageAttemptNumber;
+    }
+
+    private <T> T withReadLock(Supplier<T> fn) {
+      readLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private <T> T withWriteLock(Supplier<T> fn) {
+      writeLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /** Returns the stage attempt the counters currently belong to. */
+    public int getStageAttempt() {
+      return withReadLock(() -> this.stageAttemptNumber);
+    }
+
+    /**
+     * Moves this record to {@code stageAttemptNumber} if it is newer than the recorded attempt,
+     * clearing all counters.
+     *
+     * @return {@code true} if {@code stageAttemptNumber} is older than the recorded attempt (a
+     *     stale report), {@code false} otherwise
+     */
+    public boolean resetStageAttemptIfNecessary(int stageAttemptNumber) {
+      return withWriteLock(
+          () -> {
+            if (this.stageAttemptNumber < stageAttemptNumber) {
+              // A new stage attempt was issued: counters from the previous attempt no longer
+              // apply, so clear them and adopt the new attempt number.
+              shuffleServerFailureRecordCount.clear();
+              this.stageAttemptNumber = stageAttemptNumber;
+              return false;
+            } else if (this.stageAttemptNumber > stageAttemptNumber) {
+              return true;
+            }
+            return false;
+          });
+    }
+
+    /**
+     * Increments the failure count of every given server, then reports each server whose count
+     * now exceeds {@code shuffleManager.getMaxFetchFailures()} to {@code shuffleManager}.
+     *
+     * @param stageAttemptNumber stage attempt of the report; reports for any other attempt are
+     *     ignored
+     * @param shuffleServerInfos servers the write failed against
+     * @param shuffleManager source of the threshold and sink for failed servers
+     * @return {@code true} if at least one server exceeded the threshold
+     */
+    public boolean incPartitionWriteFailure(
+        int stageAttemptNumber,
+        List<ShuffleServerInfo> shuffleServerInfos,
+        RssShuffleManagerInterface shuffleManager) {
+      return withWriteLock(
+          () -> {
+            if (this.stageAttemptNumber != stageAttemptNumber) {
+              // The report belongs to a different stage attempt; ignore it.
+              return false;
+            }
+            shuffleServerInfos.forEach(
+                shuffleServerInfo ->
+                    shuffleServerFailureRecordCount
+                        .computeIfAbsent(shuffleServerInfo.getId(), k -> new AtomicInteger())
+                        .incrementAndGet());
+            // Highest failure count first, so the scan can stop at the first server that is
+            // still within the threshold. The counters cannot change meanwhile: every increment
+            // happens under this write lock.
+            List<Map.Entry<String, AtomicInteger>> list =
+                new ArrayList<>(shuffleServerFailureRecordCount.entrySet());
+            Collections.sort(
+                list, (o1, o2) -> Integer.compare(o2.getValue().get(), o1.getValue().get()));
+            boolean exceeded = false;
+            for (Map.Entry<String, AtomicInteger> entry : list) {
+              if (entry.getValue().get() <= shuffleManager.getMaxFetchFailures()) {
+                break;
+              }
+              shuffleManager.addFailuresShuffleServerInfos(entry.getKey());
+              exceeded = true;
+            }
+            return exceeded;
+          });
+    }
+  }
+
private static class RssShuffleStatus {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index dfd4b69a1..24980afbf 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -54,4 +54,7 @@ public class DummyRssShuffleManager implements
RssShuffleManagerInterface {
public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
return null;
}
+
+ @Override
+ public void addFailuresShuffleServerInfos(String shuffleServerId) {}
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index dcebe11f8..2cdf8636c 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -123,6 +123,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
JavaUtils.newConcurrentMap();
/** Whether to enable the dynamic shuffleServer function rewrite and reread
functions */
private boolean rssResubmitStage;
+ /** A list of shuffleServer for Write failures */
+ private Set<String> failuresShuffleServerIds = Sets.newHashSet();
public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
@@ -794,4 +796,14 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
rpcPartitionToShufflerServer.getRemoteStorageInfo());
return shuffleHandleInfo;
}
+
+  /**
+   * Marks a shuffle server as having failed shuffle writes by recording its id in {@code
+   * failuresShuffleServerIds}.
+   *
+   * @param shuffleServerId id of the failed shuffle server
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    this.failuresShuffleServerIds.add(shuffleServerId);
+  }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d9e730e7e..3ee108fd3 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -134,6 +134,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
/** Whether to enable the dynamic shuffleServer function rewrite and reread
functions */
private boolean rssResubmitStage;
+ /** A list of shuffleServer for Write failures */
+ private Set<String> failuresShuffleServerIds;
public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.sparkConf = conf;
@@ -1092,4 +1094,14 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
rpcPartitionToShufflerServer.getRemoteStorageInfo());
return shuffleHandleInfo;
}
+
+  /**
+   * Marks a shuffle server as having failed shuffle writes by recording its id in {@code
+   * failuresShuffleServerIds}.
+   *
+   * @param shuffleServerId id of the failed shuffle server
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    this.failuresShuffleServerIds.add(shuffleServerId);
+  }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 997a8e0bc..cd2d79569 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -21,8 +21,10 @@ import java.io.Closeable;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
public interface ShuffleManagerClient extends Closeable {
RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
@@ -36,4 +38,7 @@ public interface ShuffleManagerClient extends Closeable {
*/
RssPartitionToShuffleServerResponse getPartitionToShufflerServer(
RssPartitionToShuffleServerRequest req);
+
+ /**
+ * Reports shuffle servers that failed shuffle writes to the shuffle manager.
+ *
+ * @param req the failure report (app, shuffle, stage attempt and failed servers)
+ * @return the manager's reply, including whether the whole stage should be resubmitted
+ */
+ RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
+ RssReportShuffleWriteFailureRequest req);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 56962c885..8708a9b42 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -25,8 +25,10 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.proto.RssProtos;
@@ -88,4 +90,19 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
RssPartitionToShuffleServerResponse.fromProto(partitionToShufflerServer);
return rssPartitionToShuffleServerResponse;
}
+
+  /**
+   * Sends a shuffle write failure report to the shuffle manager over gRPC.
+   *
+   * @param request the failure report
+   * @return the manager's reply
+   * @throws RssException if the RPC or response conversion fails
+   */
+  @Override
+  public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
+      RssReportShuffleWriteFailureRequest request) {
+    RssProtos.ReportShuffleWriteFailureRequest protoRequest = request.toProto();
+    try {
+      RssProtos.ReportShuffleWriteFailureResponse response =
+          getBlockingStub().reportShuffleWriteFailure(protoRequest);
+      return RssReportShuffleWriteFailureResponse.fromProto(response);
+    } catch (Exception e) {
+      String msg = "Report shuffle write failure to host:port[" + host + ":" + port + "] failed";
+      LOG.warn(msg, e);
+      throw new RssException(msg, e);
+    }
+  }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
new file mode 100644
index 000000000..c05176769
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
+
+/** Client-side request reporting shuffle servers that failed shuffle writes for a stage attempt. */
+public class RssReportShuffleWriteFailureRequest {
+  private final String appId;
+  private final int shuffleId;
+  private final int stageAttemptNumber;
+  private final List<ShuffleServerInfo> shuffleServerInfos;
+  private final String exception;
+
+  /**
+   * @param appId id of the reporting application
+   * @param shuffleId shuffle whose writes failed
+   * @param stageAttemptNumber stage attempt that observed the failure
+   * @param shuffleServerInfos servers the writes failed against; {@code null} is treated as empty
+   * @param exception optional failure description; may be {@code null}
+   */
+  public RssReportShuffleWriteFailureRequest(
+      String appId,
+      int shuffleId,
+      int stageAttemptNumber,
+      List<ShuffleServerInfo> shuffleServerInfos,
+      String exception) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.stageAttemptNumber = stageAttemptNumber;
+    this.shuffleServerInfos =
+        shuffleServerInfos == null ? Lists.newArrayList() : shuffleServerInfos;
+    this.exception = exception;
+  }
+
+  /** Converts this request into its protobuf form. */
+  public ReportShuffleWriteFailureRequest toProto() {
+    List<ShuffleServerId> shuffleServerIds =
+        Lists.newArrayListWithCapacity(shuffleServerInfos.size());
+    for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfos) {
+      shuffleServerIds.add(
+          ShuffleServerId.newBuilder()
+              .setId(shuffleServerInfo.getId())
+              .setPort(shuffleServerInfo.getGrpcPort())
+              .setIp(shuffleServerInfo.getHost())
+              .build());
+    }
+    ReportShuffleWriteFailureRequest.Builder builder =
+        ReportShuffleWriteFailureRequest.newBuilder()
+            .setAppId(appId)
+            .setShuffleId(shuffleId)
+            .setStageAttemptNumber(stageAttemptNumber)
+            .addAllShuffleServerIds(shuffleServerIds);
+    // Generated protobuf setters reject null, so only set the optional text when present.
+    if (exception != null) {
+      builder.setException(exception);
+    }
+    return builder.build();
+  }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
new file mode 100644
index 000000000..4f4d7b3be
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureResponse;
+
+/** Client-side view of the shuffle manager's reply to a shuffle write failure report. */
+public class RssReportShuffleWriteFailureResponse extends ClientResponse {
+  private final boolean reSubmitWholeStage;
+
+  public RssReportShuffleWriteFailureResponse(
+      StatusCode code, String msg, boolean reSubmitWholeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = reSubmitWholeStage;
+  }
+
+  /** Returns whether the shuffle manager asked for the whole stage to be resubmitted. */
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  /** Converts this response into its protobuf form. */
+  public ReportShuffleWriteFailureResponse toProto() {
+    ReportShuffleWriteFailureResponse.Builder builder =
+        ReportShuffleWriteFailureResponse.newBuilder()
+            .setStatus(getStatusCode().toProto())
+            .setReSubmitWholeStage(reSubmitWholeStage);
+    // Generated protobuf setters throw on null; leave msg at its default when there is none.
+    String message = getMessage();
+    if (message != null) {
+      builder.setMsg(message);
+    }
+    return builder.build();
+  }
+
+  /** Builds a response from its protobuf form. */
+  public static RssReportShuffleWriteFailureResponse fromProto(
+      ReportShuffleWriteFailureResponse response) {
+    return new RssReportShuffleWriteFailureResponse(
+        // todo: [issue#780] add fromProto for StatusCode issue
+        StatusCode.valueOf(response.getStatus().name()),
+        response.getMsg(),
+        response.getReSubmitWholeStage());
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 31e29bdae..93bd2f67b 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -499,6 +499,8 @@ service ShuffleManager {
rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns
(ReportShuffleFetchFailureResponse);
// Gets the mapping between partitions and ShuffleServer from the
ShuffleManager server
rpc getPartitionToShufflerServer(PartitionToShuffleServerRequest) returns
(PartitionToShuffleServerResponse);
+ // Report shuffle servers that failed shuffle writes to the ShuffleManager; the reply says
+ // whether the whole stage should be resubmitted.
+ rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
}
message ReportShuffleFetchFailureRequest {
@@ -534,3 +536,17 @@ message RemoteStorageInfo{
string path = 1;
map<string, string> confItems = 2;
}
+
+// Report that shuffle writes for a stage attempt failed against the listed shuffle servers.
+message ReportShuffleWriteFailureRequest {
+ string appId = 1;
+ int32 shuffleId = 2;
+ int32 stageAttemptNumber = 3;
+ // Servers the writes failed against. NOTE(review): field number 4 is unused — reserve it if
+ // that is intentional.
+ repeated ShuffleServerId shuffleServerIds= 5;
+ // Optional description of the failure; left unset by the client when absent.
+ string exception = 6;
+}
+
+// Reply to ReportShuffleWriteFailureRequest.
+message ReportShuffleWriteFailureResponse {
+ StatusCode status = 1;
+ // True when a reported server's failure count exceeded the configured maximum and the whole
+ // stage should be resubmitted.
+ bool reSubmitWholeStage = 2;
+ string msg = 3;
+}