This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push: new f1323ab95 [#2536] feat(spark): Controllable concurrency for overlapping compression (#2541) f1323ab95 is described below commit f1323ab95034ef53e75e77283f110e096f7d6b6d Author: Junfan Zhang <zus...@apache.org> AuthorDate: Thu Jul 10 11:05:47 2025 +0800 [#2536] feat(spark): Controllable concurrency for overlapping compression (#2541) ### What changes were proposed in this pull request? Introduce an extra option to control the concurrency of overlapping compression. ### Why are the changes needed? For #2536. Decoupling compression concurrency from the transfer pool is essential for resource isolation and performance stability, especially in multi-tenant or CPU-constrained environments. ### Does this PR introduce _any_ user-facing change? Yes. It adds the config option `rss.client.write.overlappingCompressionThreads` (default `-1`). The overlapping-compression config options will be added to the docs in a separate PR. ### How was this patch tested? Unit tests. 
--- .../org/apache/spark/shuffle/RssSparkConfig.java | 7 ++ .../writer/OverlappingCompressionDataPusher.java | 89 ++++++++++++++++++++ .../shuffle/manager/RssShuffleManagerBase.java | 35 ++++++-- .../OverlappingCompressionDataPusherTest.java | 94 ++++++++++++++++++++++ 4 files changed, 217 insertions(+), 8 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 66b964ac9..4bffd227d 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -45,6 +45,13 @@ public class RssSparkConfig { .defaultValue(false) .withDescription("Whether to overlapping compress shuffle blocks."); + public static final ConfigOption<Integer> RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS = + ConfigOptions.key("rss.client.write.overlappingCompressionThreads") + .intType() + .defaultValue(-1) + .withDescription( + "The number of threads to overlapping compress shuffle blocks. If <= 0, this will be disabled."); + public static final ConfigOption<Boolean> RSS_READ_REORDER_MULTI_SERVERS_ENABLED = ConfigOptions.key("rss.client.read.reorderMultiServersEnable") .booleanType() diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java new file mode 100644 index 000000000..c6e939421 --- /dev/null +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.writer; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.spark.shuffle.RssSparkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.client.api.ShuffleWriteClient; +import org.apache.uniffle.client.impl.FailedBlockSendTracker; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.ThreadUtils; + +/** + * The extension of {@link DataPusher} is used only when the overlapping compression is activated. 
+ */ +public class OverlappingCompressionDataPusher extends DataPusher { + private static final Logger LOG = LoggerFactory.getLogger(OverlappingCompressionDataPusher.class); + + private final ExecutorService compressionThreadPool; + + public OverlappingCompressionDataPusher( + ShuffleWriteClient shuffleWriteClient, + Map<String, Set<Long>> taskToSuccessBlockIds, + Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker, + Set<String> failedTaskIds, + int threadPoolSize, + int threadKeepAliveTime, + RssConf rssConf) { + super( + shuffleWriteClient, + taskToSuccessBlockIds, + taskToFailedBlockSendTracker, + failedTaskIds, + threadPoolSize, + threadKeepAliveTime); + + int compressionThreads = + rssConf.getInteger(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS); + if (compressionThreads <= 0) { + throw new RssException( + "Invalid rss configuration of " + + RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.key() + + ": " + + compressionThreads); + } + this.compressionThreadPool = + Executors.newFixedThreadPool( + compressionThreads, ThreadUtils.getThreadFactory("compression-thread")); + } + + @Override + public CompletableFuture<Long> send(AddBlockEvent event) { + // Step 1: process event data in a separate thread (e.g., trigger compression) + return CompletableFuture.supplyAsync( + () -> { + event.getShuffleDataInfoList().forEach(shuffleDataInfo -> shuffleDataInfo.getData()); + return event; + }, + compressionThreadPool) + .thenCompose( + processedEvent -> { + // Step 2: forward to the parent class's send method + return super.send(processedEvent); + }); + } +} diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index e9085cfa2..dee4c2ef7 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -66,6 +66,7 @@ import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo; import org.apache.spark.shuffle.writer.AddBlockEvent; import org.apache.spark.shuffle.writer.DataPusher; +import org.apache.spark.shuffle.writer.OverlappingCompressionDataPusher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -338,14 +339,32 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac LOG.info("Rss data pusher is starting..."); int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE); int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE); - this.dataPusher = - new DataPusher( - shuffleWriteClient, - taskToSuccessBlockIds, - taskToFailedBlockSendTracker, - failedTaskIds, - poolSize, - keepAliveTime); + + boolean overlappingCompressionEnabled = + rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED); + int overlappingCompressionThreads = + rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS); + if (overlappingCompressionEnabled && overlappingCompressionThreads > 0) { + this.dataPusher = + new OverlappingCompressionDataPusher( + shuffleWriteClient, + taskToSuccessBlockIds, + taskToFailedBlockSendTracker, + failedTaskIds, + poolSize, + keepAliveTime, + rssConf); + } else { + this.dataPusher = + new DataPusher( + shuffleWriteClient, + taskToSuccessBlockIds, + taskToFailedBlockSendTracker, + failedTaskIds, + poolSize, + keepAliveTime); + } + this.partitionReassignMaxServerNum = rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM); this.shuffleHandleInfoManager = new ShuffleHandleInfoManager(); diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java new file mode 100644 index 000000000..ced0fdfa7 --- /dev/null +++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.writer; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import com.google.common.collect.Maps; +import org.apache.spark.shuffle.RssSparkConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.client.impl.FailedBlockSendTracker; +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.JavaUtils; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class OverlappingCompressionDataPusherTest { + + @Test + public void testSend() { + DataPusherTest.FakedShuffleWriteClient shuffleWriteClient = + new DataPusherTest.FakedShuffleWriteClient(); + + Map<String, Set<Long>> taskToSuccessBlockIds = Maps.newConcurrentMap(); + Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap(); + Set<String> failedTaskIds = new HashSet<>(); + + RssConf rssConf = new RssConf(); + + // case1: Illegal thread number of compression + Assertions.assertThrows( + RssException.class, + () -> { + new OverlappingCompressionDataPusher( + shuffleWriteClient, + taskToSuccessBlockIds, + taskToFailedBlockSendTracker, + failedTaskIds, + 1, + 2, + rssConf); + }); + + // case2: Propagated into the underlying data pusher + rssConf.set(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS, 1); + DataPusher pusher = + new OverlappingCompressionDataPusher( + shuffleWriteClient, + taskToSuccessBlockIds, + taskToFailedBlockSendTracker, + failedTaskIds, + 1, + 2, + rssConf); + pusher.setRssAppId("testSend"); + + String taskId = "taskId1"; + List<ShuffleServerInfo> server1 = + Collections.singletonList(new 
ShuffleServerInfo("0", "localhost", 1234)); + ShuffleBlockInfo staleBlock1 = + new ShuffleBlockInfo( + 1, 1, 3, 1, 1, new byte[1], server1, 1, 100, 1, integer -> Collections.emptyList()); + + // case1: will fast fail due to the stale assignment + AddBlockEvent event = new AddBlockEvent(taskId, Arrays.asList(staleBlock1)); + CompletableFuture<Long> f1 = pusher.send(event); + assertEquals(f1.join(), 0); + } +}