This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef60aa0c [#825][part-6] feat(spark): Added logic that failed to send
ShuffleServer. (#1147)
5ef60aa0c is described below
commit 5ef60aa0c99f300a1ad6169bf1518bc6c1efe7dc
Author: yl09099 <[email protected]>
AuthorDate: Thu Nov 9 09:42:29 2023 +0800
[#825][part-6] feat(spark): Added logic that failed to send ShuffleServer.
(#1147)
### What changes were proposed in this pull request?
Added logic to handle failures when sending data to a ShuffleServer.
Ⅰ. Overall objective:
1. During the shuffle write phase, faulty ShuffleServer nodes are reported
and the ShuffleServer list is reallocated;
2. Triggers a Stage level retry of SPARK. The shuffleServer node is
excluded and reallocated before the retry.
Ⅱ. Implementation logic diagram:

Ⅲ. As shown in the picture above:
1. During Shuffle registration, obtain the ShuffleServer list to be written
through the RPC interface of a Coordinator Client by following the solid blue
line step. The list is bound using ShuffleID.
2. When the Tasks of a Stage start, following the solid green steps, the
ShuffleServer list to be written (shuffleIdToShuffleHandleInfo) is obtained
through the ShuffleManager Client RPC interface;
3. Within the Stage, if a Task fails to write blocks to a ShuffleServer,
follow the steps in red to report that ShuffleServer to the
FailedShuffleServerList in RssShuffleManager through the RPC interface.
4. FailedShuffleServerList records the number of ShuffleServer failures.
After the number of failures reaches the maximum number of Task-level
retries, follow the steps in dotted orange lines: through the RPC interface
of a Coordinator Client, obtain the list of ShuffleServers to be written
(the ShuffleServers that failed to be written are excluded). After obtaining
the list, go to Step 5 of the dotted orange line; throwing a
FetchFailedException triggers a stage-level retry.
5. Spark then regenerates the Stage as Attempt 1, which pulls the
reassigned ShuffleServer list following the green dotted line.
### Why are the changes needed?
Fix: #825
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
---
.../apache/spark/shuffle/RssShuffleManager.java | 6 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 84 +++++++++++++++++++++-
.../apache/spark/shuffle/RssShuffleManager.java | 6 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 82 ++++++++++++++++++++-
.../common/exception/RssSendFailedException.java | 32 +++++++++
.../common/exception/RssWaitFailedException.java | 32 +++++++++
6 files changed, 234 insertions(+), 8 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 2f7e5105b..b68a67d35 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -783,7 +783,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
* @param shuffleId shuffleId
* @return ShuffleHandleInfo
*/
- private ShuffleHandleInfo getRemoteShuffleHandleInfo(int shuffleId) {
+ private synchronized ShuffleHandleInfo getRemoteShuffleHandleInfo(int
shuffleId) {
ShuffleHandleInfo shuffleHandleInfo;
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
@@ -888,4 +888,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
return false;
}
}
+
+ public boolean isRssResubmitStage() {
+ return rssResubmitStage;
+ }
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index fabac5651..ac1e83ed0 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -17,12 +17,14 @@
package org.apache.spark.shuffle.writer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,19 +49,32 @@ import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.ShuffleHandleInfo;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssSendFailedException;
+import org.apache.uniffle.common.exception.RssWaitFailedException;
import org.apache.uniffle.storage.util.StorageType;
public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -91,6 +106,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
+ private SparkConf sparkConf;
public RssShuffleWriter(
String appId,
@@ -155,6 +171,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
isMemoryShuffleEnabled(sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key()));
this.taskFailureCallback = taskFailureCallback;
this.taskContext = context;
+ this.sparkConf = sparkConf;
}
public RssShuffleWriter(
@@ -217,7 +234,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
- throw e;
+ if (shuffleManager.isRssResubmitStage()) {
+ throw throwFetchFailedIfNecessary(e);
+ } else {
+ throw e;
+ }
}
}
@@ -354,7 +375,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ failedBlockIds.size()
+ " blocks can't be sent to shuffle server.";
LOG.error(errorMsg);
- throw new RssException(errorMsg);
+ throw new RssSendFailedException(errorMsg);
}
// remove blockIds which was sent successfully, if there has none left,
all data are sent
@@ -374,7 +395,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ sendCheckTimeout
+ " ms.";
LOG.error(errorMsg);
- throw new RssException(errorMsg);
+ throw new RssWaitFailedException(errorMsg);
}
}
}
@@ -436,4 +457,61 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
protected ShuffleWriteMetrics getShuffleWriteMetrics() {
return shuffleWriteMetrics;
}
+
+ private static ShuffleManagerClient createShuffleManagerClient(String host,
int port)
+ throws IOException {
+ ClientType grpc = ClientType.GRPC;
+ // Host can be inferred from `spark.driver.bindAddress`, which would be
set when SparkContext is
+ // constructed.
+ return
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc,
host, port);
+ }
+
+ private RssException throwFetchFailedIfNecessary(Exception e) {
+ // The shuffleServer is registered only when a Block fails to be sent
+ if (e instanceof RssSendFailedException) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
+ shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+ List<ShuffleServerInfo> shuffleServerInfos = Lists.newArrayList();
+ for (Map.Entry<Long, BlockingQueue<ShuffleServerInfo>> longListEntry :
+ failedBlockIds.entrySet()) {
+ shuffleServerInfos.addAll(longListEntry.getValue());
+ }
+ RssReportShuffleWriteFailureRequest req =
+ new RssReportShuffleWriteFailureRequest(
+ appId,
+ shuffleId,
+ taskContext.stageAttemptNumber(),
+ shuffleServerInfos,
+ e.getMessage());
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ String driver = rssConf.getString("driver.host", "");
+ int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
+ try (ShuffleManagerClient shuffleManagerClient =
createShuffleManagerClient(driver, port)) {
+ RssReportShuffleWriteFailureResponse response =
+ shuffleManagerClient.reportShuffleWriteFailure(req);
+ if (response.getReSubmitWholeStage()) {
+ RssReassignServersRequest rssReassignServersRequest =
+ new RssReassignServersRequest(
+ taskContext.stageId(),
+ taskContext.stageAttemptNumber(),
+ shuffleId,
+ partitioner.numPartitions());
+ RssReassignServersReponse rssReassignServersReponse =
+
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
+ LOG.info(
+ "Whether the reassignment is successful: {}",
+ rssReassignServersReponse.isNeedReassign());
+ // since we are going to roll out the whole stage, mapIndex
shouldn't matter, hence -1 is
+ // provided.
+ FetchFailedException ffe =
+ RssSparkShuffleUtils.createFetchFailedException(
+ shuffleId, -1, taskContext.stageAttemptNumber(), e);
+ return new RssException(ffe);
+ }
+ } catch (IOException ioe) {
+ LOG.info("Error closing shuffle manager client with error:", ioe);
+ }
+ }
+ return new RssException(e);
+ }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index ceac60063..b854b0bf7 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -1083,7 +1083,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
* @param shuffleId shuffleId
* @return ShuffleHandleInfo
*/
- private ShuffleHandleInfo getRemoteShuffleHandleInfo(int shuffleId) {
+ private synchronized ShuffleHandleInfo getRemoteShuffleHandleInfo(int
shuffleId) {
ShuffleHandleInfo shuffleHandleInfo;
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
@@ -1187,4 +1187,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
return false;
}
}
+
+ public boolean isRssResubmitStage() {
+ return rssResubmitStage;
+ }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1c8b89305..4929e650c 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -49,19 +49,32 @@ import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.ShuffleHandleInfo;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
+import org.apache.uniffle.client.request.RssReassignServersRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.response.RssReassignServersReponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssSendFailedException;
+import org.apache.uniffle.common.exception.RssWaitFailedException;
import org.apache.uniffle.storage.util.StorageType;
public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -91,6 +104,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
+ private SparkConf sparkConf;
/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;
@@ -167,6 +181,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
isMemoryShuffleEnabled(sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key()));
this.taskFailureCallback = taskFailureCallback;
this.taskContext = context;
+ this.sparkConf = sparkConf;
}
public RssShuffleWriter(
@@ -221,7 +236,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
- throw e;
+ if (shuffleManager.isRssResubmitStage()) {
+ throw throwFetchFailedIfNecessary(e);
+ } else {
+ throw e;
+ }
}
}
@@ -359,7 +378,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ sendCheckTimeout
+ " ms.";
LOG.error(errorMsg);
- throw new RssException(errorMsg);
+ throw new RssWaitFailedException(errorMsg);
}
} finally {
if (interrupted) {
@@ -379,7 +398,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
+ failedBlockIds.size()
+ " blocks can't be sent to shuffle server.";
LOG.error(errorMsg);
- throw new RssException(errorMsg);
+ throw new RssSendFailedException(errorMsg);
}
}
@@ -473,4 +492,61 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
public WriteBufferManager getBufferManager() {
return bufferManager;
}
+
+ private static ShuffleManagerClient createShuffleManagerClient(String host,
int port)
+ throws IOException {
+ ClientType grpc = ClientType.GRPC;
+ // Host can be inferred from `spark.driver.bindAddress`, which would be
set when SparkContext is
+ // constructed.
+ return
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc,
host, port);
+ }
+
+ private RssException throwFetchFailedIfNecessary(Exception e) {
+ // The shuffleServer is registered only when a Block fails to be sent
+ if (e instanceof RssSendFailedException) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
+ shuffleManager.getFailedBlockIdsWithShuffleServer(taskId);
+ List<ShuffleServerInfo> shuffleServerInfos = Lists.newArrayList();
+ for (Map.Entry<Long, BlockingQueue<ShuffleServerInfo>> longListEntry :
+ failedBlockIds.entrySet()) {
+ shuffleServerInfos.addAll(longListEntry.getValue());
+ }
+ RssReportShuffleWriteFailureRequest req =
+ new RssReportShuffleWriteFailureRequest(
+ appId,
+ shuffleId,
+ taskContext.stageAttemptNumber(),
+ shuffleServerInfos,
+ e.getMessage());
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ String driver = rssConf.getString("driver.host", "");
+ int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
+ try (ShuffleManagerClient shuffleManagerClient =
createShuffleManagerClient(driver, port)) {
+ RssReportShuffleWriteFailureResponse response =
+ shuffleManagerClient.reportShuffleWriteFailure(req);
+ if (response.getReSubmitWholeStage()) {
+ RssReassignServersRequest rssReassignServersRequest =
+ new RssReassignServersRequest(
+ taskContext.stageId(),
+ taskContext.stageAttemptNumber(),
+ shuffleId,
+ partitioner.numPartitions());
+ RssReassignServersReponse rssReassignServersReponse =
+
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
+ LOG.info(
+ "Whether the reassignment is successful: {}",
+ rssReassignServersReponse.isNeedReassign());
+ // since we are going to roll out the whole stage, mapIndex
shouldn't matter, hence -1 is
+ // provided.
+ FetchFailedException ffe =
+ RssSparkShuffleUtils.createFetchFailedException(
+ shuffleId, -1, taskContext.stageAttemptNumber(), e);
+ return new RssException(ffe);
+ }
+ } catch (IOException ioe) {
+ LOG.info("Error closing shuffle manager client with error:", ioe);
+ }
+ }
+ return new RssException(e);
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/exception/RssSendFailedException.java
b/common/src/main/java/org/apache/uniffle/common/exception/RssSendFailedException.java
new file mode 100644
index 000000000..a1f4679ad
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/exception/RssSendFailedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+public class RssSendFailedException extends RuntimeException {
+ public RssSendFailedException(String message) {
+ super(message);
+ }
+
+ public RssSendFailedException(Throwable e) {
+ super(e);
+ }
+
+ public RssSendFailedException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/exception/RssWaitFailedException.java
b/common/src/main/java/org/apache/uniffle/common/exception/RssWaitFailedException.java
new file mode 100644
index 000000000..147727e66
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/exception/RssWaitFailedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+public class RssWaitFailedException extends RuntimeException {
+ public RssWaitFailedException(String message) {
+ super(message);
+ }
+
+ public RssWaitFailedException(Throwable e) {
+ super(e);
+ }
+
+ public RssWaitFailedException(String message, Throwable e) {
+ super(message, e);
+ }
+}