This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new eb53a1a7b [#2725] feat(spark): Introduce optional fast-switch and
ignore retry-count checking for stale assignment (#2726)
eb53a1a7b is described below
commit eb53a1a7bb63dc0c7f1814e9772c1bb1e242d164
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Feb 10 19:37:13 2026 +0800
[#2725] feat(spark): Introduce optional fast-switch and ignore retry-count
checking for stale assignment (#2726)
### What changes were proposed in this pull request?
1. Introduce an option for stale assignment fast-switch. With this option,
we can better inspect potential bugs when this mechanism is triggered
2. Ignore retry-count checking for stale assignment to fix the multi-server
switch
### Why are the changes needed?
for #2725
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
---
.../org/apache/spark/shuffle/RssSparkConfig.java | 8 ++++
.../apache/spark/shuffle/writer/DataPusher.java | 49 ++++++++++++++++++----
.../writer/OverlappingCompressionDataPusher.java | 27 +++++++++++-
.../shuffle/manager/RssShuffleManagerBase.java | 6 ++-
.../spark/shuffle/writer/DataPusherTest.java | 9 +++-
.../OverlappingCompressionDataPusherTest.java | 11 ++++-
.../spark/shuffle/writer/RssShuffleWriter.java | 19 +++++++--
7 files changed, 110 insertions(+), 19 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index be336ef33..48414ec57 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -190,6 +190,14 @@ public class RssSparkConfig {
.defaultValue(1)
.withDescription("The block retry max times when partition reassign
is enabled.");
+ public static final ConfigOption<Boolean>
+ RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED =
+
ConfigOptions.key("rss.client.reassign.staleAssignmentFastSwitchEnabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to fast-switch the stale shuffle server assignment
when pushing shuffle data. It can be enabled when partition reassign mechanism
is enabled.");
+
public static final ConfigOption<Boolean>
RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED =
ConfigOptions.key("rss.client.mapSideCombine.enabled")
.booleanType()
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index 05dfce694..df13e0f39 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -30,9 +30,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +44,9 @@ import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ThreadUtils;
/**
@@ -64,13 +67,18 @@ public class DataPusher implements Closeable {
// Must be thread safe
private final Set<String> failedTaskIds;
+ // Whether to fast-switch for those stale assignment to avoid backpressure.
+ // This is only valid if partition-reassign is enabled and single replica is
used.
+ private final boolean staleAssignmentFastSwitchEnabled;
+
public DataPusher(
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
Set<String> failedTaskIds,
int threadPoolSize,
- int threadKeepAliveTime) {
+ int threadKeepAliveTime,
+ RssConf rssConf) {
this.shuffleWriteClient = shuffleWriteClient;
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
@@ -83,6 +91,28 @@ public class DataPusher implements Closeable {
TimeUnit.SECONDS,
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
ThreadUtils.getThreadFactory(this.getClass().getName()));
+ this.staleAssignmentFastSwitchEnabled =
+ rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED)
+ && rssConf.get(
+
RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED);
+ }
+
+ @VisibleForTesting
+ public DataPusher(
+ ShuffleWriteClient shuffleWriteClient,
+ Map<String, Set<Long>> taskToSuccessBlockIds,
+ Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+ Set<String> failedTaskIds,
+ int threadPoolSize,
+ int threadKeepAliveTime) {
+ this(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ threadPoolSize,
+ threadKeepAliveTime,
+ new RssConf());
}
public CompletableFuture<Long> send(AddBlockEvent event) {
@@ -92,10 +122,8 @@ public class DataPusher implements Closeable {
return CompletableFuture.supplyAsync(
() -> {
String taskId = event.getTaskId();
- List<ShuffleBlockInfo> shuffleBlockInfoList =
event.getShuffleDataInfoList();
- // filter out the shuffle blocks with stale assignment
- List<ShuffleBlockInfo> validBlocks =
- filterOutStaleAssignmentBlocks(taskId, shuffleBlockInfoList);
+ List<ShuffleBlockInfo> blocks = event.getShuffleDataInfoList();
+ List<ShuffleBlockInfo> validBlocks =
filterOutStaleAssignmentBlocks(taskId, blocks);
if (CollectionUtils.isEmpty(validBlocks)) {
return 0L;
}
@@ -157,6 +185,9 @@ public class DataPusher implements Closeable {
*/
private List<ShuffleBlockInfo> filterOutStaleAssignmentBlocks(
String taskId, List<ShuffleBlockInfo> blocks) {
+ if (!staleAssignmentFastSwitchEnabled) {
+ return blocks;
+ }
FailedBlockSendTracker staleBlockTracker = new FailedBlockSendTracker();
List<ShuffleBlockInfo> validBlocks = new ArrayList<>();
for (ShuffleBlockInfo block : blocks) {
@@ -165,9 +196,11 @@ public class DataPusher implements Closeable {
if (servers == null || servers.size() != 1) {
validBlocks.add(block);
} else {
+ ShuffleServerInfo server = servers.get(0);
if (block.isStaleAssignment()) {
- staleBlockTracker.add(
- block, block.getShuffleServerInfos().get(0),
StatusCode.INTERNAL_ERROR);
+ // It means the block failed due to the stale assignment fast-switch
when status code is
+ // null.
+ staleBlockTracker.add(block, server, null);
} else {
validBlocks.add(block);
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index 5328179b0..7a1218c1c 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,14 +50,16 @@ public class OverlappingCompressionDataPusher extends
DataPusher {
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
- int compressionThreads) {
+ int compressionThreads,
+ RssConf rssConf) {
super(
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockSendTracker,
failedTaskIds,
threadPoolSize,
- threadKeepAliveTime);
+ threadKeepAliveTime,
+ rssConf);
if (compressionThreads <= 0) {
throw new RssException(
"Invalid rss configuration of "
@@ -69,6 +72,26 @@ public class OverlappingCompressionDataPusher extends
DataPusher {
compressionThreads,
ThreadUtils.getThreadFactory("compression-thread"));
}
+ @VisibleForTesting
+ public OverlappingCompressionDataPusher(
+ ShuffleWriteClient shuffleWriteClient,
+ Map<String, Set<Long>> taskToSuccessBlockIds,
+ Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+ Set<String> failedTaskIds,
+ int threadPoolSize,
+ int threadKeepAliveTime,
+ int compressionThreads) {
+ this(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ threadPoolSize,
+ threadKeepAliveTime,
+ compressionThreads,
+ new RssConf());
+ }
+
@Override
public CompletableFuture<Long> send(AddBlockEvent event) {
// Step 1: process event data in a separate thread (e.g., trigger
compression)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e0cd85929..1aad8500f 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -365,7 +365,8 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
failedTaskIds,
poolSize,
keepAliveTime,
- threads);
+ threads,
+ rssConf);
LOG.info(
"Using {} with {} compression threads",
dataPusher.getClass().getSimpleName(), threads);
} else {
@@ -376,7 +377,8 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
taskToFailedBlockSendTracker,
failedTaskIds,
poolSize,
- keepAliveTime);
+ keepAliveTime,
+ rssConf);
}
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 0bbf21fdf..eb357d9da 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -29,6 +29,7 @@ import java.util.function.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
@@ -38,6 +39,8 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.JavaUtils;
@@ -97,6 +100,9 @@ public class DataPusherTest {
Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker =
JavaUtils.newConcurrentMap();
Set<String> failedTaskIds = new HashSet<>();
+ RssConf rssConf = new RssConf();
+ rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+
rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED,
true);
DataPusher dataPusher =
new DataPusher(
shuffleWriteClient,
@@ -104,7 +110,8 @@ public class DataPusherTest {
taskToFailedBlockSendTracker,
failedTaskIds,
1,
- 2);
+ 2,
+ rssConf);
dataPusher.setRssAppId("testFilterOutStaleAssignmentBlocks");
String taskId = "taskId1";
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
index ffa22771c..efdf9423b 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -26,12 +26,14 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import com.google.common.collect.Maps;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
@@ -51,6 +53,9 @@ public class OverlappingCompressionDataPusherTest {
Set<String> failedTaskIds = new HashSet<>();
RssConf rssConf = new RssConf();
+ rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+
rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED,
true);
+
int threads =
rssConf.get(RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
// case1: Illegal thread number of compression
@@ -64,7 +69,8 @@ public class OverlappingCompressionDataPusherTest {
failedTaskIds,
1,
2,
- threads);
+ threads,
+ rssConf);
});
// case2: Propagated into the underlying data pusher
@@ -76,7 +82,8 @@ public class OverlappingCompressionDataPusherTest {
failedTaskIds,
1,
2,
- 1);
+ 1,
+ rssConf);
pusher.setRssAppId("testSend");
String taskId = "taskId1";
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4deef511d..75bd1fb76 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -631,12 +631,18 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
for (Long blockId : failedBlockIds) {
List<TrackingBlockStatus> failedBlockStatus =
failedTracker.getFailedBlockStatus(blockId);
synchronized (failedBlockStatus) {
- int retryIndex =
+ int retryCnt =
failedBlockStatus.stream()
+ .filter(
+ x -> {
+ // If statusCode is null, the block was resent due to a
stale assignment.
+ // In this case, the retry count checking should be
ignored.
+ return x.getStatusCode() != null;
+ })
.map(x -> x.getShuffleBlockInfo().getRetryCnt())
.max(Comparator.comparing(Integer::valueOf))
- .get();
- if (retryIndex >= blockFailSentRetryMaxTimes) {
+ .orElse(-1);
+ if (retryCnt >= blockFailSentRetryMaxTimes) {
LOG.error(
"Partial blocks for taskId: [{}] retry exceeding the max retry
times: [{}]. Fast fail! faulty server list: {}",
taskId,
@@ -862,7 +868,12 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
// clear the previous retry state of block
clearFailedBlockState(block);
final ShuffleBlockInfo newBlock = block;
- newBlock.incrRetryCnt();
+ // if the status code is null, it means the block is resent due to stale
assignment, not
+ // because of the block send failure. In this case, the retry count
should not be increased;
+ // otherwise it may cause unexpected fast failure.
+ if (blockStatus.getStatusCode() != null) {
+ newBlock.incrRetryCnt();
+ }
newBlock.reassignShuffleServers(Arrays.asList(replacement));
resendCandidates.add(newBlock);
}