This is an automated email from the ASF dual-hosted git repository.

taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new da9c2fac6 [GLUTEN-6724][CH] Shuffle writer supports compression level 
configuration for CompressionCodecFactory (#6725)
da9c2fac6 is described below

commit da9c2fac6fe00437cf5119397783df05673753a8
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 12 11:14:12 2024 +0800

    [GLUTEN-6724][CH] Shuffle writer supports compression level configuration 
for CompressionCodecFactory (#6725)
    
    * [GLUTEN-6724][CH] Shuffle writer supports compression level configuration 
for CompressionCodecFactory
    
    * [GLUTEN-6724][CH] Shuffle writer supports compression level configuration 
for CompressionCodecFactory
---
 .../org/apache/gluten/vectorized/BlockOutputStream.java     |  9 ++++++++-
 .../gluten/vectorized/CHShuffleSplitterJniWrapper.java      |  6 ++++++
 .../gluten/vectorized/CHColumnarBatchSerializer.scala       | 13 ++++++++++---
 .../org/apache/spark/shuffle/CHColumnarShuffleWriter.scala  | 12 +++++++++---
 .../org/apache/spark/sql/execution/utils/CHExecUtil.scala   |  6 ++++--
 .../sql/execution/benchmarks/CHStorageJoinBenchmark.scala   |  3 ++-
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp             | 10 +++++-----
 cpp-ch/local-engine/Shuffle/ShuffleCommon.h                 |  2 +-
 cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp               |  4 ++--
 cpp-ch/local-engine/Shuffle/ShuffleWriter.h                 |  2 +-
 cpp-ch/local-engine/local_engine_jni.cpp                    |  8 ++++++--
 .../spark/shuffle/CHCelebornColumnarBatchSerializer.scala   | 11 ++++++++---
 .../spark/shuffle/CHCelebornColumnarShuffleWriter.scala     |  5 +++--
 .../spark/shuffle/CelebornColumnarShuffleWriter.scala       |  5 ++++-
 .../shuffle/writer/VeloxUniffleColumnarShuffleWriter.java   |  8 ++++++--
 15 files changed, 75 insertions(+), 29 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index 40e2c2c56..e209010b2 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -38,6 +38,7 @@ public class BlockOutputStream implements Closeable {
       SQLMetric dataSize,
       boolean compressionEnable,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       int bufferSize) {
     OutputStream unwrapOutputStream =
         CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream(
@@ -50,7 +51,12 @@ public class BlockOutputStream implements Closeable {
     }
     this.instance =
         nativeCreate(
-            this.outputStream, buffer, defaultCompressionCodec, 
compressionEnable, bufferSize);
+            this.outputStream,
+            buffer,
+            defaultCompressionCodec,
+            defaultCompressionLevel,
+            compressionEnable,
+            bufferSize);
     this.dataSize = dataSize;
   }
 
@@ -58,6 +64,7 @@ public class BlockOutputStream implements Closeable {
       OutputStream outputStream,
       byte[] buffer,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       boolean compressionEnable,
       int bufferSize);
 
diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index 864cc4eb7..7bc4f5dac 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -27,6 +27,7 @@ public class CHShuffleSplitterJniWrapper {
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       String dataFile,
       String localDirs,
       int subDirsPerLocalDir,
@@ -43,6 +44,7 @@ public class CHShuffleSplitterJniWrapper {
         mapId,
         bufferSize,
         codec,
+        level,
         dataFile,
         localDirs,
         subDirsPerLocalDir,
@@ -58,6 +60,7 @@ public class CHShuffleSplitterJniWrapper {
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       long spillThreshold,
       String hashAlgorithm,
       Object pusher,
@@ -71,6 +74,7 @@ public class CHShuffleSplitterJniWrapper {
         mapId,
         bufferSize,
         codec,
+        level,
         spillThreshold,
         hashAlgorithm,
         pusher,
@@ -86,6 +90,7 @@ public class CHShuffleSplitterJniWrapper {
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       String dataFile,
       String localDirs,
       int subDirsPerLocalDir,
@@ -103,6 +108,7 @@ public class CHShuffleSplitterJniWrapper {
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       long spillThreshold,
       String hashAlgorithm,
       Object pusher,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index f640bfd2d..fa6f8addf 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -54,8 +54,14 @@ private class CHColumnarBatchSerializerInstance(
   extends SerializerInstance
   with Logging {
 
-  private lazy val compressionCodec =
-    
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = 
GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val capitalizedCompressionCodec = 
compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -136,7 +142,8 @@ private class CHColumnarBatchSerializerInstance(
         writeBuffer,
         dataSize,
         CHBackendSettings.useCustomizedShuffleCodec,
-        compressionCodec,
+        capitalizedCompressionCodec,
+        compressionLevel,
         CHBackendSettings.customizeBufferSize
       )
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index d60873430..758c487a1 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -51,8 +51,13 @@ class CHColumnarShuffleWriter[K, V](
     .mkString(",")
   private val subDirsPerLocalDir = 
blockManager.diskBlockManager.subDirsPerLocalDir
   private val splitSize = GlutenConfig.getConf.maxBatchSize
-  private val customizedCompressCodec =
-    GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
+  private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private val capitalizedCompressionCodec = 
compressionCodec.toUpperCase(Locale.ROOT)
+  private val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
   private val maxSortBufferSize = 
GlutenConfig.getConf.chColumnarMaxSortBufferSize
   private val forceMemorySortShuffle = 
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
   private val spillThreshold = 
GlutenConfig.getConf.chColumnarShuffleSpillThreshold
@@ -98,7 +103,8 @@ class CHColumnarShuffleWriter[K, V](
         dep.shuffleId,
         mapId,
         splitSize,
-        customizedCompressCodec,
+        capitalizedCompressionCodec,
+        compressionLevel,
         dataTmp.getAbsolutePath,
         localDirs,
         subDirsPerLocalDir,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 7526e6d3d..38c15fde7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -59,14 +59,16 @@ object CHExecUtil extends Logging {
       dataSize: SQLMetric,
       iter: Iterator[ColumnarBatch],
       compressionCodec: Option[String] = Some("lz4"),
+      compressionLevel: Option[Int] = None,
       bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
     var count = 0
     val bos = new ByteArrayOutputStream()
     val buffer = new Array[Byte](bufferSize) // 4K
+    val level = compressionLevel.getOrElse(Int.MinValue)
     val blockOutputStream =
       compressionCodec
-        .map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize))
-        .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", 
bufferSize))
+        .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, 
bufferSize))
+        .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", 
level, bufferSize))
     while (iter.hasNext) {
       val batch = iter.next()
       count += batch.numRows
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
index f8cd4bf57..322c9521e 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
@@ -191,7 +191,8 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark 
with CHSqlBasedBenchmark
         batch =>
           val bos = new ByteArrayOutputStream()
           val buffer = new Array[Byte](4 << 10) // 4K
-          val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", 
buffer.length)
+          val dout =
+            new BlockOutputStream(bos, buffer, dataSize, true, "lz4", 
Int.MinValue, buffer.length)
           dout.write(batch)
           dout.flush()
           dout.close()
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp 
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 58be56421..2f22d0e24 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -139,7 +139,7 @@ size_t LocalPartitionWriter::evictPartitions()
     {
         auto file = getNextSpillFile();
         WriteBufferFromFile output(file, 
shuffle_writer->options.io_buffer_size);
-        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 shuffle_writer->options.compress_level);
         CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
         NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
@@ -200,7 +200,7 @@ String Spillable::getNextSpillFile()
 
 std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter * 
shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
 {
-    auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+    auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 shuffle_writer->options.compress_level);
 
     CompressedWriteBuffer compressed_output(data_file, codec, 
shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -352,7 +352,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
             return;
         auto file = getNextSpillFile();
         WriteBufferFromFile output(file, 
shuffle_writer->options.io_buffer_size);
-        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 shuffle_writer->options.compress_level);
         CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
         NativeWriter writer(compressed_output, output_header);
 
@@ -453,7 +453,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
             return;
 
         WriteBufferFromOwnString output;
-        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 shuffle_writer->options.compress_level);
         CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
         NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
@@ -564,7 +564,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t 
partition_id)
             return;
 
         WriteBufferFromOwnString output;
-        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 shuffle_writer->options.compress_level);
         CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
         NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h 
b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
index d398362aa..052f6d2e3 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
@@ -44,7 +44,7 @@ struct SplitOptions
     std::string hash_exprs;
     std::string out_exprs;
     std::string compress_method = "zstd";
-    int compress_level;
+    std::optional<int> compress_level;
     size_t spill_threshold = 300 * 1024 * 1024;
     std::string hash_algorithm;
     size_t max_sort_buffer_size = 1_GiB;
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp 
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index dddf0b895..8aa624ff9 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -25,13 +25,13 @@ using namespace DB;
 namespace local_engine
 {
 ShuffleWriter::ShuffleWriter(
-    jobject output_stream, jbyteArray buffer, const std::string & codecStr, 
bool enable_compression, size_t customize_buffer_size)
+    jobject output_stream, jbyteArray buffer, const std::string & codecStr, 
jint level, bool enable_compression, size_t customize_buffer_size)
 {
     compression_enable = enable_compression;
     write_buffer = 
std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer, 
customize_buffer_size);
     if (compression_enable)
     {
-        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {});
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), 
level < 0 ? std::nullopt : std::optional<int>(level));
         compressed_out = 
