This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new acc0e6a03 [#825][part-7] feat(spark): Write Stage resubmit and dynamic 
shuffle server assign integration tests. (#1148)
acc0e6a03 is described below

commit acc0e6a038b1a5303e3e2b7778c0df4d58bd5546
Author: yl09099 <[email protected]>
AuthorDate: Tue Nov 14 18:13:15 2023 +0800

    [#825][part-7] feat(spark): Write Stage resubmit and dynamic shuffle server 
assign integration tests. (#1148)
    
    ### What changes were proposed in this pull request?
    
    Write Stage resubmit and dynamic shuffle server assign integration tests.
    
    Ⅰ. Overall objective:
    
    1. During the shuffle write phase, the ShuffleServer reports faulty nodes 
and reallocates the ShuffleServer list;
    2. Triggers a Stage level retry of SPARK. The shuffleServer node is 
excluded and reallocated before the retry.
    
    Ⅱ. Implementation logic diagram:
    
    
![image](https://github.com/apache/incubator-uniffle/assets/33595968/866c8292-e0ff-4532-b519-02f424f4c2fc)
    
    Ⅲ. As shown in the picture above:
    
    1. During Shuffle registration, following the solid blue steps, obtain the 
ShuffleServer list to be written through the RPC interface of a Coordinator 
Client. The list is bound to the shuffleId.
    2. When the Tasks of the Stage start, following the solid green steps, the 
ShuffleServer list to be written is obtained from shuffleIdToShuffleHandleInfo 
through the ShuffleManager Client RPC interface;
    3. In the Stage, if a Task fails to write blocks to the ShuffleServer, follow 
the steps in red to report that ShuffleServer to FailedShuffleServerList in 
RSSShuffleManager through the RPC interface.
    4. FailedShuffleServerList records the number of ShuffleServer failures. 
After the number of failures reaches the maximum number of Task-level retries, 
follow the steps in dotted orange lines: through the RPC interface of a 
Coordinator Client, obtain a new list of ShuffleServers to be written (the 
ShuffleServers that failed to be written are excluded). After obtaining the 
list, go to Step 5 of the dotted orange line. Throwing a FetchFailed Exception 
triggers a stage-level [...]
    5. Attempt 1 is generated by the SPARK Stage level again. Pull the 
corresponding ShuffleServer list according to the green dotted line.
    
    ### Why are the changes needed?
    
    Fix: #825
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT.
---
 .../test/RSSStageDynamicServerReWriteTest.java     | 117 +++++++++++++++++++++
 .../server/MockedShuffleServerGrpcService.java     |  10 ++
 2 files changed, 127 insertions(+)

diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
new file mode 100644
index 000000000..3e739690f
--- /dev/null
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.MockedGrpcServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**
+ * Integration test: a shuffle-write failure on one shuffle server should
+ * trigger a Spark stage resubmit with a refreshed shuffle-server assignment.
+ * Server 0 is a mocked server whose sendShuffleData always fails; servers 1
+ * and 2 behave normally.
+ */
+public class RSSStageDynamicServerReWriteTest extends SparkIntegrationTestBase 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RSSStageDynamicServerReWriteTest.class);
+
+  // Upper bound on attempts per task; also wired into the local[4,N] master.
+  private static int maxTaskFailures = 3;
+
+  // Boots one coordinator and three shuffle servers; id 0 is the abnormal one.
+  @BeforeAll
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    Map<String, String> dynamicConf = Maps.newHashMap();
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
+    dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE.name());
+    addDynamicConf(coordinatorConf, dynamicConf);
+    createCoordinatorServer(coordinatorConf);
+    createServer(0, tmpDir, true);
+    createServer(1, tmpDir, false);
+    createServer(2, tmpDir, false);
+    startServers();
+  }
+
+  /**
+   * Creates one shuffle server with per-id ports and local storage dirs.
+   * When abnormalFlag is true, a mocked server is created and its gRPC
+   * service is told to fail every sendShuffleData call.
+   */
+  public static void createServer(int id, File tmpDir, boolean abnormalFlag) 
throws Exception {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
+    shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+    File dataDir1 = new File(tmpDir, id + "_1");
+    File dataDir2 = new File(tmpDir, id + "_2");
+    String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.type", 
StorageType.MEMORY_LOCALFILE.name());
+    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
id);
+    shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    if (abnormalFlag) {
+      createMockedShuffleServer(shuffleServerConf);
+      // NOTE(review): assumes createMockedShuffleServer registers the new
+      // server in the shared shuffleServers list so it is found below — confirm.
+      for (ShuffleServer shuffleServer : shuffleServers) {
+        // Make the first shuffle server (gRPC port == SHUFFLE_SERVER_PORT)
+        // fail all sendShuffleData requests.
+        if (shuffleServer.getGrpcPort() == SHUFFLE_SERVER_PORT) {
+          ((MockedGrpcServer) shuffleServer.getServer())
+              .getService()
+              .enableMockSendDataFailed(true);
+        }
+      }
+    } else {
+      createShuffleServer(shuffleServerConf);
+    }
+  }
+
+  // Runs a small aggregation job and returns (id -> count) for verification
+  // by the base class across rss/non-rss runs.
+  @Override
+  public Map runTest(SparkSession spark, String fileName) throws Exception {
+    List<Row> rows =
+        spark.range(0, 1000, 1, 
4).repartition(2).groupBy("id").count().collectAsList();
+    Map<String, Long> result = Maps.newHashMap();
+    for (Row row : rows) {
+      result.put(row.get(0).toString(), row.getLong(1));
+    }
+    return result;
+  }
+
+  // local[4,N]: 4 cores, allowing up to maxTaskFailures attempts per task so
+  // the mocked write failures can exhaust task retries and escalate.
+  @Override
+  protected SparkConf createSparkConf() {
+    return new SparkConf()
+        .setAppName(this.getClass().getSimpleName())
+        .setMaster(String.format("local[4,%d]", maxTaskFailures));
+  }
+
+  // Enables RSS stage resubmission and aligns Spark's task retry limit.
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
+    sparkConf.set("spark.task.maxFailures", String.valueOf(maxTaskFailures));
+  }
+
+  @Test
+  public void testRSSStageResubmit() throws Exception {
+    run();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index 28c0944ce..eafd83292 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -40,6 +40,8 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
 
   private long mockedTimeout = -1L;
 
+  private boolean mockSendDataFailed = false;
+
   private boolean recordGetShuffleResult = false;
 
   private long numOfFailedReadRequest = 0;
@@ -49,6 +51,10 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
     mockedTimeout = timeout;
   }
 
+  /**
+   * When enabled, sendShuffleData throws a RuntimeException instead of
+   * handling the request, mocking a shuffle-write failure.
+   */
+  public void enableMockSendDataFailed(boolean mockSendDataFailed) {
+    this.mockSendDataFailed = mockSendDataFailed;
+  }
+
   public void enableRecordGetShuffleResult() {
     recordGetShuffleResult = true;
   }
@@ -74,6 +80,10 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
   public void sendShuffleData(
       RssProtos.SendShuffleDataRequest request,
       StreamObserver<RssProtos.SendShuffleDataResponse> responseObserver) {
+    if (mockSendDataFailed) {
+      LOG.info("Add a mocked sendData failed on sendShuffleData");
+      throw new RuntimeException("This write request is failed as mocked 
failure!");
+    }
     if (mockedTimeout > 0) {
       LOG.info("Add a mocked timeout on sendShuffleData");
       Uninterruptibles.sleepUninterruptibly(mockedTimeout, 
TimeUnit.MILLISECONDS);

Reply via email to