This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b1b45bf [#2714] feat(spark): Respect compression type when activating overlapping compression mechanism (#2715)
69b1b45bf is described below

commit 69b1b45bf4e6e03f4cfe3117416a80460f50537c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 29 10:31:06 2026 +0800

    [#2714] feat(spark): Respect compression type when activating overlapping compression mechanism (#2715)
    
    ### What changes were proposed in this pull request?
    
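    Respect the compression type when activating the overlapping compression
    mechanism: it is now activated only when a codec is configured (the
    compression type is not NONE), in addition to the existing enable flag and
    a positive threads-per-vcore setting.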
    
    ### Why are the changes needed?
    
    #2714. Unify the overlapping compression check in one place
    (`OverlappingCompressionDataPusher.isEnabled`) and also respect the
    compression type when activating this mechanism.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests are enough.
---
 .../shuffle/writer/OverlappingCompressionDataPusher.java  | 15 +++++++++++++++
 .../apache/spark/shuffle/writer/WriteBufferManager.java   | 10 ++++------
 .../uniffle/shuffle/manager/RssShuffleManagerBase.java    | 15 +++++++--------
 .../java/org/apache/uniffle/common/compression/Codec.java |  5 +++++
 4 files changed, 31 insertions(+), 14 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index a8c593f2f..5328179b0 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.compression.Codec;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -82,4 +84,17 @@ public class OverlappingCompressionDataPusher extends 
DataPusher {
               return super.send(processedEvent);
             });
   }
+
+  public static boolean isEnabled(RssConf rssConf) {
+    boolean overlappingCompressionEnabled =
+        rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+    int overlappingCompressionThreadsPerVcore =
+        
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+    if (Codec.hasCodec(rssConf)
+        && overlappingCompressionEnabled
+        && overlappingCompressionThreadsPerVcore > 0) {
+      return true;
+    }
+    return false;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index dbb5e6943..0321311a6 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -58,8 +58,6 @@ import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 
-import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED;
-
 public class WriteBufferManager extends MemoryConsumer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(WriteBufferManager.class);
@@ -111,7 +109,8 @@ public class WriteBufferManager extends MemoryConsumer {
   private Function<Integer, List<ShuffleServerInfo>> 
partitionAssignmentRetrieveFunc;
   private int stageAttemptNumber;
   private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
-  private boolean overlappingCompressionEnabled;
+  // whether to use deferred compression for shuffle blocks
+  private final boolean isDeferredCompression;
 
   public WriteBufferManager(
       int shuffleId,
@@ -187,8 +186,7 @@ public class WriteBufferManager extends MemoryConsumer {
     this.requireMemoryInterval = 
bufferManagerOptions.getRequireMemoryInterval();
     this.requireMemoryRetryMax = 
bufferManagerOptions.getRequireMemoryRetryMax();
     this.arrayOutputStream = new 
WrappedByteArrayOutputStream(serializerBufferSize);
-    this.overlappingCompressionEnabled =
-        rssConf.getBoolean(RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+    this.isDeferredCompression = 
OverlappingCompressionDataPusher.isEnabled(rssConf);
     // in columnar shuffle, the serializer here is never used
     this.isRowBased = rssConf.getBoolean(RssSparkConfig.RSS_ROW_BASED);
     if (isRowBased) {
@@ -474,7 +472,7 @@ public class WriteBufferManager extends MemoryConsumer {
 
   // transform records to shuffleBlock
   protected ShuffleBlockInfo createShuffleBlock(int partitionId, WriterBuffer 
wb) {
-    if (overlappingCompressionEnabled) {
+    if (isDeferredCompression) {
       return createDeferredCompressedBlock(partitionId, wb);
     }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 8b92eb632..e0cd85929 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -353,13 +353,10 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       int poolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
       int keepAliveTime = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
 
-      boolean overlappingCompressionEnabled =
-          
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
-      int overlappingCompressionThreadsPerVcore =
-          
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
-      if (overlappingCompressionEnabled && 
overlappingCompressionThreadsPerVcore > 0) {
-        int compressionThreads =
-            overlappingCompressionThreadsPerVcore * 
sparkConf.getInt(EXECUTOR_CORES, 1);
+      if (OverlappingCompressionDataPusher.isEnabled(rssConf)) {
+        int threads =
+            
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE)
+                * sparkConf.getInt(EXECUTOR_CORES, 1);
         this.dataPusher =
             new OverlappingCompressionDataPusher(
                 shuffleWriteClient,
@@ -368,7 +365,9 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                 failedTaskIds,
                 poolSize,
                 keepAliveTime,
-                compressionThreads);
+                threads);
+        LOG.info(
+            "Using {} with {} compression threads", 
dataPusher.getClass().getSimpleName(), threads);
       } else {
         this.dataPusher =
             new DataPusher(
diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java 
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 440e13785..afee86811 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -27,6 +27,11 @@ import static 
org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
 
 public abstract class Codec {
 
+  public static boolean hasCodec(RssConf rssConf) {
+    Type type = rssConf.get(COMPRESSION_TYPE);
+    return type != Type.NONE;
+  }
+
   public static Optional<Codec> create(RssConf rssConf) {
     Optional<Codec> codec = newInstance(rssConf);
     if (codec.isPresent() && 
rssConf.getBoolean(COMPRESSION_STATISTICS_ENABLED)) {

Reply via email to