This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b1211a8bbd [GLUTEN-8025][VL] Respect config kSpillReadBufferSize and
add spill compression codec (#8045)
b1211a8bbd is described below
commit b1211a8bbd8d966f51dd7a277aafdcdd56063c3e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Sat Dec 14 08:25:01 2024 +0800
[GLUTEN-8025][VL] Respect config kSpillReadBufferSize and add spill
compression codec (#8045)
Respect the Spark config UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE as Velox
kSpillReadBufferSize.
Add the config
spark.gluten.sql.columnar.backend.velox.spillCompressionCodec; if it is not
set, the Spark config spark.io.compression.codec is used instead.
Respect Spark spark.shuffle.spill.diskWriteBufferSize as Velox
kSpillWriteBufferSize.
---
cpp/core/config/GlutenConfig.h | 1 +
cpp/velox/compute/WholeStageResultIterator.cc | 12 ++++++++---
cpp/velox/config/VeloxConfig.h | 8 ++++++--
.../scala/org/apache/gluten/GlutenConfig.scala | 24 +++++++++++++---------
4 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 5a61b27a80..3207d3bec5 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -66,6 +66,7 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens";
const std::string kShuffleCompressionCodec =
"spark.gluten.sql.columnar.shuffle.codec";
const std::string kShuffleCompressionCodecBackend =
"spark.gluten.sql.columnar.shuffle.codecBackend";
+const std::string kShuffleSpillDiskWriteBufferSize =
"spark.shuffle.spill.diskWriteBufferSize";
const std::string kQatBackendName = "qat";
const std::string kIaaBackendName = "iaa";
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index ba67c18255..1bf783fb8c 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -516,17 +516,23 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kMaxSpillBytes] =
std::to_string(veloxCfg_->get<uint64_t>(kMaxSpillBytes,
107374182400LL));
configs[velox::core::QueryConfig::kSpillWriteBufferSize] =
- std::to_string(veloxCfg_->get<uint64_t>(kSpillWriteBufferSize, 4L *
1024 * 1024));
+
std::to_string(veloxCfg_->get<uint64_t>(kShuffleSpillDiskWriteBufferSize, 1L *
1024 * 1024));
+ configs[velox::core::QueryConfig::kSpillReadBufferSize] =
+ std::to_string(veloxCfg_->get<int32_t>(kSpillReadBufferSize, 1L * 1024
* 1024));
configs[velox::core::QueryConfig::kSpillStartPartitionBit] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillStartPartitionBit, 29));
configs[velox::core::QueryConfig::kSpillNumPartitionBits] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillPartitionBits, 3));
configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillableReservationGrowthPct,
25));
- configs[velox::core::QueryConfig::kSpillCompressionKind] =
- veloxCfg_->get<std::string>(kSpillCompressionKind, "lz4");
configs[velox::core::QueryConfig::kSpillPrefixSortEnabled] =
veloxCfg_->get<std::string>(kSpillPrefixSortEnabled, "false");
+ if (veloxCfg_->get<bool>(kSparkShuffleSpillCompress, true)) {
+ configs[velox::core::QueryConfig::kSpillCompressionKind] =
+ veloxCfg_->get<std::string>(kSpillCompressionKind,
veloxCfg_->get<std::string>(kCompressionKind, "lz4"));
+ } else {
+ configs[velox::core::QueryConfig::kSpillCompressionKind] = "none";
+ }
configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterExpectedNumItems,
1000000));
configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 4ae82f263d..f882e72065 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -38,13 +38,17 @@ const std::string kSpillStartPartitionBit =
"spark.gluten.sql.columnar.backend.v
const std::string kSpillPartitionBits =
"spark.gluten.sql.columnar.backend.velox.spillPartitionBits";
const std::string kMaxSpillRunRows =
"spark.gluten.sql.columnar.backend.velox.MaxSpillRunRows";
const std::string kMaxSpillBytes =
"spark.gluten.sql.columnar.backend.velox.MaxSpillBytes";
-const std::string kSpillWriteBufferSize =
"spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize";
+const std::string kSpillReadBufferSize =
"spark.unsafe.sorter.spill.reader.buffer.size";
const uint64_t kMaxSpillFileSizeDefault = 1L * 1024 * 1024 * 1024;
const std::string kSpillableReservationGrowthPct =
"spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct";
-const std::string kSpillCompressionKind = "spark.io.compression.codec";
const std::string kSpillPrefixSortEnabled =
"spark.gluten.sql.columnar.backend.velox.spillPrefixsortEnabled";
+// Whether to compress data spilled. Compression will use
spark.io.compression.codec or kSpillCompressionKind.
+const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
+const std::string kCompressionKind = "spark.io.compression.codec";
+/// The compression codec to use for spilling. Use kCompressionKind if not set.
+const std::string kSpillCompressionKind =
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
const std::string kMaxExtendedPartialAggregationMemoryRatio =
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 15704f1450..296153346e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
- def veloxMaxWriteBufferSize: Long =
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE)
-
def veloxBloomFilterExpectedNumItems: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
@@ -571,6 +569,12 @@ object GlutenConfig {
val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
+ val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
"spark.unsafe.sorter.spill.reader.buffer.size"
+ val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+ val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE =
"spark.shuffle.spill.diskWriteBufferSize"
+ val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+ val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
+ val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
// For Soft Affinity Scheduling
// Enable Soft Affinity Scheduling, default value is false
@@ -734,7 +738,14 @@ object GlutenConfig {
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
(
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
- GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString)
+ GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
+ (
+ SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+ SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
+ (
+ SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+ SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
+ (SPARK_SHUFFLE_SPILL_COMPRESS,
SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1,
e._2)))
@@ -1605,13 +1616,6 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("100G")
- val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE =
- buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize")
- .internal()
- .doc("The maximum write buffer size")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("4M")
-
val MAX_PARTITION_PER_WRITERS_SESSION =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]