This is an automated email from the ASF dual-hosted git repository.

jerrylei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1be07a916 [#825][part-4] feat(spark): Report write failures to 
ShuffleManager. (#1258)
1be07a916 is described below

commit 1be07a9160300a2cabb3cec433e62b44b9907a94
Author: yl09099 <[email protected]>
AuthorDate: Tue Oct 31 17:28:37 2023 +0800

    [#825][part-4] feat(spark): Report write failures to ShuffleManager. (#1258)
    
    ### What changes were proposed in this pull request?
    
    Report exceptions from shuffle writes to ShuffleServers to the ShuffleManager.
    
    ### Why are the changes needed?
    
    Fix: #825
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT.
---
 .../manager/RssShuffleManagerInterface.java        |   7 +
 .../shuffle/manager/ShuffleManagerGrpcService.java | 158 +++++++++++++++++++++
 .../shuffle/manager/DummyRssShuffleManager.java    |   3 +
 .../apache/spark/shuffle/RssShuffleManager.java    |  12 ++
 .../apache/spark/shuffle/RssShuffleManager.java    |  12 ++
 .../uniffle/client/api/ShuffleManagerClient.java   |   5 +
 .../client/impl/grpc/ShuffleManagerGrpcClient.java |  17 +++
 .../RssReportShuffleWriteFailureRequest.java       |  70 +++++++++
 .../RssReportShuffleWriteFailureResponse.java      |  53 +++++++
 proto/src/main/proto/Rss.proto                     |  16 +++
 10 files changed, 353 insertions(+)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 34009cbad..4940277c8 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -63,4 +63,11 @@ public interface RssShuffleManagerInterface {
    * @return ShuffleHandleInfo
    */
   ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId);
+
+  /**
+   * Records a shuffle server that failed to write by adding it to the write-failure list.
+   *
+   * @param shuffleServerId id of the shuffle server that failed to write
+   */
+  void addFailuresShuffleServerInfos(String shuffleServerId);
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 5c4b8795c..a03c0439b 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -17,9 +17,12 @@
 
 package org.apache.uniffle.shuffle.manager;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
@@ -37,12 +40,86 @@ import 
org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
 public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
   private final Map<Integer, RssShuffleStatus> shuffleStatus = 
JavaUtils.newConcurrentMap();
+  // Write-failure counts per ShuffleServer, keyed by shuffleId
+  private final Map<Integer, ShuffleServerFailureRecord> shuffleWriteStatus =
+      JavaUtils.newConcurrentMap();
   private final RssShuffleManagerInterface shuffleManager;
 
   public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
     this.shuffleManager = shuffleManager;
   }
 
+  @Override
+  public void reportShuffleWriteFailure(
+      RssProtos.ReportShuffleWriteFailureRequest request,
+      StreamObserver<RssProtos.ReportShuffleWriteFailureResponse> 
responseObserver) {
+    String appId = request.getAppId();
+    int shuffleId = request.getShuffleId();
+    int stageAttemptNumber = request.getStageAttemptNumber();
+    List<RssProtos.ShuffleServerId> shuffleServerIdsList = 
request.getShuffleServerIdsList();
+    RssProtos.StatusCode code;
+    boolean reSubmitWholeStage;
+    String msg;
+    if (!appId.equals(shuffleManager.getAppId())) {
+      msg =
+          String.format(
+              "got a wrong shuffle write failure report from appId: %s, 
expected appId: %s",
+              appId, shuffleManager.getAppId());
+      LOG.warn(msg);
+      code = RssProtos.StatusCode.INVALID_REQUEST;
+      reSubmitWholeStage = false;
+    } else {
+      Map<String, AtomicInteger> shuffleServerInfoIntegerMap = 
JavaUtils.newConcurrentMap();
+      List<ShuffleServerInfo> shuffleServerInfos =
+          ShuffleServerInfo.fromProto(shuffleServerIdsList);
+      shuffleServerInfos.forEach(
+          shuffleServerInfo -> {
+            shuffleServerInfoIntegerMap.put(shuffleServerInfo.getId(), new 
AtomicInteger(0));
+          });
+      ShuffleServerFailureRecord shuffleServerFailureRecord =
+          shuffleWriteStatus.computeIfAbsent(
+              shuffleId,
+              key ->
+                  new ShuffleServerFailureRecord(shuffleServerInfoIntegerMap, 
stageAttemptNumber));
+      boolean isStaleAttempt =
+          shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
+      if (isStaleAttempt) {
+        msg =
+            String.format(
+                "got an old stage(%d vs %d) shuffle write failure report, 
which should be impossible.",
+                shuffleServerFailureRecord.getStageAttempt(), 
stageAttemptNumber);
+        LOG.warn(msg);
+        code = RssProtos.StatusCode.INVALID_REQUEST;
+        reSubmitWholeStage = false;
+      } else {
+        code = RssProtos.StatusCode.SUCCESS;
+        // Count one more write failure for each reported server in this stage attempt
+        boolean exceededMaxFailures =
+            shuffleServerFailureRecord.incPartitionWriteFailure(
+                stageAttemptNumber, shuffleServerInfos, shuffleManager);
+        if (exceededMaxFailures) {
+          reSubmitWholeStage = true;
+          msg =
+              String.format(
+                  "report shuffle write failure as maximum number(%d) of 
shuffle write is occurred",
+                  shuffleManager.getMaxFetchFailures());
+        } else {
+          reSubmitWholeStage = false;
+          msg = "don't report shuffle write failure";
+        }
+      }
+    }
+
+    RssProtos.ReportShuffleWriteFailureResponse reply =
+        RssProtos.ReportShuffleWriteFailureResponse.newBuilder()
+            .setStatus(code)
+            .setReSubmitWholeStage(reSubmitWholeStage)
+            .setMsg(msg)
+            .build();
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
+
   @Override
   public void reportShuffleFetchFailure(
       RssProtos.ReportShuffleFetchFailureRequest request,
@@ -163,6 +240,111 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
     shuffleStatus.remove(shuffleId);
   }
 
+  /**
+   * Per-shuffle record of how many write failures each shuffle server has accumulated in the
+   * latest stage attempt seen so far. All state is accessed under a read/write lock.
+   */
+  private static class ShuffleServerFailureRecord {
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
+    private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
+    private int stageAttemptNumber;
+
+    private ShuffleServerFailureRecord(
+        Map<String, AtomicInteger> shuffleServerFailureRecordCount, int 
stageAttemptNumber) {
+      this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+      this.stageAttemptNumber = stageAttemptNumber;
+    }
+
+    private <T> T withReadLock(Supplier<T> fn) {
+      readLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private <T> T withWriteLock(Supplier<T> fn) {
+      writeLock.lock();
+      try {
+        return fn.get();
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public int getStageAttempt() {
+      return withReadLock(() -> this.stageAttemptNumber);
+    }
+
+    /**
+     * Moves this record to {@code stageAttemptNumber} if it is newer, discarding all counts.
+     *
+     * @return true if {@code stageAttemptNumber} is older than the recorded attempt (a stale
+     *     report), false otherwise
+     */
+    public boolean resetStageAttemptIfNecessary(int stageAttemptNumber) {
+      return withWriteLock(
+          () -> {
+            if (this.stageAttemptNumber < stageAttemptNumber) {
+              // A newer stage attempt has been issued; the failure counts of the previous
+              // attempt no longer apply, so clear them and adopt the new attempt number.
+              shuffleServerFailureRecordCount.clear();
+              this.stageAttemptNumber = stageAttemptNumber;
+              return false;
+            } else if (this.stageAttemptNumber > stageAttemptNumber) {
+              return true;
+            }
+            return false;
+          });
+    }
+
+    /**
+     * Adds one write failure for every server in {@code shuffleServerInfos}, provided {@code
+     * stageAttemptNumber} is still the recorded attempt. If the server with the most failures then
+     * exceeds {@code shuffleManager.getMaxFetchFailures()}, it is passed to {@code
+     * shuffleManager.addFailuresShuffleServerInfos}.
+     *
+     * @return true if a server exceeded the failure threshold, false otherwise
+     */
+    public boolean incPartitionWriteFailure(
+        int stageAttemptNumber,
+        List<ShuffleServerInfo> shuffleServerInfos,
+        RssShuffleManagerInterface shuffleManager) {
+      return withWriteLock(
+          () -> {
+            if (this.stageAttemptNumber != stageAttemptNumber) {
+              // The report belongs to a different stage attempt; ignore it.
+              return false;
+            }
+            shuffleServerInfos.forEach(
+                shuffleServerInfo -> {
+                  shuffleServerFailureRecordCount
+                      .computeIfAbsent(shuffleServerInfo.getId(), k -> new 
AtomicInteger())
+                      .incrementAndGet();
+                });
+            List<Map.Entry<String, AtomicInteger>> list =
+                new ArrayList<>(shuffleServerFailureRecordCount.entrySet());
+            if (list.isEmpty()) {
+              // Nothing to evaluate: no server has a failure recorded in this stage attempt.
+              return false;
+            }
+            // Highest failure count first, so the most-failed server is the one checked.
+            Collections.sort(
+                list, (o1, o2) -> Integer.compare(o2.getValue().get(), o1.getValue().get()));
+            Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry = list.get(0);
+            if (shuffleServerInfoIntegerEntry.getValue().get()
+                > shuffleManager.getMaxFetchFailures()) {
+              
shuffleManager.addFailuresShuffleServerInfos(shuffleServerInfoIntegerEntry.getKey());
+              return true;
+            }
+            return false;
+          });
+    }
+  }
+
   private static class RssShuffleStatus {
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index dfd4b69a1..24980afbf 100644
--- 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -54,4 +54,7 @@ public class DummyRssShuffleManager implements 
RssShuffleManagerInterface {
   public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
     return null;
   }
+
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {} // no-op in this test stub
 }
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index dcebe11f8..2cdf8636c 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -123,6 +123,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       JavaUtils.newConcurrentMap();
   /** Whether to enable the dynamic shuffleServer function rewrite and reread 
functions */
   private boolean rssResubmitStage;
+  /** Ids of shuffle servers that failed to write; may be added to by concurrent RPC handlers. */
+  private final Set<String> failuresShuffleServerIds = Sets.newConcurrentHashSet();
 
   public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
     if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
@@ -794,4 +796,14 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             rpcPartitionToShufflerServer.getRemoteStorageInfo());
     return shuffleHandleInfo;
   }
+
+  /**
+   * Adds a shuffle server that failed to write to the failure list.
+   *
+   * @param shuffleServerId id of the shuffle server that failed to write
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    failuresShuffleServerIds.add(shuffleServerId);
+  }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d9e730e7e..3ee108fd3 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -134,6 +134,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
  /** Whether to enable the dynamic shuffleServer function rewrite and reread 
functions */
   private boolean rssResubmitStage;
+  /** Ids of shuffle servers that failed to write; may be added to by concurrent RPC handlers. */
+  private final Set<String> failuresShuffleServerIds =
+      java.util.concurrent.ConcurrentHashMap.newKeySet();
 
   public RssShuffleManager(SparkConf conf, boolean isDriver) {
     this.sparkConf = conf;
@@ -1092,4 +1095,14 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             rpcPartitionToShufflerServer.getRemoteStorageInfo());
     return shuffleHandleInfo;
   }
+
+  /**
+   * Adds a shuffle server that failed to write to the failure list.
+   *
+   * @param shuffleServerId id of the shuffle server that failed to write
+   */
+  @Override
+  public void addFailuresShuffleServerInfos(String shuffleServerId) {
+    failuresShuffleServerIds.add(shuffleServerId);
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 997a8e0bc..cd2d79569 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -21,8 +21,10 @@ import java.io.Closeable;
 
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 
 public interface ShuffleManagerClient extends Closeable {
   RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
@@ -36,4 +38,13 @@ public interface ShuffleManagerClient extends Closeable {
    */
   RssPartitionToShuffleServerResponse getPartitionToShufflerServer(
       RssPartitionToShuffleServerRequest req);
+
+  /**
+   * Reports to the ShuffleManager that writes to some shuffle servers failed.
+   *
+   * @param req the app, shuffle, stage attempt and shuffle servers involved in the failed write
+   * @return the ShuffleManager's reply, including whether the whole stage should be resubmitted
+   */
+  RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
+      RssReportShuffleWriteFailureRequest req);
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 56962c885..8708a9b42 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -25,8 +25,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
 import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.proto.RssProtos;
@@ -88,4 +90,19 @@ public class ShuffleManagerGrpcClient extends GrpcClient 
implements ShuffleManag
         
RssPartitionToShuffleServerResponse.fromProto(partitionToShufflerServer);
     return rssPartitionToShuffleServerResponse;
   }
+
+  @Override
+  public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
+      RssReportShuffleWriteFailureRequest request) {
+    RssProtos.ReportShuffleWriteFailureRequest protoRequest = 
request.toProto();
+    try {
+      RssProtos.ReportShuffleWriteFailureResponse response =
+          getBlockingStub().reportShuffleWriteFailure(protoRequest);
+      return RssReportShuffleWriteFailureResponse.fromProto(response);
+    } catch (Exception e) {
+      String msg = "Report shuffle write failure to host:port[" + host + ":" + port + "] failed";
+      LOG.warn(msg, e);
+      throw new RssException(msg, e);
+    }
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
new file mode 100644
index 000000000..c05176769
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
+
+/**
+ * Request sent to the ShuffleManager to report that writing shuffle data to some shuffle servers
+ * failed. Use {@link #toProto()} to obtain the gRPC message.
+ */
+public class RssReportShuffleWriteFailureRequest {
+  private final String appId;
+  private final int shuffleId;
+  private final int stageAttemptNumber;
+  private final List<ShuffleServerInfo> shuffleServerInfos;
+  private final String exception;
+
+  /**
+   * Creates a write-failure report.
+   *
+   * @param appId id of the application reporting the failure
+   * @param shuffleId id of the shuffle that was being written
+   * @param stageAttemptNumber attempt number of the stage that performed the write
+   * @param shuffleServerInfos shuffle servers to report as having failed the write
+   * @param exception optional description of the failure; may be {@code null}
+   */
+  public RssReportShuffleWriteFailureRequest(
+      String appId,
+      int shuffleId,
+      int stageAttemptNumber,
+      List<ShuffleServerInfo> shuffleServerInfos,
+      String exception) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.stageAttemptNumber = stageAttemptNumber;
+    this.shuffleServerInfos = shuffleServerInfos;
+    this.exception = exception;
+  }
+
+  /** Builds the protobuf request; the exception field is only set when it is non-null. */
+  public ReportShuffleWriteFailureRequest toProto() {
+    List<ShuffleServerId> shuffleServerIds = Lists.newArrayList();
+    for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfos) {
+      shuffleServerIds.add(
+          ShuffleServerId.newBuilder()
+              .setId(shuffleServerInfo.getId())
+              .setPort(shuffleServerInfo.getGrpcPort())
+              .setIp(shuffleServerInfo.getHost())
+              .build());
+    }
+    ReportShuffleWriteFailureRequest.Builder builder =
+        ReportShuffleWriteFailureRequest.newBuilder();
+    builder
+        .setAppId(appId)
+        .setShuffleId(shuffleId)
+        .setStageAttemptNumber(stageAttemptNumber)
+        .addAllShuffleServerIds(shuffleServerIds);
+    if (exception != null) {
+      builder.setException(exception);
+    }
+    return builder.build();
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
new file mode 100644
index 000000000..4f4d7b3be
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteFailureResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureResponse;
+
+/** The ShuffleManager's reply to a shuffle write failure report. */
+public class RssReportShuffleWriteFailureResponse extends ClientResponse {
+  private final boolean reSubmitWholeStage;
+
+  /**
+   * Creates a response.
+   *
+   * @param code status code of the reply
+   * @param msg message of the reply
+   * @param reSubmitWholeStage whether the whole stage should be resubmitted
+   */
+  public RssReportShuffleWriteFailureResponse(
+      StatusCode code, String msg, boolean reSubmitWholeStage) {
+    super(code, msg);
+    this.reSubmitWholeStage = reSubmitWholeStage;
+  }
+
+  /** Returns whether the ShuffleManager requested that the whole stage be resubmitted. */
+  public boolean getReSubmitWholeStage() {
+    return this.reSubmitWholeStage;
+  }
+
+  /** Converts this response to its protobuf form. */
+  public ReportShuffleWriteFailureResponse toProto() {
+    ReportShuffleWriteFailureResponse.Builder builder =
+        ReportShuffleWriteFailureResponse.newBuilder();
+    return builder
+        .setStatus(getStatusCode().toProto())
+        .setMsg(getMessage())
+        .setReSubmitWholeStage(reSubmitWholeStage)
+        .build();
+  }
+
+  /** Creates a response from its protobuf form. */
+  public static RssReportShuffleWriteFailureResponse fromProto(
+      ReportShuffleWriteFailureResponse response) {
+    return new RssReportShuffleWriteFailureResponse(
+        // todo: [issue#780] add fromProto for StatusCode issue
+        StatusCode.valueOf(response.getStatus().name()),
+        response.getMsg(),
+        response.getReSubmitWholeStage());
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 31e29bdae..93bd2f67b 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -499,6 +499,8 @@ service ShuffleManager {
   rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns 
(ReportShuffleFetchFailureResponse);
   // Gets the mapping between partitions and ShuffleServer from the 
ShuffleManager server
   rpc getPartitionToShufflerServer(PartitionToShuffleServerRequest) returns 
(PartitionToShuffleServerResponse);
+  // Report write failures to ShuffleManager
+  rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns 
(ReportShuffleWriteFailureResponse);
 }
 
 message ReportShuffleFetchFailureRequest {
@@ -534,3 +536,17 @@ message RemoteStorageInfo{
   string path = 1;
   map<string, string> confItems = 2;
 }
+
+message ReportShuffleWriteFailureRequest {
+  string appId = 1;
+  int32 shuffleId = 2;
+  int32 stageAttemptNumber = 3;
+  repeated ShuffleServerId shuffleServerIds= 5; // each listed server gets one more write failure counted
+  string exception = 6; // optional failure description; left empty when the client has none
+}
+
+message ReportShuffleWriteFailureResponse {
+  StatusCode status = 1; // INVALID_REQUEST for a wrong appId or a stale stage attempt, else SUCCESS
+  bool reSubmitWholeStage = 2; // true when a reported server exceeded the failure threshold
+  string msg = 3;
+}

Reply via email to