This is an automated email from the ASF dual-hosted git repository.
taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new da9c2fac6 [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory (#6725)
da9c2fac6 is described below
commit da9c2fac6fe00437cf5119397783df05673753a8
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 12 11:14:12 2024 +0800
[GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory (#6725)
* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory
* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory
---
.../org/apache/gluten/vectorized/BlockOutputStream.java | 9 ++++++++-
.../gluten/vectorized/CHShuffleSplitterJniWrapper.java | 6 ++++++
.../gluten/vectorized/CHColumnarBatchSerializer.scala | 13 ++++++++++---
.../org/apache/spark/shuffle/CHColumnarShuffleWriter.scala | 12 +++++++++---
.../org/apache/spark/sql/execution/utils/CHExecUtil.scala | 6 ++++--
.../sql/execution/benchmarks/CHStorageJoinBenchmark.scala | 3 ++-
cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 10 +++++-----
cpp-ch/local-engine/Shuffle/ShuffleCommon.h | 2 +-
cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp | 4 ++--
cpp-ch/local-engine/Shuffle/ShuffleWriter.h | 2 +-
cpp-ch/local-engine/local_engine_jni.cpp | 8 ++++++--
.../spark/shuffle/CHCelebornColumnarBatchSerializer.scala | 11 ++++++++---
.../spark/shuffle/CHCelebornColumnarShuffleWriter.scala | 5 +++--
.../spark/shuffle/CelebornColumnarShuffleWriter.scala | 5 ++++-
.../shuffle/writer/VeloxUniffleColumnarShuffleWriter.java | 8 ++++++--
15 files changed, 75 insertions(+), 29 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index 40e2c2c56..e209010b2 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -38,6 +38,7 @@ public class BlockOutputStream implements Closeable {
SQLMetric dataSize,
boolean compressionEnable,
String defaultCompressionCodec,
+ int defaultCompressionLevel,
int bufferSize) {
OutputStream unwrapOutputStream =
CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream(
@@ -50,7 +51,12 @@ public class BlockOutputStream implements Closeable {
}
this.instance =
nativeCreate(
- this.outputStream, buffer, defaultCompressionCodec,
compressionEnable, bufferSize);
+ this.outputStream,
+ buffer,
+ defaultCompressionCodec,
+ defaultCompressionLevel,
+ compressionEnable,
+ bufferSize);
this.dataSize = dataSize;
}
@@ -58,6 +64,7 @@ public class BlockOutputStream implements Closeable {
OutputStream outputStream,
byte[] buffer,
String defaultCompressionCodec,
+ int defaultCompressionLevel,
boolean compressionEnable,
int bufferSize);
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index 864cc4eb7..7bc4f5dac 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -27,6 +27,7 @@ public class CHShuffleSplitterJniWrapper {
long mapId,
int bufferSize,
String codec,
+ int level,
String dataFile,
String localDirs,
int subDirsPerLocalDir,
@@ -43,6 +44,7 @@ public class CHShuffleSplitterJniWrapper {
mapId,
bufferSize,
codec,
+ level,
dataFile,
localDirs,
subDirsPerLocalDir,
@@ -58,6 +60,7 @@ public class CHShuffleSplitterJniWrapper {
long mapId,
int bufferSize,
String codec,
+ int level,
long spillThreshold,
String hashAlgorithm,
Object pusher,
@@ -71,6 +74,7 @@ public class CHShuffleSplitterJniWrapper {
mapId,
bufferSize,
codec,
+ level,
spillThreshold,
hashAlgorithm,
pusher,
@@ -86,6 +90,7 @@ public class CHShuffleSplitterJniWrapper {
long mapId,
int bufferSize,
String codec,
+ int level,
String dataFile,
String localDirs,
int subDirsPerLocalDir,
@@ -103,6 +108,7 @@ public class CHShuffleSplitterJniWrapper {
long mapId,
int bufferSize,
String codec,
+ int level,
long spillThreshold,
String hashAlgorithm,
Object pusher,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index f640bfd2d..fa6f8addf 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -54,8 +54,14 @@ private class CHColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {
- private lazy val compressionCodec =
-
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+ private lazy val conf = SparkEnv.get.conf
+ private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(conf)
+ private lazy val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
+ private lazy val compressionLevel =
+ GlutenShuffleUtils.getCompressionLevel(
+ conf,
+ compressionCodec,
+ GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
@@ -136,7 +142,8 @@ private class CHColumnarBatchSerializerInstance(
writeBuffer,
dataSize,
CHBackendSettings.useCustomizedShuffleCodec,
- compressionCodec,
+ capitalizedCompressionCodec,
+ compressionLevel,
CHBackendSettings.customizeBufferSize
)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index d60873430..758c487a1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -51,8 +51,13 @@ class CHColumnarShuffleWriter[K, V](
.mkString(",")
private val subDirsPerLocalDir =
blockManager.diskBlockManager.subDirsPerLocalDir
private val splitSize = GlutenConfig.getConf.maxBatchSize
- private val customizedCompressCodec =
- GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
+ private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+ private val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
+ private val compressionLevel =
+ GlutenShuffleUtils.getCompressionLevel(
+ conf,
+ compressionCodec,
+ GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
private val maxSortBufferSize =
GlutenConfig.getConf.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle =
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
private val spillThreshold =
GlutenConfig.getConf.chColumnarShuffleSpillThreshold
@@ -98,7 +103,8 @@ class CHColumnarShuffleWriter[K, V](
dep.shuffleId,
mapId,
splitSize,
- customizedCompressCodec,
+ capitalizedCompressionCodec,
+ compressionLevel,
dataTmp.getAbsolutePath,
localDirs,
subDirsPerLocalDir,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 7526e6d3d..38c15fde7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -59,14 +59,16 @@ object CHExecUtil extends Logging {
dataSize: SQLMetric,
iter: Iterator[ColumnarBatch],
compressionCodec: Option[String] = Some("lz4"),
+ compressionLevel: Option[Int] = None,
bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
var count = 0
val bos = new ByteArrayOutputStream()
val buffer = new Array[Byte](bufferSize) // 4K
+ val level = compressionLevel.getOrElse(Int.MinValue)
val blockOutputStream =
compressionCodec
- .map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize))
- .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "",
bufferSize))
+ .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level,
bufferSize))
+ .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "",
level, bufferSize))
while (iter.hasNext) {
val batch = iter.next()
count += batch.numRows
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
index f8cd4bf57..322c9521e 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
@@ -191,7 +191,8 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark
with CHSqlBasedBenchmark
batch =>
val bos = new ByteArrayOutputStream()
val buffer = new Array[Byte](4 << 10) // 4K
- val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4",
buffer.length)
+ val dout =
+ new BlockOutputStream(bos, buffer, dataSize, true, "lz4",
Int.MinValue, buffer.length)
dout.write(batch)
dout.flush()
dout.close()
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 58be56421..2f22d0e24 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -139,7 +139,7 @@ size_t LocalPartitionWriter::evictPartitions()
{
auto file = getNextSpillFile();
WriteBufferFromFile output(file,
shuffle_writer->options.io_buffer_size);
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -200,7 +200,7 @@ String Spillable::getNextSpillFile()
std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter *
shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
{
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(data_file, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -352,7 +352,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
return;
auto file = getNextSpillFile();
WriteBufferFromFile output(file,
shuffle_writer->options.io_buffer_size);
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, output_header);
@@ -453,7 +453,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
return;
WriteBufferFromOwnString output;
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -564,7 +564,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t
partition_id)
return;
WriteBufferFromOwnString output;
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
index d398362aa..052f6d2e3 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
@@ -44,7 +44,7 @@ struct SplitOptions
std::string hash_exprs;
std::string out_exprs;
std::string compress_method = "zstd";
- int compress_level;
+ std::optional<int> compress_level;
size_t spill_threshold = 300 * 1024 * 1024;
std::string hash_algorithm;
size_t max_sort_buffer_size = 1_GiB;
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index dddf0b895..8aa624ff9 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -25,13 +25,13 @@ using namespace DB;
namespace local_engine
{
ShuffleWriter::ShuffleWriter(
- jobject output_stream, jbyteArray buffer, const std::string & codecStr,
bool enable_compression, size_t customize_buffer_size)
+ jobject output_stream, jbyteArray buffer, const std::string & codecStr,
jint level, bool enable_compression, size_t customize_buffer_size)
{
compression_enable = enable_compression;
write_buffer =
std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer,
customize_buffer_size);
if (compression_enable)
{
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {});
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr),
level < 0 ? std::nullopt : std::optional<int>(level));
compressed_out =
std::make_unique<CompressedWriteBuffer>(*write_buffer, codec);
}
}
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 98f67d1cc..541e93e03 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -24,7 +24,7 @@ class ShuffleWriter
{
public:
ShuffleWriter(
- jobject output_stream, jbyteArray buffer, const std::string &
codecStr, bool enable_compression, size_t customize_buffer_size);
+ jobject output_stream, jbyteArray buffer, const std::string &
codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
virtual ~ShuffleWriter();
void write(const DB::Block & block);
void flush();
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index db0dd8b62..828556b4a 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -544,6 +544,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jlong map_id,
jint split_size,
jstring codec,
+ jint compress_level,
jstring data_file,
jstring local_dirs,
jint num_sub_dirs,
@@ -585,6 +586,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
+ .compress_level = compress_level < 0 ? std::nullopt :
std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
@@ -606,6 +608,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jlong map_id,
jint split_size,
jstring codec,
+ jint compress_level,
jlong spill_threshold,
jstring hash_algorithm,
jobject pusher,
@@ -637,6 +640,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
+ .compress_level = compress_level < 0 ? std::nullopt :
std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.force_memory_sort = static_cast<bool>(force_memory_sort)};
@@ -1160,11 +1164,11 @@ JNIEXPORT jint
Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa
}
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
- JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring
codec, jboolean compressed, jint customize_buffer_size)
+ JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring
codec, jint level, jboolean compressed, jint customize_buffer_size)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer
- = new local_engine::ShuffleWriter(output_stream, buffer,
jstring2string(env, codec), compressed, customize_buffer_size);
+ = new local_engine::ShuffleWriter(output_stream, buffer,
jstring2string(env, codec), level, compressed, customize_buffer_size);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
diff --git
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 3619855f7..5072ce6a1 100644
---
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -58,8 +58,12 @@ private class CHCelebornColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {
- private lazy val compressionCodec =
-
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+ private lazy val conf = SparkEnv.get.conf
+ private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(conf)
+ private lazy val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
+ private lazy val compressionLevel =
+ GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
+ GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
@@ -199,7 +203,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
writeBuffer,
dataSize,
CHBackendSettings.useCustomizedShuffleCodec,
- compressionCodec,
+ capitalizedCompressionCodec,
+ compressionLevel,
CHBackendSettings.customizeBufferSize
)
diff --git
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index c7d7957c1..9b99e533f 100644
---
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -47,7 +47,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
client,
writeMetrics) {
- private val customizedCompressCodec =
+ private val capitalizedCompressionCodec =
customizedCompressionCodec.toUpperCase(Locale.ROOT)
private val jniWrapper = new CHShuffleSplitterJniWrapper
@@ -105,7 +105,8 @@ class CHCelebornColumnarShuffleWriter[K, V](
shuffleId,
mapId,
nativeBufferSize,
- customizedCompressCodec,
+ capitalizedCompressionCodec,
+ compressionLevel,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
diff --git
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index f71fadd4c..3f7c3586c 100644
---
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -94,7 +94,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
}
protected val compressionLevel: Int =
- GlutenShuffleUtils.getCompressionLevel(conf, customizedCompressionCodec,
null)
+ GlutenShuffleUtils.getCompressionLevel(
+ conf,
+ customizedCompressionCodec,
+ GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
protected val bufferCompressThreshold: Int =
GlutenConfig.getConf.columnarShuffleCompressionThreshold
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index d505260b8..2219fc674 100644
---
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -66,7 +66,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends
RssShuffleWriter<K,
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private final double reallocThreshold =
GlutenConfig.getConf().columnarShuffleReallocThreshold();
private String compressionCodec;
- private final int compressionLevel;
+ private int compressionLevel;
private final int partitionId;
private final Runtime runtime =
Runtimes.contextInstance("UniffleShuffleWriter");
@@ -120,8 +120,12 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
+ compressionLevel =
+ GlutenShuffleUtils.getCompressionLevel(
+ sparkConf,
+ compressionCodec,
+
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
}
- compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf,
compressionCodec, null);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]