This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new eb53a1a7b [#2725] feat(spark): Introduce optional fast-switch and 
ignore retry-count checking for stale assignment  (#2726)
eb53a1a7b is described below

commit eb53a1a7bb63dc0c7f1814e9772c1bb1e242d164
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Feb 10 19:37:13 2026 +0800

    [#2725] feat(spark): Introduce optional fast-switch and ignore retry-count 
checking for stale assignment  (#2726)
    
    ### What changes were proposed in this pull request?
    
    1. Introduce an option for stale-assignment fast-switch. With this option, 
we can better inspect bugs caused by this mechanism
    2. Ignore retry-count checking for stale assignment to fix the multi-server 
switch
    
    ### Why are the changes needed?
    
    For #2725.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests
---
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  8 ++++
 .../apache/spark/shuffle/writer/DataPusher.java    | 49 ++++++++++++++++++----
 .../writer/OverlappingCompressionDataPusher.java   | 27 +++++++++++-
 .../shuffle/manager/RssShuffleManagerBase.java     |  6 ++-
 .../spark/shuffle/writer/DataPusherTest.java       |  9 +++-
 .../OverlappingCompressionDataPusherTest.java      | 11 ++++-
 .../spark/shuffle/writer/RssShuffleWriter.java     | 19 +++++++--
 7 files changed, 110 insertions(+), 19 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index be336ef33..48414ec57 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -190,6 +190,14 @@ public class RssSparkConfig {
           .defaultValue(1)
           .withDescription("The block retry max times when partition reassign 
is enabled.");
 
+  public static final ConfigOption<Boolean>
+      RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED =
+          
ConfigOptions.key("rss.client.reassign.staleAssignmentFastSwitchEnabled")
+              .booleanType()
+              .defaultValue(true)
+              .withDescription(
+                  "Whether to fast-switch the stale shuffle server assignment 
when pushing shuffle data. It can be enabled when partition reassign mechanism 
is enabled.");
+
   public static final ConfigOption<Boolean> 
RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED =
       ConfigOptions.key("rss.client.mapSideCombine.enabled")
           .booleanType()
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index 05dfce694..df13e0f39 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -30,9 +30,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,8 +44,9 @@ import org.apache.uniffle.client.impl.FailedBlockSendTracker;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 /**
@@ -64,13 +67,18 @@ public class DataPusher implements Closeable {
   // Must be thread safe
   private final Set<String> failedTaskIds;
 
+  // Whether to fast-switch for those stale assignment to avoid backpressure.
+  // This is only valid if partition-reassign is enabled and single replica is 
used.
+  private final boolean staleAssignmentFastSwitchEnabled;
+
   public DataPusher(
       ShuffleWriteClient shuffleWriteClient,
       Map<String, Set<Long>> taskToSuccessBlockIds,
       Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
       Set<String> failedTaskIds,
       int threadPoolSize,
-      int threadKeepAliveTime) {
+      int threadKeepAliveTime,
+      RssConf rssConf) {
     this.shuffleWriteClient = shuffleWriteClient;
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
@@ -83,6 +91,28 @@ public class DataPusher implements Closeable {
             TimeUnit.SECONDS,
             Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
             ThreadUtils.getThreadFactory(this.getClass().getName()));
+    this.staleAssignmentFastSwitchEnabled =
+        rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED)
+            && rssConf.get(
+                
RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED);
+  }
+
+  @VisibleForTesting
+  public DataPusher(
+      ShuffleWriteClient shuffleWriteClient,
+      Map<String, Set<Long>> taskToSuccessBlockIds,
+      Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+      Set<String> failedTaskIds,
+      int threadPoolSize,
+      int threadKeepAliveTime) {
+    this(
+        shuffleWriteClient,
+        taskToSuccessBlockIds,
+        taskToFailedBlockSendTracker,
+        failedTaskIds,
+        threadPoolSize,
+        threadKeepAliveTime,
+        new RssConf());
   }
 
   public CompletableFuture<Long> send(AddBlockEvent event) {
@@ -92,10 +122,8 @@ public class DataPusher implements Closeable {
     return CompletableFuture.supplyAsync(
             () -> {
               String taskId = event.getTaskId();
-              List<ShuffleBlockInfo> shuffleBlockInfoList = 
event.getShuffleDataInfoList();
-              // filter out the shuffle blocks with stale assignment
-              List<ShuffleBlockInfo> validBlocks =
-                  filterOutStaleAssignmentBlocks(taskId, shuffleBlockInfoList);
+              List<ShuffleBlockInfo> blocks = event.getShuffleDataInfoList();
+              List<ShuffleBlockInfo> validBlocks = 
filterOutStaleAssignmentBlocks(taskId, blocks);
               if (CollectionUtils.isEmpty(validBlocks)) {
                 return 0L;
               }
@@ -157,6 +185,9 @@ public class DataPusher implements Closeable {
    */
   private List<ShuffleBlockInfo> filterOutStaleAssignmentBlocks(
       String taskId, List<ShuffleBlockInfo> blocks) {
+    if (!staleAssignmentFastSwitchEnabled) {
+      return blocks;
+    }
     FailedBlockSendTracker staleBlockTracker = new FailedBlockSendTracker();
     List<ShuffleBlockInfo> validBlocks = new ArrayList<>();
     for (ShuffleBlockInfo block : blocks) {
@@ -165,9 +196,11 @@ public class DataPusher implements Closeable {
       if (servers == null || servers.size() != 1) {
         validBlocks.add(block);
       } else {
+        ShuffleServerInfo server = servers.get(0);
         if (block.isStaleAssignment()) {
-          staleBlockTracker.add(
-              block, block.getShuffleServerInfos().get(0), 
StatusCode.INTERNAL_ERROR);
+          // It means the block failed due to the stale assignment fast-switch 
when status code is
+          // null.
+          staleBlockTracker.add(block, server, null);
         } else {
           validBlocks.add(block);
         }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index 5328179b0..7a1218c1c 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,14 +50,16 @@ public class OverlappingCompressionDataPusher extends 
DataPusher {
       Set<String> failedTaskIds,
       int threadPoolSize,
       int threadKeepAliveTime,
-      int compressionThreads) {
+      int compressionThreads,
+      RssConf rssConf) {
     super(
         shuffleWriteClient,
         taskToSuccessBlockIds,
         taskToFailedBlockSendTracker,
         failedTaskIds,
         threadPoolSize,
-        threadKeepAliveTime);
+        threadKeepAliveTime,
+        rssConf);
     if (compressionThreads <= 0) {
       throw new RssException(
           "Invalid rss configuration of "
@@ -69,6 +72,26 @@ public class OverlappingCompressionDataPusher extends 
DataPusher {
             compressionThreads, 
ThreadUtils.getThreadFactory("compression-thread"));
   }
 
+  @VisibleForTesting
+  public OverlappingCompressionDataPusher(
+      ShuffleWriteClient shuffleWriteClient,
+      Map<String, Set<Long>> taskToSuccessBlockIds,
+      Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+      Set<String> failedTaskIds,
+      int threadPoolSize,
+      int threadKeepAliveTime,
+      int compressionThreads) {
+    this(
+        shuffleWriteClient,
+        taskToSuccessBlockIds,
+        taskToFailedBlockSendTracker,
+        failedTaskIds,
+        threadPoolSize,
+        threadKeepAliveTime,
+        compressionThreads,
+        new RssConf());
+  }
+
   @Override
   public CompletableFuture<Long> send(AddBlockEvent event) {
     // Step 1: process event data in a separate thread (e.g., trigger 
compression)
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e0cd85929..1aad8500f 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -365,7 +365,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                 failedTaskIds,
                 poolSize,
                 keepAliveTime,
-                threads);
+                threads,
+                rssConf);
         LOG.info(
             "Using {} with {} compression threads", 
dataPusher.getClass().getSimpleName(), threads);
       } else {
@@ -376,7 +377,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                 taskToFailedBlockSendTracker,
                 failedTaskIds,
                 poolSize,
-                keepAliveTime);
+                keepAliveTime,
+                rssConf);
       }
     }
 
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 0bbf21fdf..eb357d9da 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -29,6 +29,7 @@ import java.util.function.Supplier;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
@@ -38,6 +39,8 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.JavaUtils;
 
@@ -97,6 +100,9 @@ public class DataPusherTest {
     Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker = 
JavaUtils.newConcurrentMap();
     Set<String> failedTaskIds = new HashSet<>();
 
+    RssConf rssConf = new RssConf();
+    rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+    
rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED,
 true);
     DataPusher dataPusher =
         new DataPusher(
             shuffleWriteClient,
@@ -104,7 +110,8 @@ public class DataPusherTest {
             taskToFailedBlockSendTracker,
             failedTaskIds,
             1,
-            2);
+            2,
+            rssConf);
     dataPusher.setRssAppId("testFilterOutStaleAssignmentBlocks");
 
     String taskId = "taskId1";
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
index ffa22771c..efdf9423b 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -26,12 +26,14 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.collect.Maps;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.JavaUtils;
@@ -51,6 +53,9 @@ public class OverlappingCompressionDataPusherTest {
     Set<String> failedTaskIds = new HashSet<>();
 
     RssConf rssConf = new RssConf();
+    rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+    
rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED,
 true);
+
     int threads = 
rssConf.get(RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
 
     // case1: Illegal thread number of compression
@@ -64,7 +69,8 @@ public class OverlappingCompressionDataPusherTest {
               failedTaskIds,
               1,
               2,
-              threads);
+              threads,
+              rssConf);
         });
 
     // case2: Propagated into the underlying data pusher
@@ -76,7 +82,8 @@ public class OverlappingCompressionDataPusherTest {
             failedTaskIds,
             1,
             2,
-            1);
+            1,
+            rssConf);
     pusher.setRssAppId("testSend");
 
     String taskId = "taskId1";
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4deef511d..75bd1fb76 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -631,12 +631,18 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     for (Long blockId : failedBlockIds) {
       List<TrackingBlockStatus> failedBlockStatus = 
failedTracker.getFailedBlockStatus(blockId);
       synchronized (failedBlockStatus) {
-        int retryIndex =
+        int retryCnt =
             failedBlockStatus.stream()
+                .filter(
+                    x -> {
+                      // If statusCode is null, the block was resent due to a 
stale assignment.
+                      // In this case, the retry count checking should be 
ignored.
+                      return x.getStatusCode() != null;
+                    })
                 .map(x -> x.getShuffleBlockInfo().getRetryCnt())
                 .max(Comparator.comparing(Integer::valueOf))
-                .get();
-        if (retryIndex >= blockFailSentRetryMaxTimes) {
+                .orElse(-1);
+        if (retryCnt >= blockFailSentRetryMaxTimes) {
           LOG.error(
               "Partial blocks for taskId: [{}] retry exceeding the max retry 
times: [{}]. Fast fail! faulty server list: {}",
               taskId,
@@ -862,7 +868,12 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       // clear the previous retry state of block
       clearFailedBlockState(block);
       final ShuffleBlockInfo newBlock = block;
-      newBlock.incrRetryCnt();
+      // if the status code is null, it means the block is resent due to stale 
assignment, not
+      // because of the block send failure. In this case, the retry count 
should not be increased;
+      // otherwise it may cause unexpected fast failure.
+      if (blockStatus.getStatusCode() != null) {
+        newBlock.incrRetryCnt();
+      }
       newBlock.reassignShuffleServers(Arrays.asList(replacement));
       resendCandidates.add(newBlock);
     }

Reply via email to