This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 69b1b45bf [#2714] feat(spark): Respect compression type when
activating overlapping compression mechanism (#2715)
69b1b45bf is described below
commit 69b1b45bf4e6e03f4cfe3117416a80460f50537c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 29 10:31:06 2026 +0800
[#2714] feat(spark): Respect compression type when activating overlapping
compression mechanism (#2715)
### What changes were proposed in this pull request?
Respect the configured compression type when activating the overlapping compression mechanism.
### Why are the changes needed?
#2714. Unify the overlapping compression checks into a single place, and
also respect the configured compression type when activating this mechanism:
overlapping compression is now enabled only when the compression type is not `NONE`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests are sufficient.
---
.../shuffle/writer/OverlappingCompressionDataPusher.java | 15 +++++++++++++++
.../apache/spark/shuffle/writer/WriteBufferManager.java | 10 ++++------
.../uniffle/shuffle/manager/RssShuffleManagerBase.java | 15 +++++++--------
.../java/org/apache/uniffle/common/compression/Codec.java | 5 +++++
4 files changed, 31 insertions(+), 14 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index a8c593f2f..5328179b0 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.compression.Codec;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -82,4 +84,17 @@ public class OverlappingCompressionDataPusher extends
DataPusher {
return super.send(processedEvent);
});
}
+
+ public static boolean isEnabled(RssConf rssConf) {
+ boolean overlappingCompressionEnabled =
+ rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+ int overlappingCompressionThreadsPerVcore =
+
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+ if (Codec.hasCodec(rssConf)
+ && overlappingCompressionEnabled
+ && overlappingCompressionThreadsPerVcore > 0) {
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index dbb5e6943..0321311a6 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -58,8 +58,6 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
-import static
org.apache.spark.shuffle.RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED;
-
public class WriteBufferManager extends MemoryConsumer {
private static final Logger LOG =
LoggerFactory.getLogger(WriteBufferManager.class);
@@ -111,7 +109,8 @@ public class WriteBufferManager extends MemoryConsumer {
private Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc;
private int stageAttemptNumber;
private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
- private boolean overlappingCompressionEnabled;
+ // whether to use deferred compression for shuffle blocks
+ private final boolean isDeferredCompression;
public WriteBufferManager(
int shuffleId,
@@ -187,8 +186,7 @@ public class WriteBufferManager extends MemoryConsumer {
this.requireMemoryInterval =
bufferManagerOptions.getRequireMemoryInterval();
this.requireMemoryRetryMax =
bufferManagerOptions.getRequireMemoryRetryMax();
this.arrayOutputStream = new
WrappedByteArrayOutputStream(serializerBufferSize);
- this.overlappingCompressionEnabled =
- rssConf.getBoolean(RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+ this.isDeferredCompression =
OverlappingCompressionDataPusher.isEnabled(rssConf);
// in columnar shuffle, the serializer here is never used
this.isRowBased = rssConf.getBoolean(RssSparkConfig.RSS_ROW_BASED);
if (isRowBased) {
@@ -474,7 +472,7 @@ public class WriteBufferManager extends MemoryConsumer {
// transform records to shuffleBlock
protected ShuffleBlockInfo createShuffleBlock(int partitionId, WriterBuffer
wb) {
- if (overlappingCompressionEnabled) {
+ if (isDeferredCompression) {
return createDeferredCompressedBlock(partitionId, wb);
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 8b92eb632..e0cd85929 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -353,13 +353,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
int poolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
int keepAliveTime =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
- boolean overlappingCompressionEnabled =
-
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
- int overlappingCompressionThreadsPerVcore =
-
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
- if (overlappingCompressionEnabled &&
overlappingCompressionThreadsPerVcore > 0) {
- int compressionThreads =
- overlappingCompressionThreadsPerVcore *
sparkConf.getInt(EXECUTOR_CORES, 1);
+ if (OverlappingCompressionDataPusher.isEnabled(rssConf)) {
+ int threads =
+
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE)
+ * sparkConf.getInt(EXECUTOR_CORES, 1);
this.dataPusher =
new OverlappingCompressionDataPusher(
shuffleWriteClient,
@@ -368,7 +365,9 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
failedTaskIds,
poolSize,
keepAliveTime,
- compressionThreads);
+ threads);
+ LOG.info(
+ "Using {} with {} compression threads",
dataPusher.getClass().getSimpleName(), threads);
} else {
this.dataPusher =
new DataPusher(
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 440e13785..afee86811 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -27,6 +27,11 @@ import static
org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
public abstract class Codec {
+ public static boolean hasCodec(RssConf rssConf) {
+ Type type = rssConf.get(COMPRESSION_TYPE);
+ return type != Type.NONE;
+ }
+
public static Optional<Codec> create(RssConf rssConf) {
Optional<Codec> codec = newInstance(rssConf);
if (codec.isPresent() &&
rssConf.getBoolean(COMPRESSION_STATISTICS_ENABLED)) {