This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new acc0e6a03 [#825][part-7] feat(spark): Write Stage resubmit and dynamic
shuffle server assign integration tests. (#1148)
acc0e6a03 is described below
commit acc0e6a038b1a5303e3e2b7778c0df4d58bd5546
Author: yl09099 <[email protected]>
AuthorDate: Tue Nov 14 18:13:15 2023 +0800
[#825][part-7] feat(spark): Write Stage resubmit and dynamic shuffle server
assign integration tests. (#1148)
### What changes were proposed in this pull request?
Write Stage resubmit and dynamic shuffle server assign integration tests.
Ⅰ. Overall objective:
1. During the shuffle write phase, the ShuffleServer reports faulty nodes
and reallocates the ShuffleServer list;
2. Triggers a Stage level retry of SPARK. The shuffleServer node is
excluded and reallocated before the retry.
Ⅱ. Implementation logic diagram:

Ⅲ. As shown in the picture above:
1. During Shuffle registration, obtain the ShuffleServer list to be written
through the RPC interface of a Coordinator Client by following the solid blue
line step. The list is bound using ShuffleID.
2. When the Tasks of the Stage start, following the solid green steps, the
ShuffleManager Client RPC interface obtains the ShuffleServer list to be
written to from shuffleIdToShuffleHandleInfo;
3. In the Stage, if a Task fails to write blocks to the ShuffleServer, follow
the steps in red to report the ShuffleServer to the FailedShuffleServerList in
RSSShuffleManager through the RPC interface.
4. FailedShuffleServerList records the number of ShuffleServer failures.
After the number of failures reaches the maximum number of Task-level
retries, follow the steps in dotted orange lines: through the RPC interface of a
Coordinator Client, obtain the list of ShuffleServers to be written to (the
ShuffleServers that failed to be written to are excluded). After obtaining the
list, go to Step 5 of the dotted orange line. Throwing a FetchFailed Exception
triggers a stage-level retry.
5. The Spark stage-level retry then generates Attempt 1, which pulls the
corresponding ShuffleServer list following the green dotted line.
### Why are the changes needed?
Fix: #825
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
---
.../test/RSSStageDynamicServerReWriteTest.java | 117 +++++++++++++++++++++
.../server/MockedShuffleServerGrpcService.java | 10 ++
2 files changed, 127 insertions(+)
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
new file mode 100644
index 000000000..3e739690f
--- /dev/null
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.MockedGrpcServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class RSSStageDynamicServerReWriteTest extends SparkIntegrationTestBase
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RSSStageDynamicServerReWriteTest.class);
+
+ private static int maxTaskFailures = 3;
+
+ @BeforeAll
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ Map<String, String> dynamicConf = Maps.newHashMap();
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
+ dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
+ addDynamicConf(coordinatorConf, dynamicConf);
+ createCoordinatorServer(coordinatorConf);
+ createServer(0, tmpDir, true);
+ createServer(1, tmpDir, false);
+ createServer(2, tmpDir, false);
+ startServers();
+ }
+
+ public static void createServer(int id, File tmpDir, boolean abnormalFlag)
throws Exception {
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
+ shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+ File dataDir1 = new File(tmpDir, id + "_1");
+ File dataDir2 = new File(tmpDir, id + "_2");
+ String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
+ shuffleServerConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE.name());
+ shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT +
id);
+ shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
+ shuffleServerConf.setString("rss.storage.basePath", basePath);
+ if (abnormalFlag) {
+ createMockedShuffleServer(shuffleServerConf);
+ for (ShuffleServer shuffleServer : shuffleServers) {
+ // Set the sending block data timeout for the first shuffleServer
+ if (shuffleServer.getGrpcPort() == SHUFFLE_SERVER_PORT) {
+ ((MockedGrpcServer) shuffleServer.getServer())
+ .getService()
+ .enableMockSendDataFailed(true);
+ }
+ }
+ } else {
+ createShuffleServer(shuffleServerConf);
+ }
+ }
+
+ @Override
+ public Map runTest(SparkSession spark, String fileName) throws Exception {
+ List<Row> rows =
+ spark.range(0, 1000, 1,
4).repartition(2).groupBy("id").count().collectAsList();
+ Map<String, Long> result = Maps.newHashMap();
+ for (Row row : rows) {
+ result.put(row.get(0).toString(), row.getLong(1));
+ }
+ return result;
+ }
+
+ @Override
+ protected SparkConf createSparkConf() {
+ return new SparkConf()
+ .setAppName(this.getClass().getSimpleName())
+ .setMaster(String.format("local[4,%d]", maxTaskFailures));
+ }
+
+ @Override
+ public void updateSparkConfCustomer(SparkConf sparkConf) {
+ sparkConf.set(
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
+ sparkConf.set("spark.task.maxFailures", String.valueOf(maxTaskFailures));
+ }
+
+ @Test
+ public void testRSSStageResubmit() throws Exception {
+ run();
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index 28c0944ce..eafd83292 100644
---
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -40,6 +40,8 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
private long mockedTimeout = -1L;
+ private boolean mockSendDataFailed = false;
+
private boolean recordGetShuffleResult = false;
private long numOfFailedReadRequest = 0;
@@ -49,6 +51,10 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
mockedTimeout = timeout;
}
+ public void enableMockSendDataFailed(boolean mockSendDataFailed) {
+ this.mockSendDataFailed = mockSendDataFailed;
+ }
+
public void enableRecordGetShuffleResult() {
recordGetShuffleResult = true;
}
@@ -74,6 +80,10 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
public void sendShuffleData(
RssProtos.SendShuffleDataRequest request,
StreamObserver<RssProtos.SendShuffleDataResponse> responseObserver) {
+ if (mockSendDataFailed) {
+ LOG.info("Add a mocked sendData failed on sendShuffleData");
+ throw new RuntimeException("This write request is failed as mocked
failure!");
+ }
if (mockedTimeout > 0) {
LOG.info("Add a mocked timeout on sendShuffleData");
Uninterruptibles.sleepUninterruptibly(mockedTimeout,
TimeUnit.MILLISECONDS);