This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 38dcff19a [#1608] feat(spark3): Ensure the compatibility of reassign 
and stageRetry (#1783)
38dcff19a is described below

commit 38dcff19a8e9cec615bc8074ff1ebabecd57cdf9
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jun 19 13:29:46 2024 +0800

    [#1608] feat(spark3): Ensure the compatibility of reassign and stageRetry 
(#1783)
    
    ### What changes were proposed in this pull request?
    
    Ensure the compatibility of reassign and stageRetry.
    
    ### Why are the changes needed?
    
    To improve job stability when both reassign and stage retry are enabled.
    For #1608
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
---
 .../shuffle/manager/RssShuffleManagerBase.java     | 34 ++++++++---
 .../apache/spark/shuffle/RssShuffleManager.java    | 11 ++--
 .../apache/spark/shuffle/RssShuffleManager.java    | 17 +++---
 .../uniffle/test/ReassignAndStageRetryTest.java    | 71 ++++++++++++++++++++++
 .../server/MockedShuffleServerGrpcService.java     | 15 +++++
 5 files changed, 123 insertions(+), 25 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 197035e3a..25acf1f0f 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -117,7 +117,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
 
   protected boolean blockIdSelfManagedEnabled;
 
-  protected boolean taskBlockSendFailureRetryEnabled;
+  protected boolean partitionReassignEnabled;
 
   protected boolean shuffleManagerRpcServiceEnabled;
 
@@ -638,7 +638,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
   @Override
   public int getMaxFetchFailures() {
     final String TASK_MAX_FAILURE = "spark.task.maxFailures";
-    return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
+    return Math.max(0, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
   }
 
   /**
@@ -701,6 +701,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
             (StageAttemptShuffleHandleInfo) 
shuffleHandleInfoManager.get(shuffleId);
         
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
         rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, 
stageIdAndAttempt, true);
+        LOG.info(
+            "The stage retry has been triggered successfully for the stageId: 
{}, attemptNumber: {}",
+            stageId,
+            stageAttemptNumber);
         return true;
       } else {
         LOG.info(
@@ -720,12 +724,22 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       int shuffleId,
       Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
     long startTime = System.currentTimeMillis();
-    MutableShuffleHandleInfo handleInfo =
-        (MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
-    synchronized (handleInfo) {
+    ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
+    MutableShuffleHandleInfo internalHandle = null;
+    if (handleInfo instanceof MutableShuffleHandleInfo) {
+      internalHandle = (MutableShuffleHandleInfo) handleInfo;
+    } else if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
+      internalHandle =
+          (MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) 
handleInfo).getCurrent();
+    }
+    if (internalHandle == null) {
+      throw new RssException(
+          "An unexpected error occurred: internalHandle is null, which should 
not happen");
+    }
+    synchronized (internalHandle) {
       // If the reassignment servers for one partition exceeds the max 
reassign server num,
       // it should fast fail.
-      handleInfo.checkPartitionReassignServerNum(
+      internalHandle.checkPartitionReassignServerNum(
           partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
 
       Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new 
HashMap<>();
@@ -740,10 +754,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
           String serverId = receivingFailureServer.getServerId();
 
           boolean serverHasReplaced = false;
-          Set<ShuffleServerInfo> replacements = 
handleInfo.getReplacements(serverId);
+          Set<ShuffleServerInfo> replacements = 
internalHandle.getReplacements(serverId);
           if (CollectionUtils.isEmpty(replacements)) {
             final int requiredServerNum = 1;
-            Set<String> excludedServers = new 
HashSet<>(handleInfo.listExcludedServers());
+            Set<String> excludedServers = new 
HashSet<>(internalHandle.listExcludedServers());
             excludedServers.add(serverId);
             replacements =
                 reassignServerForTask(
@@ -759,7 +773,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
           }
 
           Set<ShuffleServerInfo> updatedReassignServers =
-              handleInfo.updateAssignment(partitionId, serverId, replacements);
+              internalHandle.updateAssignment(partitionId, serverId, 
replacements);
 
           reassignResult
               .computeIfAbsent(serverId, x -> new HashMap<>())
@@ -788,7 +802,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
           System.currentTimeMillis() - startTime,
           reassignResult);
 
-      return handleInfo;
+      return internalHandle;
     }
   }
 
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index eb48b2f38..974b99862 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -168,11 +168,10 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     this.rssResubmitStage =
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
-    this.taskBlockSendFailureRetryEnabled =
-        rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
+    this.partitionReassignEnabled = 
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
     this.blockIdSelfManagedEnabled = 
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
     this.shuffleManagerRpcServiceEnabled =
-        taskBlockSendFailureRetryEnabled || rssResubmitStage || 
blockIdSelfManagedEnabled;
+        partitionReassignEnabled || rssResubmitStage || 
blockIdSelfManagedEnabled;
     if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
       if (isDriver) {
         heartBeatScheduledExecutorService =
@@ -341,7 +340,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
           new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, 
handleInfo);
       shuffleHandleInfoManager.register(shuffleId, 
stageAttemptShuffleHandleInfo);
-    } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+    } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
       ShuffleHandleInfo shuffleHandleInfo =
           new MutableShuffleHandleInfo(shuffleId, partitionToServers, 
remoteStorage);
       shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -410,7 +409,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
         // In Stage Retry mode, Get the ShuffleServer list from the Driver 
based on the shuffleId
         shuffleHandleInfo = 
getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
-      } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+      } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
         // In Block Retry mode, Get the ShuffleServer list from the Driver 
based on the shuffleId
         shuffleHandleInfo = 
getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
       } else {
@@ -483,7 +482,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
         // In Stage Retry mode, Get the ShuffleServer list from the Driver 
based on the shuffleId.
         shuffleHandleInfo = 
getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
-      } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+      } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
         // In Block Retry mode, Get the ShuffleServer list from the Driver 
based on the shuffleId
         shuffleHandleInfo = 
getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
       } else {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6354e17a2..9e6f2a26f 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -182,20 +182,19 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     this.rssResubmitStage =
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
-    this.taskBlockSendFailureRetryEnabled =
-        rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
+    this.partitionReassignEnabled = 
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
 
     // The feature of partition reassign is exclusive with multiple replicas 
and stage retry.
-    if (taskBlockSendFailureRetryEnabled) {
-      if (rssResubmitStage || dataReplica > 1) {
+    if (partitionReassignEnabled) {
+      if (dataReplica > 1) {
         throw new RssException(
-            "The feature of partition reassign is incompatible with multiple 
replicas and stage retry.");
+            "The feature of task partition reassign is incompatible with 
multiple replicas mechanism.");
       }
     }
 
     this.blockIdSelfManagedEnabled = 
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
     this.shuffleManagerRpcServiceEnabled =
-        taskBlockSendFailureRetryEnabled || rssResubmitStage || 
blockIdSelfManagedEnabled;
+        partitionReassignEnabled || rssResubmitStage || 
blockIdSelfManagedEnabled;
     if (isDriver) {
       heartBeatScheduledExecutorService =
           ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
@@ -455,7 +454,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       StageAttemptShuffleHandleInfo handleInfo =
           new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, 
shuffleHandleInfo);
       shuffleHandleInfoManager.register(shuffleId, handleInfo);
-    } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+    } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
       ShuffleHandleInfo shuffleHandleInfo =
           new MutableShuffleHandleInfo(shuffleId, partitionToServers, 
remoteStorage);
       shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -496,7 +495,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
       // In Stage Retry mode, Get the ShuffleServer list from the Driver based 
on the shuffleId.
       shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
-    } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+    } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
       // In Stage Retry mode, Get the ShuffleServer list from the Driver based 
on the shuffleId.
       shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
     } else {
@@ -640,7 +639,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
       // In Stage Retry mode, Get the ShuffleServer list from the Driver based 
on the shuffleId.
       shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
-    } else if (shuffleManagerRpcServiceEnabled && 
taskBlockSendFailureRetryEnabled) {
+    } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
       // In Block Retry mode, Get the ShuffleServer list from the Driver based 
on the shuffleId.
       shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
     } else {
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
new file mode 100644
index 000000000..663a176c8
--- /dev/null
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.apache.spark.SparkConf;
+
+import org.apache.uniffle.server.MockedGrpcServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+
+import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
+import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
+import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
+import static 
org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE;
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
+
+/**
+ * This class is to test the compatibility of reassign and stage retry 
mechanism that were enabled
+ * at the same time.
+ */
+public class ReassignAndStageRetryTest extends 
PartitionBlockDataReassignMultiTimesTest {
+
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+    sparkConf.set("spark.task.maxFailures", String.valueOf(1));
+    sparkConf.set("spark." + RSS_RESUBMIT_STAGE, "true");
+
+    sparkConf.set("spark.sql.shuffle.partitions", "4");
+    sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
+    sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
+    sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
+    sparkConf.set("spark." + 
RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "1");
+
+    // simulate the grpc servers has different free memory
+    // and make the assign priority seq: g1 -> g2 -> g3
+    ShuffleServer g1 = grpcShuffleServers.get(0);
+    ShuffleBufferManager bufferManager = g1.getShuffleBufferManager();
+    bufferManager.setUsedMemory(bufferManager.getCapacity() - 3000000);
+    g1.sendHeartbeat();
+
+    ShuffleServer g2 = grpcShuffleServers.get(1);
+    bufferManager = g2.getShuffleBufferManager();
+    bufferManager.setUsedMemory(bufferManager.getCapacity() - 2000000);
+    g2.sendHeartbeat();
+
+    ShuffleServer g3 = grpcShuffleServers.get(2);
+    bufferManager = g3.getShuffleBufferManager();
+    bufferManager.setUsedMemory(bufferManager.getCapacity() - 1000000);
+    g3.sendHeartbeat();
+
+    // This will make the partition of g1 reassign to g2 servers.
+    ((MockedGrpcServer) 
g1.getServer()).getService().setMockSendDataFailedStageNumber(0);
+    // And then reassign to g3. But reassign max times reaches due to max 
reassign times.
+    ((MockedGrpcServer) 
g2.getServer()).getService().setMockSendDataFailedStageNumber(0);
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index d0d77f60d..a25e1fd45 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -44,6 +44,7 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
   private long mockedTimeout = -1L;
 
   private boolean mockSendDataFailed = false;
+  private int mockSendDataFailedStageNumber = -1;
 
   private boolean mockRequireBufferFailedWithNoBuffer = false;
   private boolean isMockRequireBufferFailedWithNoBufferForHugePartition = 
false;
@@ -130,6 +131,12 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
       LOG.info("Add a mocked sendData failed on sendShuffleData");
       throw new RuntimeException("This write request is failed as mocked 
failure!");
     }
+    if (mockSendDataFailedStageNumber == request.getStageAttemptNumber()) {
+      LOG.info(
+          "Add a mocked sendData failed on sendShuffleData with the stage 
number={}",
+          mockSendDataFailedStageNumber);
+      throw new RuntimeException("This write request is failed as mocked 
failure!");
+    }
     if (mockedTimeout > 0) {
       LOG.info("Add a mocked timeout on sendShuffleData");
       Uninterruptibles.sleepUninterruptibly(mockedTimeout, 
TimeUnit.MILLISECONDS);
@@ -286,4 +293,12 @@ public class MockedShuffleServerGrpcService extends 
ShuffleServerGrpcService {
     }
     super.getLocalShuffleIndex(request, responseObserver);
   }
+
+  public int getMockSendDataFailedStageNumber() {
+    return mockSendDataFailedStageNumber;
+  }
+
+  public void setMockSendDataFailedStageNumber(int 
mockSendDataFailedStageNumber) {
+    this.mockSendDataFailedStageNumber = mockSendDataFailedStageNumber;
+  }
 }

Reply via email to