This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d077f936f1 [GLUTEN-9163][VL] Separate compression buffer and disk
write buffer configuration (#9356)
d077f936f1 is described below
commit d077f936f1494bef30b18e295bb55b9c01fc75da
Author: Rong Ma <[email protected]>
AuthorDate: Wed Apr 23 21:46:19 2025 +0100
[GLUTEN-9163][VL] Separate compression buffer and disk write buffer
configuration (#9356)
---
.../CHCelebornColumnarBatchSerializer.scala | 7 +-
.../shuffle/CHCelebornColumnarShuffleWriter.scala | 3 +-
.../vectorized/CHColumnarBatchSerializer.scala | 5 +-
.../spark/shuffle/CHColumnarShuffleWriter.scala | 5 +-
.../VeloxCelebornColumnarBatchSerializer.scala | 6 +-
.../VeloxCelebornColumnarShuffleWriter.scala | 9 +-
.../writer/VeloxUniffleColumnarShuffleWriter.java | 12 +-
.../vectorized/ColumnarBatchSerializer.scala | 6 +-
.../spark/shuffle/ColumnarShuffleWriter.scala | 33 ++--
cpp/core/config/GlutenConfig.h | 2 +
cpp/core/jni/JniWrapper.cc | 8 +-
cpp/core/shuffle/LocalPartitionWriter.cc | 16 +-
cpp/core/shuffle/Options.h | 9 +-
cpp/core/shuffle/Utils.h | 20 +--
cpp/core/shuffle/rss/RssPartitionWriter.cc | 4 +-
cpp/velox/compute/VeloxRuntime.cc | 3 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 41 +++--
cpp/velox/shuffle/VeloxShuffleReader.h | 14 +-
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 1 +
docs/Configuration.md | 171 +++++++++++----------
.../gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +-
.../gluten/vectorized/ShuffleWriterJniWrapper.java | 5 +
.../shuffle/CelebornColumnarShuffleWriter.scala | 23 +--
.../apache/spark/shuffle/GlutenShuffleUtils.scala | 14 +-
.../org/apache/gluten/config/GlutenConfig.scala | 11 ++
25 files changed, 248 insertions(+), 183 deletions(-)
diff --git
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index e634a3d7be..fd8f16f63e 100644
---
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.vectorized.BlockOutputStream
import org.apache.gluten.vectorized.CHStreamReader
@@ -27,7 +28,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.celeborn.client.read.CelebornInputStream
-import org.apache.gluten.config.GlutenConfig
import java.io._
import java.nio.ByteBuffer
@@ -63,10 +63,7 @@ private class CHCelebornColumnarBatchSerializerInstance(
private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
- GlutenShuffleUtils.getCompressionLevel(
- conf,
- compressionCodec,
- GlutenConfig.get.columnarShuffleCodecBackend.orNull)
+ GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec)
override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
diff --git
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index b293fd32f7..7b9567fc62 100644
---
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -49,8 +49,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
writeMetrics) {
private val capitalizedCompressionCodec =
- if (customizedCompressionCodec != null)
customizedCompressionCodec.toUpperCase(Locale.ROOT)
- else "NONE"
+ compressionCodec.map(_.toUpperCase(Locale.ROOT)).getOrElse("NONE")
private val jniWrapper = new CHShuffleSplitterJniWrapper
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index 467433b46c..ab701f52ba 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -64,10 +64,7 @@ private class CHColumnarBatchSerializerInstance(
private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
- GlutenShuffleUtils.getCompressionLevel(
- conf,
- compressionCodec,
- GlutenConfig.get.columnarShuffleCodecBackend.orNull)
+ GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec)
private val useColumnarShuffle: Boolean =
GlutenConfig.get.isUseColumnarShuffleManager
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index efe330c77c..b8f60d9601 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -55,10 +55,7 @@ class CHColumnarShuffleWriter[K, V](
private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
private val compressionLevel =
- GlutenShuffleUtils.getCompressionLevel(
- conf,
- compressionCodec,
- GlutenConfig.get.columnarShuffleCodecBackend.orNull)
+ GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec)
private val maxSortBufferSize = CHConfig.get.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle =
CHConfig.get.chColumnarForceMemorySortShuffle
private val spillThreshold = CHConfig.get.chColumnarShuffleSpillThreshold
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index fbf1c67303..2db65383df 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -90,14 +90,16 @@ private class CelebornColumnarBatchSerializerInstance(
.replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val batchSize = GlutenConfig.get.maxBatchSize
- val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
+ val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
+ val deserializerBufferSize =
GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
compressionCodec,
compressionCodecBackend,
batchSize,
- bufferSize,
+ readerBufferSize,
+ deserializerBufferSize,
shuffleWriterType
)
// Close shuffle reader instance as lately as the end of task processing,
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 115982f48c..b7af7c02a8 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -18,12 +18,14 @@ package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.ReservedKeys
import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.vectorized._
import org.apache.spark._
-import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE,
SHUFFLE_SORT_USE_RADIXSORT}
+import org.apache.spark.internal.config.{SHUFFLE_DISK_WRITE_BUFFER_SIZE,
SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
@@ -32,8 +34,6 @@ import org.apache.spark.util.SparkResourceUtil
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.config.ReservedKeys
import java.io.IOException
@@ -124,9 +124,10 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
dep.nativePartitioning.getShortName,
dep.nativePartitioning.getNumPartitions,
nativeBufferSize,
- customizedCompressionCodec,
+ compressionCodec.orNull,
compressionLevel,
compressionBufferSize,
+ conf.get(SHUFFLE_DISK_WRITE_BUFFER_SIZE).toInt,
bufferCompressThreshold,
GlutenConfig.get.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
diff --git
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 3fd8261e96..f970f6e2e9 100644
---
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -70,6 +70,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends
RssShuffleWriter<K,
private String compressionCodec;
private int compressionLevel;
private int compressionBufferSize;
+ private int diskWriteBufferSize;
private final int partitionId;
private final Runtime runtime =
@@ -124,13 +125,11 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
- compressionLevel =
- GlutenShuffleUtils.getCompressionLevel(
- sparkConf,
- compressionCodec,
- GlutenConfig.get().columnarShuffleCodecBackend().getOrElse(() ->
null));
+ compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf,
compressionCodec);
compressionBufferSize =
- GlutenShuffleUtils.getSortEvictBufferSize(sparkConf,
compressionCodec);
+ GlutenShuffleUtils.getCompressionBufferSize(sparkConf,
compressionCodec);
+ diskWriteBufferSize =
+ (int) (long)
sparkConf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
}
}
@@ -158,6 +157,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
compressionCodec,
compressionLevel,
compressionBufferSize,
+ diskWriteBufferSize,
compressThreshold,
GlutenConfig.get().columnarShuffleCompressionMode(),
(int) (long)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index a51a4fca82..82cb64d959 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -100,7 +100,8 @@ private class ColumnarBatchSerializerInstance(
val compressionCodecBackend =
GlutenConfig.get.columnarShuffleCodecBackend.orNull
val batchSize = GlutenConfig.get.maxBatchSize
- val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
+ val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
+ val deserializerBufferSize =
GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ShuffleReader")
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val shuffleReaderHandle = jniWrapper.make(
@@ -108,7 +109,8 @@ private class ColumnarBatchSerializerInstance(
compressionCodec,
compressionCodecBackend,
batchSize,
- bufferSize,
+ readerBufferSize,
+ deserializerBufferSize,
shuffleWriterType)
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 4d49b93447..3f24205a9d 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.vectorized._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SHUFFLE_COMPRESS,
SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT}
+import org.apache.spark.internal.config.{SHUFFLE_COMPRESS,
SHUFFLE_DISK_WRITE_BUFFER_SIZE, SHUFFLE_SORT_INIT_BUFFER_SIZE,
SHUFFLE_SORT_USE_RADIXSORT}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -80,21 +80,27 @@ class ColumnarShuffleWriter[K, V](
private val nativeMergeThreshold =
GlutenConfig.get.columnarShuffleMergeThreshold
- private val compressionCodec =
+ private val compressionCodec: Option[String] =
if (conf.getBoolean(SHUFFLE_COMPRESS.key,
SHUFFLE_COMPRESS.defaultValue.get)) {
- GlutenShuffleUtils.getCompressionCodec(conf)
+ Some(GlutenShuffleUtils.getCompressionCodec(conf))
} else {
- null // uncompressed
+ None
}
- private val compressionCodecBackend =
- GlutenConfig.get.columnarShuffleCodecBackend.orNull
+ private val compressionCodecBackend: Option[String] =
+ GlutenConfig.get.columnarShuffleCodecBackend
- private val compressionLevel =
- GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
compressionCodecBackend)
+ private val compressionLevel = {
+ compressionCodec
+ .map(codec => GlutenShuffleUtils.getCompressionLevel(conf, codec))
+ .getOrElse(GlutenShuffleUtils.DEFAULT_COMPRESSION_LEVEL)
+ }
- private val sortEvictBufferSize =
- GlutenShuffleUtils.getSortEvictBufferSize(conf, compressionCodec)
+ private val compressionBufferSize = {
+ compressionCodec
+ .map(codec => GlutenShuffleUtils.getCompressionBufferSize(conf, codec))
+ .getOrElse(0)
+ }
private val bufferCompressThreshold =
GlutenConfig.get.columnarShuffleCompressionThreshold
@@ -145,10 +151,11 @@ class ColumnarShuffleWriter[K, V](
nativeBufferSize,
nativeMergeBufferSize,
nativeMergeThreshold,
- compressionCodec,
- compressionCodecBackend,
+ compressionCodec.orNull,
+ compressionCodecBackend.orNull,
compressionLevel,
- sortEvictBufferSize,
+ compressionBufferSize,
+ conf.get(SHUFFLE_DISK_WRITE_BUFFER_SIZE).toInt,
bufferCompressThreshold,
GlutenConfig.get.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 456bff7361..6b52b9e1a1 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -65,6 +65,8 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens";
const std::string kShuffleCompressionCodec =
"spark.gluten.sql.columnar.shuffle.codec";
const std::string kShuffleCompressionCodecBackend =
"spark.gluten.sql.columnar.shuffle.codecBackend";
const std::string kShuffleSpillDiskWriteBufferSize =
"spark.shuffle.spill.diskWriteBufferSize";
+const std::string kSortShuffleReaderDeserializerBufferSize =
+ "spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize";
const std::string kQatBackendName = "qat";
const std::string kIaaBackendName = "iaa";
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index c761920c87..965bb2a7c4 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -825,6 +825,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jstring codecJstr,
jstring codecBackendJstr,
jint compressionLevel,
+ jint compressionBufferSize,
jint diskWriteBufferSize,
jint compressionThreshold,
jstring compressionModeJstr,
@@ -864,6 +865,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
auto partitionWriterOptions = PartitionWriterOptions{
.mergeBufferSize = mergeBufferSize,
.mergeThreshold = mergeThreshold,
+ .compressionBufferSize = compressionBufferSize,
.compressionThreshold = compressionThreshold,
.compressionType = getCompressionType(env, codecJstr),
.compressionTypeStr = getCompressionTypeStr(env, codecJstr),
@@ -1063,7 +1065,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jstring compressionType,
jstring compressionBackend,
jint batchSize,
- jlong bufferSize,
+ jlong readerBufferSize,
+ jlong deserializerBufferSize,
jstring shuffleWriterType) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
@@ -1075,7 +1078,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
options.codecBackend = getCodecBackend(env, compressionBackend);
}
options.batchSize = batchSize;
- options.bufferSize = bufferSize;
+ options.readerBufferSize = readerBufferSize;
+ options.deserializerBufferSize = deserializerBufferSize;
options.shuffleWriterType =
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index d31cdde68a..8cd328b13f 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -37,7 +37,8 @@ class LocalPartitionWriter::LocalSpiller {
bool isFinal,
std::shared_ptr<arrow::io::OutputStream> os,
std::string spillFile,
- uint32_t compressionThreshold,
+ int32_t compressionBufferSize,
+ int32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: isFinal_(isFinal),
@@ -49,7 +50,8 @@ class LocalPartitionWriter::LocalSpiller {
diskSpill_(std::make_unique<Spill>()) {
if (codec_ != nullptr) {
GLUTEN_ASSIGN_OR_THROW(
- compressedOs_, ShuffleCompressedOutputStream::Make(codec_, os,
arrow::default_memory_pool()));
+ compressedOs_,
+ ShuffleCompressedOutputStream::Make(codec_, compressionBufferSize,
os, arrow::default_memory_pool()));
}
}
@@ -151,7 +153,7 @@ class LocalPartitionWriter::LocalSpiller {
int64_t writePos_{0};
std::string spillFile_;
- uint32_t compressionThreshold_;
+ int32_t compressionThreshold_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
@@ -600,7 +602,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool
isFinal) {
ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
}
spiller_ = std::make_unique<LocalSpiller>(
- isFinal, os, std::move(spillFile), options_.compressionThreshold,
payloadPool_.get(), codec_.get());
+ isFinal,
+ os,
+ std::move(spillFile),
+ options_.compressionBufferSize,
+ options_.compressionThreshold,
+ payloadPool_.get(),
+ codec_.get());
}
return arrow::Status::OK();
}
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 07dd2999b0..cb6fec7ae8 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -30,7 +30,8 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 <<
20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
-static constexpr int32_t kDefaultDiskWriteBufferSize = 32 * 1024; // TODO:
compare performance with 1M (spark default)
+static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
+static constexpr int32_t kDefaultDiskWriteBufferSize = 1024 * 1024;
static const std::string kDefaultCompressionTypeStr = "lz4";
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
@@ -39,6 +40,7 @@ static constexpr bool kEnableBufferedWrite = true;
static constexpr bool kDefaultUseRadixSort = true;
static constexpr int32_t kDefaultSortBufferSize = 4096;
static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
+static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
@@ -51,7 +53,8 @@ struct ShuffleReaderOptions {
ShuffleWriterType shuffleWriterType = kHashShuffle;
CodecBackend codecBackend = CodecBackend::NONE;
int32_t batchSize = kDefaultBatchSize;
- int64_t bufferSize = kDefaultReadBufferSize;
+ int64_t readerBufferSize = kDefaultReadBufferSize;
+ int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
};
struct ShuffleWriterOptions {
@@ -73,6 +76,8 @@ struct ShuffleWriterOptions {
struct PartitionWriterOptions {
int32_t mergeBufferSize = kDefaultShuffleWriterBufferSize;
double mergeThreshold = kDefaultMergeBufferThreshold;
+ int32_t compressionBufferSize =
+ kDefaultCompressionBufferSize; //
spark.io.compression.lz4.blockSize,spark.io.compression.zstd.bufferSize
int32_t compressionThreshold = kDefaultCompressionThreshold;
arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
std::string compressionTypeStr = kDefaultCompressionTypeStr;
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index cfe07e5f2c..fafb1fbfd5 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -112,9 +112,13 @@ class MmapFileStream : public arrow::io::InputStream {
class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
public:
/// \brief Create a compressed output stream wrapping the given output
stream.
- static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
- Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
arrow::MemoryPool* pool) {
- auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new
ShuffleCompressedOutputStream(codec, raw, pool));
+ static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>> Make(
+ arrow::util::Codec* codec,
+ int32_t compressionBufferSize,
+ const std::shared_ptr<OutputStream>& raw,
+ arrow::MemoryPool* pool) {
+ auto res = std::shared_ptr<ShuffleCompressedOutputStream>(
+ new ShuffleCompressedOutputStream(codec, compressionBufferSize, raw,
pool));
RETURN_NOT_OK(res->Init(codec));
return res;
}
@@ -221,13 +225,14 @@ class ShuffleCompressedOutputStream : public
arrow::io::OutputStream {
ShuffleCompressedOutputStream(
arrow::util::Codec* codec,
+ int32_t compressionBufferSize,
const std::shared_ptr<OutputStream>& raw,
arrow::MemoryPool* pool)
- : codec_(codec), raw_(raw), pool_(pool) {}
+ : codec_(codec), compressionBufferSize_(compressionBufferSize),
raw_(raw), pool_(pool) {}
arrow::Status Init(arrow::util::Codec* codec) {
ARROW_ASSIGN_OR_RAISE(compressor_, codec->MakeCompressor());
- ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize,
pool_));
+ ARROW_ASSIGN_OR_RAISE(compressed_,
AllocateResizableBuffer(compressionBufferSize_, pool_));
compressedPos_ = 0;
isOpen_ = true;
return arrow::Status::OK();
@@ -270,11 +275,8 @@ class ShuffleCompressedOutputStream : public
arrow::io::OutputStream {
return arrow::Status::OK();
}
- // TODO: Support setting chunk size
- // Write 64 KB compressed data at a time
- static const int64_t kChunkSize = 64 * 1024;
-
arrow::util::Codec* codec_;
+ int32_t compressionBufferSize_;
std::shared_ptr<OutputStream> raw_;
arrow::MemoryPool* pool_;
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 9695ce35c1..1161d06fc5 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -86,7 +86,9 @@ RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayl
RETURN_NOT_OK(rssOs_->init());
if (codec_ != nullptr) {
ARROW_ASSIGN_OR_RAISE(
- compressedOs_, ShuffleCompressedOutputStream::Make(codec_.get(),
rssOs_, arrow::default_memory_pool()));
+ compressedOs_,
+ ShuffleCompressedOutputStream::Make(
+ codec_.get(), options_.compressionBufferSize, rssOs_,
arrow::default_memory_pool()));
}
lastEvictedPartitionId_ = partitionId;
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index c1dd2bb4c5..d8cfcac5c0 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -270,7 +270,8 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
veloxCompressionType,
rowType,
options.batchSize,
- options.bufferSize,
+ options.readerBufferSize,
+ options.deserializerBufferSize,
memoryManager()->getArrowMemoryPool(),
ctxVeloxPool,
options.shuffleWriterType);
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index a5c727f602..588a1aa808 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -38,14 +38,11 @@
#include "velox/vector/arrow/Bridge.h"
#include <algorithm>
-#include <iostream>
using namespace facebook::velox;
namespace gluten {
namespace {
-constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
-
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
@@ -378,7 +375,8 @@
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
const std::shared_ptr<arrow::util::Codec>& codec,
const RowTypePtr& rowType,
int32_t batchSize,
- int64_t bufferSize,
+ int64_t readerBufferSize,
+ int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
facebook::velox::memory::MemoryPool* veloxPool,
int64_t& deserializeTime,
@@ -387,13 +385,14 @@
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
codec_(codec),
rowType_(rowType),
batchSize_(batchSize),
- veloxPool_(veloxPool),
+ deserializerBufferSize_(deserializerBufferSize),
deserializeTime_(deserializeTime),
- decompressTime_(decompressTime) {
+ decompressTime_(decompressTime),
+ veloxPool_(veloxPool) {
if (codec_ != nullptr) {
GLUTEN_ASSIGN_OR_THROW(in_, CompressedInputStream::Make(codec_.get(),
std::move(in), memoryPool));
} else {
- GLUTEN_ASSIGN_OR_THROW(in_,
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
+ GLUTEN_ASSIGN_OR_THROW(in_,
arrow::io::BufferedInputStream::Create(readerBufferSize, memoryPool,
std::move(in)));
}
}
@@ -409,7 +408,8 @@ std::shared_ptr<ColumnarBatch>
VeloxSortShuffleReaderDeserializer::next() {
}
if (rowBuffer_ == nullptr) {
- rowBuffer_ = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_);
+ rowBuffer_ =
+ AlignedBuffer::allocate<char>(deserializerBufferSize_, veloxPool_,
std::nullopt, true /*allocateExact*/);
rowBufferPtr_ = rowBuffer_->asMutable<char>();
data_.reserve(batchSize_);
}
@@ -428,12 +428,18 @@ std::shared_ptr<ColumnarBatch>
VeloxSortShuffleReaderDeserializer::next() {
return nullptr;
}
- GLUTEN_CHECK(
- lastRowSize_ <= kMaxReadBufferSize, "Row size exceeds max read buffer
size: " + std::to_string(lastRowSize_));
-
- if (lastRowSize_ + bytesRead_ > kMaxReadBufferSize) {
+ if (cachedRows_ > 0 && lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
return deserializeToBatch();
}
+
+ if (lastRowSize_ > deserializerBufferSize_) {
+ auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
+ LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds deserializer
buffer size " << rowBuffer_->size()
+ << ". Resizing buffer to " << newSize;
+ rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_,
std::nullopt, true /*allocateExact*/);
+ rowBufferPtr_ = rowBuffer_->asMutable<char>();
+ }
+
readNextRow();
}
@@ -558,7 +564,8 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
const facebook::velox::common::CompressionKind veloxCompressionType,
const RowTypePtr& rowType,
int32_t batchSize,
- int64_t bufferSize,
+ int64_t readerBufferSize,
+ int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
ShuffleWriterType shuffleWriterType)
@@ -567,7 +574,8 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
veloxCompressionType_(veloxCompressionType),
rowType_(rowType),
batchSize_(batchSize),
- bufferSize_(bufferSize),
+ readerBufferSize_(readerBufferSize),
+ deserializerBufferSize_(deserializerBufferSize),
memoryPool_(memoryPool),
veloxPool_(veloxPool),
shuffleWriterType_(shuffleWriterType) {
@@ -584,7 +592,7 @@ std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::cr
codec_,
rowType_,
batchSize_,
- bufferSize_,
+ readerBufferSize_,
memoryPool_,
veloxPool_.get(),
&isValidityBuffer_,
@@ -598,7 +606,8 @@ std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::cr
codec_,
rowType_,
batchSize_,
- bufferSize_,
+ readerBufferSize_,
+ deserializerBufferSize_,
memoryPool_,
veloxPool_.get(),
deserializeTime_,
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 29ca4d218b..de486d5991 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -73,7 +73,8 @@ class VeloxSortShuffleReaderDeserializer final : public
ColumnarBatchIterator {
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
- int64_t bufferSize,
+ int64_t readerBufferSize,
+ int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
facebook::velox::memory::MemoryPool* veloxPool,
int64_t& deserializeTime,
@@ -91,11 +92,14 @@ class VeloxSortShuffleReaderDeserializer final : public
ColumnarBatchIterator {
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
+
uint32_t batchSize_;
- facebook::velox::memory::MemoryPool* veloxPool_;
+ int64_t deserializerBufferSize_;
int64_t& deserializeTime_;
int64_t& decompressTime_;
+ facebook::velox::memory::MemoryPool* veloxPool_;
+
facebook::velox::BufferPtr rowBuffer_{nullptr};
char* rowBufferPtr_{nullptr};
uint32_t bytesRead_{0};
@@ -145,7 +149,8 @@ class VeloxShuffleReaderDeserializerFactory {
const facebook::velox::common::CompressionKind veloxCompressionType,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
- int64_t bufferSize,
+ int64_t readerBufferSize,
+ int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
ShuffleWriterType shuffleWriterType);
@@ -166,7 +171,8 @@ class VeloxShuffleReaderDeserializerFactory {
facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::RowTypePtr rowType_;
int32_t batchSize_;
- int64_t bufferSize_;
+ int64_t readerBufferSize_;
+ int64_t deserializerBufferSize_;
arrow::MemoryPool* memoryPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 7a9c6c8971..4a69c9a7e1 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -367,6 +367,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
rowType,
kDefaultBatchSize,
kDefaultReadBufferSize,
+ kDefaultDeserializerBufferSize,
defaultArrowMemoryPool().get(),
pool_,
GetParam().shuffleWriterType);
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 1741d3e34e..440a3a32c5 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -11,97 +11,98 @@ You can add these configurations into spark-defaults.conf
to enable or disable t
## Spark parameters
-| Parameters | Description
[...]
-|--------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| spark.driver.extraClassPath | Gluten Plugin
jar file to prepend to the classpath of the driver.
[...]
-| spark.executor.extraClassPath | Gluten Plugin
jar file to prepend to the classpath of executors.
[...]
-| spark.executor.memory | Amount of
memory to use per executor process, in the same format as JVM memory strings
with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
[...]
-| spark.memory.offHeap.size | The absolute
amount of memory which can be used for off-heap allocation, in bytes unless
otherwise specified.<br /> Note: Gluten Plugin will leverage this setting to
allocate memory space for native usage even offHeap is disabled. <br /> The
value is based on your system and it is recommended to set it larger if you are
facing Out of Memory issue in Gluten Plugin
[...]
-| spark.sql.sources.useV1SourceList | A
comma-separated list of data source short names or fully qualified data source
implementation class names for which Data Source V2 code path is disabled. <br
/> Note: Please use V1 source for avro.
[...]
-| spark.sql.join.preferSortMergeJoin | When true,
prefer sort merge join over shuffled hash join. <br /> Note: Please turn off
preferSortMergeJoin.
[...]
-| spark.plugins | To load
Gluten's components by Spark's plug-in loader
[...]
-| spark.shuffle.manager | To turn on
Gluten Columnar Shuffle Plugin
[...]
-| spark.gluten.enabled | Enable Gluten
at runtime, default is true. Fall back to vanilla Spark for all query plans if
set to false. Recommend to enable/disable Gluten through the setting for
`spark.plugins`.
[...]
-| spark.gluten.memory.isolation |
(Experimental) Enable isolated memory mode. If true, Gluten controls the
maximum off-heap memory can be used by each task to X, X = executor memory /
max task slots. It's recommended to set true if Gluten serves concurrent
queries within a single session, since not all memory Gluten allocated is
guaranteed to be spillable. In the case, the feature should be enabled to avoid
OOM. Note when true, setting spark.memory.storageF [...]
-| spark.gluten.ras.enabled | Enables RAS
(relation algebra selector) during physical planning to generate more efficient
query plan. Note, this feature doesn't bring performance profits by default.
Try exploring option `spark.gluten.ras.costModel` for advanced usage.
[...]
-| spark.gluten.sql.columnar.maxBatchSize | Number of
rows to be processed in each batch. Default value is 4096.
[...]
-| spark.gluten.sql.columnar.scanOnly | When enabled,
this config will overwrite all other operators' enabling, and only Scan and
Filter pushdown will be offloaded to native.
[...]
-| spark.gluten.sql.columnar.batchscan | Enable or
Disable Columnar BatchScan, default is true
[...]
-| spark.gluten.sql.columnar.hashagg | Enable or
Disable Columnar Hash Aggregate, default is true
[...]
-| spark.gluten.sql.columnar.project | Enable or
Disable Columnar Project, default is true
[...]
-| spark.gluten.sql.columnar.filter | Enable or
Disable Columnar Filter, default is true
[...]
-| spark.gluten.sql.columnar.sort | Enable or
Disable Columnar Sort, default is true
[...]
-| spark.gluten.sql.columnar.window | Enable or
Disable Columnar Window, default is true
[...]
-| spark.gluten.sql.columnar.shuffledHashJoin | Enable or
Disable ShuffledHashJoin, default is true
[...]
-| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | Whether to
allow Gluten to choose an optimal build side for shuffled hash join
[...]
-| spark.gluten.sql.columnar.forceShuffledHashJoin | Force to use
ShuffledHashJoin over SortMergeJoin, default is true. For queries that can
benefit from storaged patitioned join, please set it to false.
[...]
-| spark.gluten.sql.columnar.sortMergeJoin | Enable or
Disable Columnar Sort Merge Join, default is true
[...]
-| spark.gluten.sql.columnar.union | Enable or
Disable Columnar Union, default is true
[...]
-| spark.gluten.sql.columnar.expand | Enable or
Disable Columnar Expand, default is true
[...]
-| spark.gluten.sql.columnar.generate | Enable or
Disable Columnar Generate, default is true
[...]
-| spark.gluten.sql.columnar.limit | Enable or
Disable Columnar Limit, default is true
[...]
-| spark.gluten.sql.columnar.tableCache | Enable or
Disable Columnar Table Cache, default is false
[...]
-| spark.gluten.sql.columnar.broadcastExchange | Enable or
Disable Columnar Broadcast Exchange, default is true
[...]
-| spark.gluten.sql.columnar.broadcastJoin | Enable or
Disable Columnar BroadcastHashJoin, default is true
[...]
-| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | The threshold
to determine whether to use sort-based columnar shuffle. Sort-based shuffle
will be used if the number of partitions is greater than this threshold.
[...]
-| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | The threshold
to determine whether to use sort-based columnar shuffle. Sort-based shuffle
will be used if the number of columns is greater than this threshold.
[...]
-| spark.gluten.sql.columnar.shuffle.codec | Set up the
codec to be used for Columnar Shuffle. If this configuration is not set, will
check the value of spark.io.compression.codec. By default, Gluten use software
compression. Valid options for software compression are lz4, zstd. Valid
options for QAT and IAA is gzip.
[...]
-| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using
hardware accelerators for shuffle de/compression. Valid options are QAT and
IAA.
[...]
-| spark.gluten.sql.columnar.shuffle.compressionMode | Setting
different compression mode in shuffle, Valid options are buffer and rowvector,
buffer option compress each buffer of RowVector individually into one
pre-allocated large buffer, rowvector option first copies each buffer of
RowVector to a large buffer and then compress the entire buffer in one go.
[...]
-| spark.gluten.sql.columnar.shuffle.compression.threshold | If number of
rows in a batch falls below this threshold, will copy all buffers into one
buffer to compress.
[...]
-| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the
threshold to dynamically adjust the size of shuffle split buffers. The size of
each split buffer is recalculated for each incoming batch of data. If the new
size deviates from the current partition buffer size by a factor outside the
range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated
using the newly calculated size
[...]
-| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the
threshold control the minimum merged size. When a partition buffer is full, and
the number of rows is below (`threshold *
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.
[...]
-| spark.gluten.sql.columnar.shuffle.readerBufferSize | Buffer size
in bytes for shuffle reader reading input stream from local or remote.
[...]
-| spark.gluten.sql.columnar.numaBinding | Set up
NUMABinding, default is false
[...]
-| spark.gluten.sql.columnar.coreRange | Set up the
core range for NUMABinding, only works when numaBinding set to true. <br /> The
setting is based on the number of cores in your system. Use 72 cores as an
example.
[...]
-| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure the
threshold for whether whole stage will fall back in AQE supported case by
counting the number of ColumnarToRow & vanilla leaf node
[...]
-| spark.gluten.sql.columnar.query.fallback.threshold | Configure the
threshold for whether query will fall back by counting the number of
ColumnarToRow & vanilla leaf node
[...]
-| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true,
the fallback policy ignores the RowToColumnar when counting fallback number.
[...]
-| spark.gluten.sql.columnar.fallback.preferColumnar | When true,
the fallback policy prefers to use Gluten plan rather than vanilla Spark plan
if the both of them contains ColumnarToRow and the vanilla Spark plan
ColumnarToRow number is not smaller than Gluten plan.
[...]
-| spark.gluten.sql.columnar.force.hashagg | Force to use
hash agg to replace sort agg.
[...]
-| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | Whether to
merge two phases aggregate if there are no other operators between them.
[...]
-| spark.gluten.sql.columnar.vanillaReaders | Enable
vanilla spark's vectorized reader. Please note it may bring perf. overhead due
to extra data transition. We recommend to disable it if most queries can be
fully offloaded to gluten.
[...]
-| spark.gluten.sql.native.bloomFilter | Enable or
Disable native runtime bloom filter.
[...]
-| spark.gluten.sql.native.arrow.reader.enabled | Enable or
Disable native arrow read CSV file format
[...]
-| spark.gluten.shuffleWriter.bufferSize | Set the
number of buffer rows for the shuffle writer
[...]
-| spark.gluten.loadLibFromJar | Controls
whether to load dynamic link library from a packed jar for gluten/cpp. Not
applicable to static build and clickhouse backend.
[...]
-| spark.gluten.loadLibOS | When
`spark.gluten.loadLibFromJar` is true. Manually specify the system os to load
library, e.g., CentOS
[...]
-| spark.gluten.loadLibOSVersion | Manually
specify the system os version to load library, e.g., if
`spark.gluten.loadLibOS` is CentOS, this config can be 7
[...]
-| spark.gluten.expression.blacklist | A black list
of expression to skip transform, multiple values separated by commas.
[...]
-| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back
filter/project if the height of expression tree reaches this threshold,
considering Spark codegen can bring better performance for such case.
[...]
-| spark.gluten.sql.cartesianProductTransformerEnabled | Config to
enable CartesianProductExecTransformer.
[...]
-| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to
enable BroadcastNestedLoopJoinExecTransformer.
[...]
-| spark.gluten.sql.cacheWholeStageTransformerContext | When true,
`WholeStageTransformer` will cache the `WholeStageTransformerContext` when
executing. It is used to get substrait plan node and native plan string.
[...]
-| spark.gluten.sql.injectNativePlanStringToExplain | When true,
Gluten will inject native plan tree to Spark's explain output.
[...]
-| spark.gluten.sql.fallbackRegexpExpressions | When true,
Gluten will fall back all regexp expressions to avoid any incompatibility risk.
[...]
+| Parameters | Description
[...]
+|---------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| spark.driver.extraClassPath | Gluten
Plugin jar file to prepend to the classpath of the driver.
[...]
+| spark.executor.extraClassPath | Gluten
Plugin jar file to prepend to the classpath of executors.
[...]
+| spark.executor.memory | Amount of
memory to use per executor process, in the same format as JVM memory strings
with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
[...]
+| spark.memory.offHeap.size | The absolute
amount of memory which can be used for off-heap allocation, in bytes unless
otherwise specified.<br /> Note: Gluten Plugin will leverage this setting to
allocate memory space for native usage even offHeap is disabled. <br /> The
value is based on your system and it is recommended to set it larger if you are
facing Out of Memory issue in Gluten Plugin
[...]
+| spark.sql.sources.useV1SourceList | A
comma-separated list of data source short names or fully qualified data source
implementation class names for which Data Source V2 code path is disabled. <br
/> Note: Please use V1 source for avro.
[...]
+| spark.sql.join.preferSortMergeJoin | When true,
prefer sort merge join over shuffled hash join. <br /> Note: Please turn off
preferSortMergeJoin.
[...]
+| spark.plugins | To load
Gluten's components by Spark's plug-in loader
[...]
+| spark.shuffle.manager | To turn on
Gluten Columnar Shuffle Plugin
[...]
+| spark.gluten.enabled | Enable
Gluten at runtime, default is true. Fall back to vanilla Spark for all query
plans if set to false. Recommend to enable/disable Gluten through the setting
for `spark.plugins`.
[...]
+| spark.gluten.memory.isolation |
(Experimental) Enable isolated memory mode. If true, Gluten limits the
maximum off-heap memory that can be used by each task to X, X = executor memory /
max task slots. It's recommended to set true if Gluten serves concurrent
queries within a single session, since not all memory Gluten allocated is
guaranteed to be spillable. In that case, the feature should be enabled to avoid
OOM. Note when true, setting spark.memory.storage [...]
+| spark.gluten.ras.enabled | Enables RAS
(relation algebra selector) during physical planning to generate more efficient
query plan. Note, this feature doesn't bring performance profits by default.
Try exploring option `spark.gluten.ras.costModel` for advanced usage.
[...]
+| spark.gluten.sql.columnar.maxBatchSize | Number of
rows to be processed in each batch. Default value is 4096.
[...]
+| spark.gluten.sql.columnar.scanOnly | When
enabled, this config will overwrite all other operators' enabling, and only
Scan and Filter pushdown will be offloaded to native.
[...]
+| spark.gluten.sql.columnar.batchscan | Enable or
Disable Columnar BatchScan, default is true
[...]
+| spark.gluten.sql.columnar.hashagg | Enable or
Disable Columnar Hash Aggregate, default is true
[...]
+| spark.gluten.sql.columnar.project | Enable or
Disable Columnar Project, default is true
[...]
+| spark.gluten.sql.columnar.filter | Enable or
Disable Columnar Filter, default is true
[...]
+| spark.gluten.sql.columnar.sort | Enable or
Disable Columnar Sort, default is true
[...]
+| spark.gluten.sql.columnar.window | Enable or
Disable Columnar Window, default is true
[...]
+| spark.gluten.sql.columnar.shuffledHashJoin | Enable or
Disable ShuffledHashJoin, default is true
[...]
+| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | Whether to
allow Gluten to choose an optimal build side for shuffled hash join
[...]
+| spark.gluten.sql.columnar.forceShuffledHashJoin | Force to use
ShuffledHashJoin over SortMergeJoin, default is true. For queries that can
benefit from storage partitioned join, please set it to false.
[...]
+| spark.gluten.sql.columnar.sortMergeJoin | Enable or
Disable Columnar Sort Merge Join, default is true
[...]
+| spark.gluten.sql.columnar.union | Enable or
Disable Columnar Union, default is true
[...]
+| spark.gluten.sql.columnar.expand | Enable or
Disable Columnar Expand, default is true
[...]
+| spark.gluten.sql.columnar.generate | Enable or
Disable Columnar Generate, default is true
[...]
+| spark.gluten.sql.columnar.limit | Enable or
Disable Columnar Limit, default is true
[...]
+| spark.gluten.sql.columnar.tableCache | Enable or
Disable Columnar Table Cache, default is false
[...]
+| spark.gluten.sql.columnar.broadcastExchange | Enable or
Disable Columnar Broadcast Exchange, default is true
[...]
+| spark.gluten.sql.columnar.broadcastJoin | Enable or
Disable Columnar BroadcastHashJoin, default is true
[...]
+| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | The
threshold to determine whether to use sort-based columnar shuffle. Sort-based
shuffle will be used if the number of partitions is greater than this
threshold.
[...]
+| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | The
threshold to determine whether to use sort-based columnar shuffle. Sort-based
shuffle will be used if the number of columns is greater than this threshold.
[...]
+| spark.gluten.sql.columnar.shuffle.codec | Set up the
codec to be used for Columnar Shuffle. If this configuration is not set, will
check the value of spark.io.compression.codec. By default, Gluten uses software
compression. Valid options for software compression are lz4, zstd. The only valid
option for QAT and IAA is gzip.
[...]
+| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using
hardware accelerators for shuffle de/compression. Valid options are QAT and
IAA.
[...]
+| spark.gluten.sql.columnar.shuffle.compressionMode | Setting
different compression mode in shuffle, Valid options are buffer and rowvector,
buffer option compress each buffer of RowVector individually into one
pre-allocated large buffer, rowvector option first copies each buffer of
RowVector to a large buffer and then compress the entire buffer in one go.
[...]
+| spark.gluten.sql.columnar.shuffle.compression.threshold | If number of
rows in a batch falls below this threshold, will copy all buffers into one
buffer to compress.
[...]
+| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the
threshold to dynamically adjust the size of shuffle split buffers. The size of
each split buffer is recalculated for each incoming batch of data. If the new
size deviates from the current partition buffer size by a factor outside the
range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated
using the newly calculated size
[...]
+| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the
threshold to control the minimum merged size. When a partition buffer is full, and
the number of rows is below (`threshold *
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.
[...]
+| spark.gluten.sql.columnar.shuffle.readerBufferSize | Buffer size
in bytes for shuffle reader reading input stream from local or remote.
[...]
+| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | Buffer size
in bytes for sort-based shuffle reader deserializing raw input to columnar
batch.
[...]
+| spark.gluten.sql.columnar.numaBinding | Set up
NUMABinding, default is false
[...]
+| spark.gluten.sql.columnar.coreRange | Set up the
core range for NUMABinding, only works when numaBinding set to true. <br /> The
setting is based on the number of cores in your system. Use 72 cores as an
example.
[...]
+| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure
the threshold for whether whole stage will fall back in AQE supported case by
counting the number of ColumnarToRow & vanilla leaf node
[...]
+| spark.gluten.sql.columnar.query.fallback.threshold | Configure
the threshold for whether query will fall back by counting the number of
ColumnarToRow & vanilla leaf node
[...]
+| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true,
the fallback policy ignores the RowToColumnar when counting fallback number.
[...]
+| spark.gluten.sql.columnar.fallback.preferColumnar | When true,
the fallback policy prefers to use Gluten plan rather than vanilla Spark plan
if both of them contain ColumnarToRow and the vanilla Spark plan
ColumnarToRow number is not smaller than Gluten plan.
[...]
+| spark.gluten.sql.columnar.force.hashagg | Force to use
hash agg to replace sort agg.
[...]
+| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | Whether to
merge two phases aggregate if there are no other operators between them.
[...]
+| spark.gluten.sql.columnar.vanillaReaders | Enable
vanilla spark's vectorized reader. Please note it may bring perf. overhead due
to extra data transition. We recommend to disable it if most queries can be
fully offloaded to gluten.
[...]
+| spark.gluten.sql.native.bloomFilter | Enable or
Disable native runtime bloom filter.
[...]
+| spark.gluten.sql.native.arrow.reader.enabled | Enable or
Disable native arrow read CSV file format
[...]
+| spark.gluten.shuffleWriter.bufferSize | Set the
number of buffer rows for the shuffle writer
[...]
+| spark.gluten.loadLibFromJar | Controls
whether to load dynamic link library from a packed jar for gluten/cpp. Not
applicable to static build and clickhouse backend.
[...]
+| spark.gluten.loadLibOS | When
`spark.gluten.loadLibFromJar` is true, manually specify the system OS to load
library, e.g., CentOS
[...]
+| spark.gluten.loadLibOSVersion | Manually
specify the system os version to load library, e.g., if
`spark.gluten.loadLibOS` is CentOS, this config can be 7
[...]
+| spark.gluten.expression.blacklist | A black list
of expression to skip transform, multiple values separated by commas.
[...]
+| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back
filter/project if the height of expression tree reaches this threshold,
considering Spark codegen can bring better performance for such case.
[...]
+| spark.gluten.sql.cartesianProductTransformerEnabled | Config to
enable CartesianProductExecTransformer.
[...]
+| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to
enable BroadcastNestedLoopJoinExecTransformer.
[...]
+| spark.gluten.sql.cacheWholeStageTransformerContext | When true,
`WholeStageTransformer` will cache the `WholeStageTransformerContext` when
executing. It is used to get substrait plan node and native plan string.
[...]
+| spark.gluten.sql.injectNativePlanStringToExplain | When true,
Gluten will inject native plan tree to Spark's explain output.
[...]
+| spark.gluten.sql.fallbackRegexpExpressions | When true,
Gluten will fall back all regexp expressions to avoid any incompatibility risk.
[...]
## Velox Parameters
The following configurations are related to Velox settings.
-| Parameters |
Description
| Recommend
Setting |
-|----------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
-| spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems | The
default number of expected items for the velox bloomfilter.
| 1000000L
|
-| spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits | The
default number of bits to use for the velox bloom filter.
| 8388608L
|
-| spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits | The
max number of bits to use for the velox bloom filter.
| 4194304L
|
-| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled |
Disables caching if false. File handle cache should be disabled if files are
mutable, i.e. file content may change while file path stays the same. |
|
-| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | Set
the directory size guess for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | Set
the file preload threshold for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | Set
the prefetch row groups for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.loadQuantum | Set
the load quantum for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | Set
the max coalesced distance for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set
the max coalesced bytes for velox file scan.
|
|
-| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set
prefetch cache min pct for velox file scan.
|
|
-| spark.gluten.velox.awsSdkLogLevel | Log
granularity of AWS C++ SDK in velox.
| FATAL
|
-| spark.gluten.velox.castFromVarcharAddTrimNode | If
enabled, will add a trim node to CAST-from-varchar. Default is false.
|
|
-| spark.gluten.velox.fs.s3a.retry.mode | Retry
mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive".
| legacy
|
-| spark.gluten.velox.fs.s3a.connect.timeout |
Timeout for AWS s3 connection.
| 1s
|
-| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled |
Enable velox orc scan. If disabled, vanilla spark orc scan will be used.
| true
|
-| spark.gluten.sql.complexType.scan.fallback.enabled | Force
fallback for complex type scan, including struct, map, array.
| true
|
+| Parameters |
Description
| Recommend Setting |
+|----------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
+| spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems | The
default number of expected items for the velox bloom filter.
|
1000000L |
+| spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits | The
default number of bits to use for the velox bloom filter.
|
8388608L |
+| spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits | The
max number of bits to use for the velox bloom filter.
|
4194304L |
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled |
Disables caching if false. File handle cache should be disabled if files are
mutable, i.e. file content may change while file path stays the same.
| |
+| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | Set
the directory size guess for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | Set
the file preload threshold for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | Set
the prefetch row groups for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.loadQuantum | Set
the load quantum for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | Set
the max coalesced distance for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set
the max coalesced bytes for velox file scan.
|
|
+| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set
prefetch cache min pct for velox file scan.
|
|
+| spark.gluten.velox.awsSdkLogLevel | Log
granularity of AWS C++ SDK in velox.
|
FATAL |
+| spark.gluten.velox.castFromVarcharAddTrimNode | If
enabled, will add a trim node to CAST-from-varchar. Default is false.
|
|
+| spark.gluten.velox.fs.s3a.retry.mode | Retry
mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive".
|
legacy |
+| spark.gluten.velox.fs.s3a.connect.timeout |
Timeout for AWS s3 connection.
| 1s |
+| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled |
Enable velox orc scan. If disabled, vanilla spark orc scan will be used.
| true |
+| spark.gluten.sql.complexType.scan.fallback.enabled | Force
fallback for complex type scan, including struct, map, array.
| true
|
| spark.gluten.velox.offHeapBroadcastBuildRelation.enabled |
Experimental: If enabled, broadcast build relation will use offheap memory.
Otherwise, broadcast build relation will use onheap memory, default value is
false | |
-| spark.gluten.auto.adjustStageResource.enabled |
Experimental: If enabled, gluten will try to set the stage resource according
to stage execution plan. NOTE: Only workes when aqe is enabled at the same
time. | false |
-| spark.gluten.auto.adjustStageResources.heap.ratio |
Experimental: Increase executor heap memory when match adjust stage resource
rule. |
2.0d |
-| spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold |
Experimental: Increase executor heap memory when stage contains fallen node
count exceeds the total node count ratio. |
0.5d |
+| spark.gluten.auto.adjustStageResource.enabled |
Experimental: If enabled, gluten will try to set the stage resource according
to stage execution plan. NOTE: Only works when AQE is enabled at the same
time. | false |
+| spark.gluten.auto.adjustStageResources.heap.ratio |
Experimental: Increase executor heap memory when match adjust stage resource
rule.
| 2.0d |
+| spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold |
Experimental: Increase executor heap memory when stage contains fallen node
count exceeds the total node count ratio.
| 0.5d |
Additionally, you can control the configurations of gluten at thread level by
local property.
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 8d031cf079..46787d4209 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -40,7 +40,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
String compressionType,
String compressionCodecBackend,
int batchSize,
- long bufferSize,
+ long readerBufferSize,
+ long deserializerBufferSize,
String shuffleWriterType);
public native long readStream(long shuffleReaderHandle, JniByteInputStream
jniIn);
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 575e383afc..90fea2c11e 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -60,6 +60,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
String codecBackend,
int compressionLevel,
int compressionBufferSize,
+ int diskWriteBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
@@ -82,6 +83,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
codecBackend,
compressionLevel,
compressionBufferSize,
+ diskWriteBufferSize,
bufferCompressThreshold,
compressionMode,
sortBufferInitialSize,
@@ -114,6 +116,7 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
String codec,
int compressionLevel,
int compressionBufferSize,
+ int diskWriteBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
@@ -137,6 +140,7 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
null,
compressionLevel,
compressionBufferSize,
+ diskWriteBufferSize,
bufferCompressThreshold,
compressionMode,
sortBufferInitialSize,
@@ -165,6 +169,7 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
String codecBackend,
int compressionLevel,
int compressionBufferSize,
+ int diskWriteBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
diff --git
a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index 03290c3445..737c42c4ac 100644
---
a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -87,21 +87,24 @@ abstract class CelebornColumnarShuffleWriter[K, V](
protected val blockManager: BlockManager = SparkEnv.get.blockManager
- protected val customizedCompressionCodec: String =
+ protected val compressionCodec: Option[String] =
if (conf.getBoolean(SHUFFLE_COMPRESS.key,
SHUFFLE_COMPRESS.defaultValue.get)) {
- GlutenShuffleUtils.getCompressionCodec(conf)
+ Some(GlutenShuffleUtils.getCompressionCodec(conf))
} else {
- null // uncompressed
+ None
}
- protected val compressionLevel: Int =
- GlutenShuffleUtils.getCompressionLevel(
- conf,
- customizedCompressionCodec,
- GlutenConfig.get.columnarShuffleCodecBackend.orNull)
+ protected val compressionLevel: Int = {
+ compressionCodec
+ .map(codec => GlutenShuffleUtils.getCompressionLevel(conf, codec))
+ .getOrElse(GlutenShuffleUtils.DEFAULT_COMPRESSION_LEVEL)
+ }
- protected val compressionBufferSize: Int =
- GlutenShuffleUtils.getSortEvictBufferSize(conf, customizedCompressionCodec)
+ protected val compressionBufferSize: Int = {
+ compressionCodec
+ .map(codec => GlutenShuffleUtils.getCompressionBufferSize(conf, codec))
+ .getOrElse(0)
+ }
protected val bufferCompressThreshold: Int =
GlutenConfig.get.columnarShuffleCompressionThreshold
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index 8278c0dfa7..b06ccd8b29 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -35,6 +35,9 @@ object GlutenShuffleUtils {
val HashPartitioningShortName = "hash"
val RangePartitioningShortName = "range"
+ // Follow arrow default compression level `kUseDefaultCompressionLevel`
+ val DEFAULT_COMPRESSION_LEVEL: Int = Int.MinValue
+
def getStartPartitionId(partition: NativePartitioning, partitionId: Int):
Int = {
partition.getShortName match {
case RoundRobinPartitioningShortName =>
@@ -80,18 +83,17 @@ object GlutenShuffleUtils {
}
}
- def getCompressionLevel(conf: SparkConf, codec: String,
compressionCodecBackend: String): Int = {
- if ("zstd" == codec && compressionCodecBackend == null) {
+ def getCompressionLevel(conf: SparkConf, codec: String): Int = {
+ if ("zstd" == codec) {
conf.getInt(
IO_COMPRESSION_ZSTD_LEVEL.key,
IO_COMPRESSION_ZSTD_LEVEL.defaultValue.getOrElse(1))
} else {
- // Follow arrow default compression level `kUseDefaultCompressionLevel`
- Int.MinValue
+ DEFAULT_COMPRESSION_LEVEL
}
}
- def getSortEvictBufferSize(conf: SparkConf, codec: String): Int = {
+ def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
def checkAndGetBufferSize(entry: ConfigEntry[Long]): Int = {
val bufferSize = conf.get(entry).toInt
if (bufferSize < 4) {
@@ -104,7 +106,7 @@ object GlutenShuffleUtils {
} else if ("zstd" == codec) {
checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE)
} else {
- checkAndGetBufferSize(SHUFFLE_DISK_WRITE_BUFFER_SIZE)
+ throw new UnsupportedOperationException(s"Unsupported compression codec
$codec.")
}
}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 22d269cc25..5f7000ef10 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -194,6 +194,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def columnarShuffleReaderBufferSize: Long =
getConf(COLUMNAR_SHUFFLE_READER_BUFFER_SIZE)
+ def columnarSortShuffleDeserializerBufferSize: Long =
+ getConf(COLUMNAR_SORT_SHUFFLE_DESERIALIZER_BUFFER_SIZE)
+
def maxBatchSize: Int = getConf(COLUMNAR_MAX_BATCH_SIZE)
def shuffleWriterBufferSize: Int = getConf(SHUFFLE_WRITER_BUFFER_SIZE)
@@ -1072,6 +1075,14 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1MB")
+ val COLUMNAR_SORT_SHUFFLE_DESERIALIZER_BUFFER_SIZE =
+ buildConf("spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize")
+ .internal()
+ .doc("Buffer size in bytes for sort-based shuffle reader deserializing
raw input to " +
+ "columnar batch.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("1MB")
+
val COLUMNAR_MAX_BATCH_SIZE =
buildConf("spark.gluten.sql.columnar.maxBatchSize")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]