This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b1211a8bbd [GLUTEN-8025][VL] Respect config kSpillReadBufferSize and 
add spill compression codec (#8045)
b1211a8bbd is described below

commit b1211a8bbd8d966f51dd7a277aafdcdd56063c3e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Sat Dec 14 08:25:01 2024 +0800

    [GLUTEN-8025][VL] Respect config kSpillReadBufferSize and add spill 
compression codec (#8045)
    
    Respect Spark config UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE as Velox 
kSpillReadBufferSize.
    Add the config 
spark.gluten.sql.columnar.backend.velox.spillCompressionCodec; if it is not set, use 
the Spark config spark.io.compression.codec instead.
    Respect Spark spark.shuffle.spill.diskWriteBufferSize as Velox 
kSpillWriteBufferSize.
---
 cpp/core/config/GlutenConfig.h                     |  1 +
 cpp/velox/compute/WholeStageResultIterator.cc      | 12 ++++++++---
 cpp/velox/config/VeloxConfig.h                     |  8 ++++++--
 .../scala/org/apache/gluten/GlutenConfig.scala     | 24 +++++++++++++---------
 4 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 5a61b27a80..3207d3bec5 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -66,6 +66,7 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens";
 
 const std::string kShuffleCompressionCodec = 
"spark.gluten.sql.columnar.shuffle.codec";
 const std::string kShuffleCompressionCodecBackend = 
"spark.gluten.sql.columnar.shuffle.codecBackend";
+const std::string kShuffleSpillDiskWriteBufferSize = 
"spark.shuffle.spill.diskWriteBufferSize";
 const std::string kQatBackendName = "qat";
 const std::string kIaaBackendName = "iaa";
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index ba67c18255..1bf783fb8c 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -516,17 +516,23 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
     configs[velox::core::QueryConfig::kMaxSpillBytes] =
         std::to_string(veloxCfg_->get<uint64_t>(kMaxSpillBytes, 
107374182400LL));
     configs[velox::core::QueryConfig::kSpillWriteBufferSize] =
-        std::to_string(veloxCfg_->get<uint64_t>(kSpillWriteBufferSize, 4L * 
1024 * 1024));
+        
std::to_string(veloxCfg_->get<uint64_t>(kShuffleSpillDiskWriteBufferSize, 1L * 
1024 * 1024));
+    configs[velox::core::QueryConfig::kSpillReadBufferSize] =
+        std::to_string(veloxCfg_->get<int32_t>(kSpillReadBufferSize, 1L * 1024 
* 1024));
     configs[velox::core::QueryConfig::kSpillStartPartitionBit] =
         std::to_string(veloxCfg_->get<uint8_t>(kSpillStartPartitionBit, 29));
     configs[velox::core::QueryConfig::kSpillNumPartitionBits] =
         std::to_string(veloxCfg_->get<uint8_t>(kSpillPartitionBits, 3));
     configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] =
         std::to_string(veloxCfg_->get<uint8_t>(kSpillableReservationGrowthPct, 
25));
-    configs[velox::core::QueryConfig::kSpillCompressionKind] =
-        veloxCfg_->get<std::string>(kSpillCompressionKind, "lz4");
     configs[velox::core::QueryConfig::kSpillPrefixSortEnabled] =
         veloxCfg_->get<std::string>(kSpillPrefixSortEnabled, "false");
+    if (veloxCfg_->get<bool>(kSparkShuffleSpillCompress, true)) {
+      configs[velox::core::QueryConfig::kSpillCompressionKind] =
+          veloxCfg_->get<std::string>(kSpillCompressionKind, 
veloxCfg_->get<std::string>(kCompressionKind, "lz4"));
+    } else {
+      configs[velox::core::QueryConfig::kSpillCompressionKind] = "none";
+    }
     configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
         std::to_string(veloxCfg_->get<int64_t>(kBloomFilterExpectedNumItems, 
1000000));
     configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 4ae82f263d..f882e72065 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -38,13 +38,17 @@ const std::string kSpillStartPartitionBit = 
"spark.gluten.sql.columnar.backend.v
 const std::string kSpillPartitionBits = 
"spark.gluten.sql.columnar.backend.velox.spillPartitionBits";
 const std::string kMaxSpillRunRows = 
"spark.gluten.sql.columnar.backend.velox.MaxSpillRunRows";
 const std::string kMaxSpillBytes = 
"spark.gluten.sql.columnar.backend.velox.MaxSpillBytes";
-const std::string kSpillWriteBufferSize = 
"spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize";
+const std::string kSpillReadBufferSize = 
"spark.unsafe.sorter.spill.reader.buffer.size";
 const uint64_t kMaxSpillFileSizeDefault = 1L * 1024 * 1024 * 1024;
 
 const std::string kSpillableReservationGrowthPct =
     "spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct";
-const std::string kSpillCompressionKind = "spark.io.compression.codec";
 const std::string kSpillPrefixSortEnabled = 
"spark.gluten.sql.columnar.backend.velox.spillPrefixsortEnabled";
+// Whether to compress data spilled. Compression will use 
spark.io.compression.codec or kSpillCompressionKind.
+const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
+const std::string kCompressionKind = "spark.io.compression.codec";
+/// The compression codec to use for spilling. Use kCompressionKind if not set.
+const std::string kSpillCompressionKind = 
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
 const std::string kMaxPartialAggregationMemoryRatio =
     "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
 const std::string kMaxExtendedPartialAggregationMemoryRatio =
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 15704f1450..296153346e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)
 
-  def veloxMaxWriteBufferSize: Long = 
conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE)
-
   def veloxBloomFilterExpectedNumItems: Long =
     conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
 
@@ -571,6 +569,12 @@ object GlutenConfig {
   val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
   val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = 
"spark.unsafe.sorter.spill.reader.buffer.size"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = 
"spark.shuffle.spill.diskWriteBufferSize"
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
+  val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
 
   // For Soft Affinity Scheduling
   // Enable Soft Affinity Scheduling, default value is false
@@ -734,7 +738,14 @@ object GlutenConfig {
         COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
       (
         GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
-        GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString)
+        GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
+      (
+        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
+      (
+        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
+      (SPARK_SHUFFLE_SPILL_COMPRESS, 
SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, 
e._2)))
 
@@ -1605,13 +1616,6 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("100G")
 
-  val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE =
-    buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize")
-      .internal()
-      .doc("The maximum write buffer size")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("4M")
-
   val MAX_PARTITION_PER_WRITERS_SESSION =
     
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to