std::make_unique<CompressedWriteBuffer>(*write_buffer, codec);
     }
 }
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h 
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 98f67d1cc..541e93e03 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -24,7 +24,7 @@ class ShuffleWriter
 {
 public:
     ShuffleWriter(
-        jobject output_stream, jbyteArray buffer, const std::string & 
codecStr, bool enable_compression, size_t customize_buffer_size);
+        jobject output_stream, jbyteArray buffer, const std::string & 
codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
     virtual ~ShuffleWriter();
     void write(const DB::Block & block);
     void flush();
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index db0dd8b62..828556b4a 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -544,6 +544,7 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
     jstring data_file,
     jstring local_dirs,
     jint num_sub_dirs,
@@ -585,6 +586,7 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level < 0 ? std::nullopt : 
std::optional<int>(compress_level),
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
@@ -606,6 +608,7 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
     jlong spill_threshold,
     jstring hash_algorithm,
     jobject pusher,
@@ -637,6 +640,7 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level < 0 ? std::nullopt : 
std::optional<int>(compress_level),
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .force_memory_sort = static_cast<bool>(force_memory_sort)};
@@ -1160,11 +1164,11 @@ JNIEXPORT jint 
Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa
 }
 
 JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
-    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring 
codec, jboolean compressed, jint customize_buffer_size)
+    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring 
codec, jint level, jboolean compressed, jint customize_buffer_size)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::ShuffleWriter * writer
-        = new local_engine::ShuffleWriter(output_stream, buffer, 
jstring2string(env, codec), compressed, customize_buffer_size);
+        = new local_engine::ShuffleWriter(output_stream, buffer, 
jstring2string(env, codec), level, compressed, customize_buffer_size);
     return reinterpret_cast<jlong>(writer);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 3619855f7..5072ce6a1 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -58,8 +58,12 @@ private class CHCelebornColumnarBatchSerializerInstance(
   extends SerializerInstance
   with Logging {
 
-  private lazy val compressionCodec =
-    
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = 
GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val capitalizedCompressionCodec = 
compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -199,7 +203,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
         writeBuffer,
         dataSize,
         CHBackendSettings.useCustomizedShuffleCodec,
-        compressionCodec,
+        capitalizedCompressionCodec,
+        compressionLevel,
         CHBackendSettings.customizeBufferSize
       )
 
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index c7d7957c1..9b99e533f 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -47,7 +47,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
     client,
     writeMetrics) {
 
-  private val customizedCompressCodec =
+  private val capitalizedCompressionCodec =
     customizedCompressionCodec.toUpperCase(Locale.ROOT)
 
   private val jniWrapper = new CHShuffleSplitterJniWrapper
@@ -105,7 +105,8 @@ class CHCelebornColumnarShuffleWriter[K, V](
       shuffleId,
       mapId,
       nativeBufferSize,
-      customizedCompressCodec,
+      capitalizedCompressionCodec,
+      compressionLevel,
       GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
       CHBackendSettings.shuffleHashAlgorithm,
       celebornPartitionPusher,
diff --git 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index f71fadd4c..3f7c3586c 100644
--- 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -94,7 +94,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
     }
 
   protected val compressionLevel: Int =
-    GlutenShuffleUtils.getCompressionLevel(conf, customizedCompressionCodec, 
null)
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      customizedCompressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   protected val bufferCompressThreshold: Int =
     GlutenConfig.getConf.columnarShuffleCompressionThreshold
diff --git 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index d505260b8..2219fc674 100644
--- 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -66,7 +66,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends 
RssShuffleWriter<K,
       GlutenConfig.getConf().columnarShuffleCompressionThreshold();
   private final double reallocThreshold = 
GlutenConfig.getConf().columnarShuffleReallocThreshold();
   private String compressionCodec;
-  private final int compressionLevel;
+  private int compressionLevel;
   private final int partitionId;
 
   private final Runtime runtime = 
Runtimes.contextInstance("UniffleShuffleWriter");
@@ -120,8 +120,12 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
                 RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
     if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
       compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
+      compressionLevel =
+          GlutenShuffleUtils.getCompressionLevel(
+              sparkConf,
+              compressionCodec,
+              
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
     }
-    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, 
compressionCodec, null);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to