This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f1323ab95 [#2536] feat(spark): Controllable concurrency for 
overlapping compression (#2541)
f1323ab95 is described below

commit f1323ab95034ef53e75e77283f110e096f7d6b6d
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Thu Jul 10 11:05:47 2025 +0800

    [#2536] feat(spark): Controllable concurrency for overlapping compression 
(#2541)
    
    ### What changes were proposed in this pull request?
    
    Introduce an extra option to control the concurrency of overlapping compression.
    
    ### Why are the changes needed?
    
    For #2536.
    
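    Decoupling compression concurrency from the transfer (send) thread pool is essential for
    resource isolation and performance stability, especially in multi-tenant or CPU-constrained
    environments: with a dedicated, separately sized compression pool, compression no longer has
    to share threads with data transfer.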
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
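    A new option `rss.client.write.overlappingCompressionThreads` is added (default: `-1`). A dedicated compression pool is only used when this value is positive and overlapping compression is enabled.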
    
    Documentation for the overlapping-compression config options will be added in a separate PR.
    
    ### How was this patch tested?
    
    Unit tests.
---
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  7 ++
 .../writer/OverlappingCompressionDataPusher.java   | 89 ++++++++++++++++++++
 .../shuffle/manager/RssShuffleManagerBase.java     | 35 ++++++--
 .../OverlappingCompressionDataPusherTest.java      | 94 ++++++++++++++++++++++
 4 files changed, 217 insertions(+), 8 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 66b964ac9..4bffd227d 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -45,6 +45,13 @@ public class RssSparkConfig {
           .defaultValue(false)
           .withDescription("Whether to overlapping compress shuffle blocks.");
 
+  /**
+   * Number of threads in the dedicated pool that compresses shuffle blocks for overlapping
+   * compression. A non-positive value means no dedicated compression pool is created.
+   */
+  public static final ConfigOption<Integer> RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS =
+      ConfigOptions.key("rss.client.write.overlappingCompressionThreads")
+          .intType()
+          .defaultValue(-1)
+          .withDescription(
+              "The number of threads dedicated to compressing shuffle blocks when overlapping "
+                  + "compression is enabled. If <= 0, no dedicated compression pool is used.");
+
   public static final ConfigOption<Boolean> 
RSS_READ_REORDER_MULTI_SERVERS_ENABLED =
       ConfigOptions.key("rss.client.read.reorderMultiServersEnable")
           .booleanType()
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
new file mode 100644
index 000000000..c6e939421
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * A {@link DataPusher} that materializes (and thereby compresses) shuffle block data on a dedicated
+ * thread pool before handing each event to the regular send path.
+ *
+ * <p>{@code RssShuffleManagerBase} creates it only when overlapping compression is enabled and
+ * {@code rss.client.write.overlappingCompressionThreads} is positive. Sizing this pool separately
+ * from the send thread pool decouples compression concurrency from transfer concurrency.
+ */
+public class OverlappingCompressionDataPusher extends DataPusher {
+  private static final Logger LOG = LoggerFactory.getLogger(OverlappingCompressionDataPusher.class);
+
+  /** Fixed-size pool on which block data is materialized before the event is sent. */
+  private final ExecutorService compressionThreadPool;
+
+  /**
+   * Creates the pusher together with its dedicated compression pool.
+   *
+   * @param shuffleWriteClient forwarded to {@link DataPusher}
+   * @param taskToSuccessBlockIds forwarded to {@link DataPusher}
+   * @param taskToFailedBlockSendTracker forwarded to {@link DataPusher}
+   * @param failedTaskIds forwarded to {@link DataPusher}
+   * @param threadPoolSize send thread pool size, forwarded to {@link DataPusher}
+   * @param threadKeepAliveTime send thread keep-alive time, forwarded to {@link DataPusher}
+   * @param rssConf source of {@code rss.client.write.overlappingCompressionThreads}
+   * @throws RssException if the configured number of compression threads is not positive
+   */
+  public OverlappingCompressionDataPusher(
+      ShuffleWriteClient shuffleWriteClient,
+      Map<String, Set<Long>> taskToSuccessBlockIds,
+      Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+      Set<String> failedTaskIds,
+      int threadPoolSize,
+      int threadKeepAliveTime,
+      RssConf rssConf) {
+    super(
+        shuffleWriteClient,
+        taskToSuccessBlockIds,
+        taskToFailedBlockSendTracker,
+        failedTaskIds,
+        threadPoolSize,
+        threadKeepAliveTime);
+
+    int compressionThreads =
+        rssConf.getInteger(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
+    // The option defaults to -1, so a caller that did not configure it must not get here.
+    if (compressionThreads <= 0) {
+      throw new RssException(
+          "Invalid rss configuration of "
+              + RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.key()
+              + ": "
+              + compressionThreads);
+    }
+    this.compressionThreadPool =
+        Executors.newFixedThreadPool(
+            compressionThreads, ThreadUtils.getThreadFactory("compression-thread"));
+    LOG.info(
+        "Overlapping compression is enabled with {} dedicated compression threads",
+        compressionThreads);
+  }
+
+  /**
+   * Materializes the data of every block in {@code event} on the compression pool, then forwards
+   * the event to {@link DataPusher#send(AddBlockEvent)}.
+   *
+   * <p>NOTE(review): if {@code getData()} throws here, the returned future completes
+   * exceptionally and the event never reaches {@link DataPusher#send(AddBlockEvent)}, so any
+   * failure bookkeeping or callbacks performed there are skipped for this event. Confirm callers
+   * tolerate this.
+   *
+   * @param event the blocks to compress and send
+   * @return a future completed with the result of {@link DataPusher#send(AddBlockEvent)}
+   */
+  @Override
+  public CompletableFuture<Long> send(AddBlockEvent event) {
+    return CompletableFuture.supplyAsync(
+            () -> {
+              // Accessing the data is what triggers compression (presumably lazy) of each block.
+              event.getShuffleDataInfoList().forEach(block -> block.getData());
+              return event;
+            },
+            compressionThreadPool)
+        .thenCompose(super::send);
+  }
+
+  /**
+   * Shuts down the compression pool, then closes the underlying {@link DataPusher}.
+   *
+   * <p>The pool is shut down gracefully: already-submitted compression tasks may still finish
+   * instead of being discarded, and no new tasks are accepted. NOTE(review): assumes {@link
+   * DataPusher} declares {@code close() throws IOException} (e.g. via {@code Closeable}).
+   *
+   * @throws IOException if closing the underlying {@link DataPusher} fails
+   */
+  @Override
+  public void close() throws IOException {
+    // Stop accepting compression work first so nothing new is fed into the send path.
+    compressionThreadPool.shutdown();
+    super.close();
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e9085cfa2..dee4c2ef7 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -66,6 +66,7 @@ import 
org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
 import org.apache.spark.shuffle.writer.AddBlockEvent;
 import org.apache.spark.shuffle.writer.DataPusher;
+import org.apache.spark.shuffle.writer.OverlappingCompressionDataPusher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,14 +339,32 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     LOG.info("Rss data pusher is starting...");
     int poolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
     int keepAliveTime = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
-    this.dataPusher =
-        new DataPusher(
-            shuffleWriteClient,
-            taskToSuccessBlockIds,
-            taskToFailedBlockSendTracker,
-            failedTaskIds,
-            poolSize,
-            keepAliveTime);
+
+    boolean overlappingCompressionEnabled =
+        rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+    int overlappingCompressionThreads =
+        rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
+    if (overlappingCompressionEnabled && overlappingCompressionThreads > 0) {
+      this.dataPusher =
+          new OverlappingCompressionDataPusher(
+              shuffleWriteClient,
+              taskToSuccessBlockIds,
+              taskToFailedBlockSendTracker,
+              failedTaskIds,
+              poolSize,
+              keepAliveTime,
+              rssConf);
+    } else {
+      this.dataPusher =
+          new DataPusher(
+              shuffleWriteClient,
+              taskToSuccessBlockIds,
+              taskToFailedBlockSendTracker,
+              failedTaskIds,
+              poolSize,
+              keepAliveTime);
+    }
+
     this.partitionReassignMaxServerNum =
         rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
     this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
new file mode 100644
index 000000000..ced0fdfa7
--- /dev/null
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link OverlappingCompressionDataPusher}: validation of the compression thread number
+ * and forwarding of events to the underlying {@link DataPusher}.
+ */
+public class OverlappingCompressionDataPusherTest {
+
+  @Test
+  public void testSend() throws Exception {
+    DataPusherTest.FakedShuffleWriteClient shuffleWriteClient =
+        new DataPusherTest.FakedShuffleWriteClient();
+
+    Map<String, Set<Long>> taskToSuccessBlockIds = Maps.newConcurrentMap();
+    Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
+    Set<String> failedTaskIds = new HashSet<>();
+
+    RssConf rssConf = new RssConf();
+
+    // case1: the default (non-positive) compression thread number is rejected.
+    Assertions.assertThrows(
+        RssException.class,
+        () -> {
+          new OverlappingCompressionDataPusher(
+              shuffleWriteClient,
+              taskToSuccessBlockIds,
+              taskToFailedBlockSendTracker,
+              failedTaskIds,
+              1,
+              2,
+              rssConf);
+        });
+
+    // case2: with a positive thread number the pusher is built and forwards events into the
+    // underlying data pusher.
+    rssConf.set(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS, 1);
+    DataPusher pusher =
+        new OverlappingCompressionDataPusher(
+            shuffleWriteClient,
+            taskToSuccessBlockIds,
+            taskToFailedBlockSendTracker,
+            failedTaskIds,
+            1,
+            2,
+            rssConf);
+    try {
+      pusher.setRssAppId("testSend");
+
+      String taskId = "taskId1";
+      List<ShuffleServerInfo> server1 =
+          Collections.singletonList(new ShuffleServerInfo("0", "localhost", 1234));
+      ShuffleBlockInfo staleBlock1 =
+          new ShuffleBlockInfo(
+              1, 1, 3, 1, 1, new byte[1], server1, 1, 100, 1, integer -> Collections.emptyList());
+
+      // case3: the event fails fast in the underlying pusher due to the stale assignment.
+      AddBlockEvent event = new AddBlockEvent(taskId, Arrays.asList(staleBlock1));
+      CompletableFuture<Long> f1 = pusher.send(event);
+      assertEquals(0L, f1.join());
+    } finally {
+      // Release the pusher's thread pools so they do not outlive this test.
+      pusher.close();
+    }
+  }
+}

Reply via email to