This is an automated email from the ASF dual-hosted git repository.

marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ad03c2d991 [VL] Restore hash shuffle reader payload merging (#12097)
ad03c2d991 is described below

commit ad03c2d9912ee3dbbe51f93817a8f87f71cfa458
Author: Zhen Li <[email protected]>
AuthorDate: Tue May 19 03:22:15 2026 +0800

    [VL] Restore hash shuffle reader payload merging (#12097)
---
 .../VeloxCelebornColumnarBatchSerializer.scala     |   6 +-
 .../org/apache/gluten/config/VeloxConfig.scala     |  17 ++
 .../vectorized/ColumnarBatchSerializer.scala       |   7 +-
 cpp/core/jni/JniWrapper.cc                         |   4 +-
 cpp/core/shuffle/Options.h                         |   4 +
 cpp/velox/compute/VeloxRuntime.cc                  |   3 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc            | 173 +++++++++--
 cpp/velox/shuffle/VeloxShuffleReader.h             |  16 +-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          | 322 +++++++++++++++++++++
 docs/velox-configuration.md                        | 143 ++++-----
 .../gluten/vectorized/ShuffleReaderJniWrapper.java |   3 +-
 11 files changed, 596 insertions(+), 102 deletions(-)

diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index f377ea99f6..2c36c773f4 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.shuffle
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.utils.ArrowAbiUtil
@@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance(
     val batchSize = GlutenConfig.get.maxBatchSize
     val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
     val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+    val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge
     val handle = jniWrapper
       .make(
         cSchema.memoryAddress(),
@@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance(
         batchSize,
         readerBufferSize,
         deserializerBufferSize,
-        shuffleWriterType.name
+        shuffleWriterType.name,
+        enableHashShuffleReaderStreamMerge
       )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index dbc833f046..9ea203d42b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
   def veloxResizeBatchesShuffleOutput: Boolean =
     getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)
 
+  def enableHashShuffleReaderStreamMerge: Boolean =
+    getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED)
+
   case class ResizeRange(min: Int, max: Int) {
     assert(max >= min)
     assert(min > 0, "Min batch size should be larger than 0")
@@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED =
+    buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled")
+      .doc(
+        "Enables a reader-side raw payload merge fast path for plain hash shuffle payloads " +
+          "within each shuffle input stream. This path merges payload buffers before Velox " +
+          "vectors are materialized, so it has lower per-batch overhead than generic " +
+          "VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types " +
+          "and dictionary-encoded payloads are not merged by this path. " +
+          "VeloxResizeBatchesExec can still be enabled separately as a generic complement " +
+          "for types and encodings not covered by this fast path. If false, each hash " +
+          "shuffle payload is returned as its own columnar batch.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
     buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
       .doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 3b5fce63f8..284d931ecc 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.vectorized
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
 import org.apache.gluten.iterator.ClosableIterator
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
@@ -104,6 +104,7 @@ private class ColumnarBatchSerializerInstanceImpl(
     val batchSize = GlutenConfig.get.maxBatchSize
     val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
     val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+    val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge
     val shuffleReaderHandle = jniWrapper.make(
       cSchema.memoryAddress(),
       compressionCodec,
@@ -111,7 +112,9 @@ private class ColumnarBatchSerializerInstanceImpl(
       batchSize,
       readerBufferSize,
       deserializerBufferSize,
-      shuffleWriterType.name)
+      shuffleWriterType.name,
+      enableHashShuffleReaderStreamMerge
+    )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
     // was used to create all buffers read from shuffle reader. The pool
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4a3215ae66..f109865840 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
     jint batchSize,
     jlong readerBufferSize,
     jlong deserializerBufferSize,
-    jstring shuffleWriterType) {
+    jstring shuffleWriterType,
+    jboolean enableHashShuffleReaderStreamMerge) {
   JNI_METHOD_START
   auto ctx = getRuntime(env, wrapper);
 
@@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
   options.batchSize = batchSize;
   options.readerBufferSize = readerBufferSize;
   options.deserializerBufferSize = deserializerBufferSize;
+  options.enableHashShuffleReaderStreamMerge = 
enableHashShuffleReaderStreamMerge;
 
   options.shuffleWriterType = 
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
   std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 1d7f9ad9f9..ea3aff10cf 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -63,6 +63,10 @@ struct ShuffleReaderOptions {
 
   // Buffer size when deserializing rows into columnar batches. Only used for 
sort-based shuffle.
   int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
+
+  // Whether to enable the reader-side raw payload merge fast path for plain 
hash shuffle payloads within one input
+  // stream.
+  bool enableHashShuffleReaderStreamMerge = false;
 };
 
 struct ShuffleWriterOptions {
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 1e2cc6f308..e3eac17a22 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -585,7 +585,8 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
       options.readerBufferSize,
       options.deserializerBufferSize,
       memoryManager(),
-      options.shuffleWriterType);
+      options.shuffleWriterType,
+      options.enableHashShuffleReaderStreamMerge);
 
   return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
 }
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index bb2010378e..a469d5c770 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -56,6 +56,11 @@ arrow::Result<BlockType> 
readBlockType(arrow::io::InputStream* inputStream) {
   return type;
 }
 
+uint32_t validateHashShuffleReaderBatchSize(int32_t batchSize) {
+  GLUTEN_CHECK(batchSize > 0, fmt::format("Hash shuffle reader batch size must 
be positive, but got {}", batchSize));
+  return static_cast<uint32_t>(batchSize);
+}
+
 struct BufferViewReleaser {
   BufferViewReleaser() : BufferViewReleaser(nullptr) {}
 
@@ -300,6 +305,23 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
+std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
+    RowTypePtr type,
+    std::unique_ptr<InMemoryPayload> payload,
+    memory::MemoryPool* pool,
+    int64_t& deserializeTime) {
+  ScopedTimer timer(&deserializeTime);
+  std::vector<BufferPtr> veloxBuffers;
+  auto numBuffers = payload->numBuffers();
+  veloxBuffers.reserve(numBuffers);
+  for (size_t i = 0; i < numBuffers; ++i) {
+    GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
+    veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
+  }
+  auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {}, 
pool);
+  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
+}
+
 arrow::Result<BufferPtr>
 readDictionaryBuffer(arrow::io::InputStream* in, 
facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
   size_t bufferSize;
@@ -444,23 +466,45 @@ 
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
     const std::shared_ptr<arrow::Schema>& schema,
     const std::shared_ptr<arrow::util::Codec>& codec,
     const facebook::velox::RowTypePtr& rowType,
+    int32_t batchSize,
     int64_t readerBufferSize,
     VeloxMemoryManager* memoryManager,
+    std::vector<bool> isValidityBuffer,
+    bool hasComplexType,
+    bool enableStreamMerge,
     int64_t& deserializeTime,
     int64_t& decompressTime)
     : streamReader_(streamReader),
       schema_(schema),
       codec_(codec),
       rowType_(rowType),
+      batchSize_(validateHashShuffleReaderBatchSize(batchSize)),
       readerBufferSize_(readerBufferSize),
       memoryManager_(memoryManager),
+      isValidityBuffer_(std::move(isValidityBuffer)),
+      hasComplexType_(hasComplexType),
+      enableStreamMerge_(enableStreamMerge),
       deserializeTime_(deserializeTime),
       decompressTime_(decompressTime) {}
 
+bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
+  // Stream merge is a reader-side raw payload fast path: for plain payloads it
+  // concatenates buffers before Velox vectors are materialized, avoiding the 
generic
+  // RowVector append cost paid by VeloxResizeBatchesExec. Keep complex and 
dictionary
+  // payloads on the existing per-payload path; VeloxResizeBatchesExec can be 
enabled
+  // separately as the generic complement for those cases.
+  return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty();
+}
+
 bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
+  if (blockTypeResolved_) {
+    return true;
+  }
+
   GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
   switch (blockType) {
     case BlockType::kEndOfStream:
+      in_ = nullptr;
       return false;
     case BlockType::kDictionary: {
       VeloxDictionaryReader reader(rowType_, 
memoryManager_->getLeafMemoryPool().get(), codec_.get());
@@ -485,6 +529,7 @@ bool 
VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
     default:
       throw GlutenException(fmt::format("Unsupported block type: {}", 
static_cast<int32_t>(blockType)));
   }
+  blockTypeResolved_ = true;
   return true;
 }
 
@@ -499,6 +544,12 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() {
     return;
   }
 
+  if (!dictionaryFields_.empty() || !dictionaries_.empty()) {
+    dictionaryFields_.clear();
+    dictionaries_.clear();
+  }
+  blockTypeResolved_ = false;
+
   if (readerBufferSize_ > 0) {
     GLUTEN_ASSIGN_OR_THROW(
         in_,
@@ -510,36 +561,106 @@ void 
VeloxHashShuffleReaderDeserializer::loadNextStream() {
 }
 
 std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
-  if (in_ == nullptr) {
-    loadNextStream();
+  while (true) {
+    if (in_ == nullptr) {
+      if (merged_) {
+        return makeColumnarBatch(
+            rowType_, std::move(merged_), 
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
+      }
 
-    if (reachedEos_) {
-      return nullptr;
+      loadNextStream();
+
+      if (reachedEos_) {
+        return nullptr;
+      }
+    }
+    if (resolveNextBlockType()) {
+      break;
     }
   }
 
-  while (!resolveNextBlockType()) {
-    loadNextStream();
-
-    if (reachedEos_) {
-      return nullptr;
+  if (shouldSkipMerge()) {
+    if (merged_) {
+      return makeColumnarBatch(
+          rowType_, std::move(merged_), 
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
     }
+
+    uint32_t numRows = 0;
+    GLUTEN_ASSIGN_OR_THROW(
+        auto arrowBuffers,
+        BlockPayload::deserialize(
+            in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), 
numRows, deserializeTime_, decompressTime_));
+
+    blockTypeResolved_ = false;
+
+    return makeColumnarBatch(
+        rowType_,
+        numRows,
+        std::move(arrowBuffers),
+        dictionaryFields_,
+        dictionaries_,
+        memoryManager_->getLeafMemoryPool().get(),
+        deserializeTime_);
   }
 
+  std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
   uint32_t numRows = 0;
-  GLUTEN_ASSIGN_OR_THROW(
-      auto arrowBuffers,
-      BlockPayload::deserialize(
-          in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), 
numRows, deserializeTime_, decompressTime_));
+  while (!merged_ || merged_->numRows() < batchSize_) {
+    if (in_ == nullptr) {
+      if (merged_) {
+        break;
+      }
+
+      loadNextStream();
+      if (reachedEos_) {
+        break;
+      }
+    }
+    if (!resolveNextBlockType()) {
+      continue;
+    }
+
+    if (shouldSkipMerge()) {
+      break;
+    }
+
+    GLUTEN_ASSIGN_OR_THROW(
+        arrowBuffers,
+        BlockPayload::deserialize(
+            in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), 
numRows, deserializeTime_, decompressTime_));
 
-  return makeColumnarBatch(
-      rowType_,
-      numRows,
-      std::move(arrowBuffers),
-      dictionaryFields_,
-      dictionaries_,
-      memoryManager_->getLeafMemoryPool().get(),
-      deserializeTime_);
+    blockTypeResolved_ = false;
+
+    if (!merged_) {
+      merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, 
schema_, std::move(arrowBuffers));
+      arrowBuffers.clear();
+      continue;
+    }
+
+    auto mergedRows = merged_->numRows() + numRows;
+    if (mergedRows > batchSize_) {
+      break;
+    }
+
+    auto append = std::make_unique<InMemoryPayload>(numRows, 
&isValidityBuffer_, schema_, std::move(arrowBuffers));
+    GLUTEN_ASSIGN_OR_THROW(
+        merged_,
+        InMemoryPayload::merge(std::move(merged_), std::move(append), 
memoryManager_->defaultArrowMemoryPool()));
+    arrowBuffers.clear();
+  }
+
+  if (!merged_) {
+    return nullptr;
+  }
+
+  auto columnarBatch =
+      makeColumnarBatch(rowType_, std::move(merged_), 
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
+
+  if (!arrowBuffers.empty()) {
+    merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, 
schema_, std::move(arrowBuffers));
+  }
+
+  return columnarBatch;
 }
 
 VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
@@ -797,7 +918,8 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
     int64_t readerBufferSize,
     int64_t deserializerBufferSize,
     VeloxMemoryManager* memoryManager,
-    ShuffleWriterType shuffleWriterType)
+    ShuffleWriterType shuffleWriterType,
+    bool enableHashShuffleReaderStreamMerge)
     : schema_(schema),
       codec_(codec),
       veloxCompressionType_(veloxCompressionType),
@@ -806,7 +928,8 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
       readerBufferSize_(readerBufferSize),
       deserializerBufferSize_(deserializerBufferSize),
       memoryManager_(memoryManager),
-      shuffleWriterType_(shuffleWriterType) {
+      shuffleWriterType_(shuffleWriterType),
+      enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) {
   initFromSchema();
 }
 
@@ -832,8 +955,12 @@ std::unique_ptr<ColumnarBatchIterator> 
VeloxShuffleReaderDeserializerFactory::cr
           schema_,
           codec_,
           rowType_,
+          batchSize_,
           readerBufferSize_,
           memoryManager_,
+          isValidityBuffer_,
+          hasComplexType_,
+          enableHashShuffleReaderStreamMerge_,
           deserializeTime_,
           decompressTime_);
     case ShuffleWriterType::kSortShuffle:
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index f30595dde4..f92f0a2cc3 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -34,14 +34,20 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
       const std::shared_ptr<arrow::Schema>& schema,
       const std::shared_ptr<arrow::util::Codec>& codec,
       const facebook::velox::RowTypePtr& rowType,
+      int32_t batchSize,
       int64_t readerBufferSize,
       VeloxMemoryManager* memoryManager,
+      std::vector<bool> isValidityBuffer,
+      bool hasComplexType,
+      bool enableStreamMerge,
       int64_t& deserializeTime,
       int64_t& decompressTime);
 
   std::shared_ptr<ColumnarBatch> next() override;
 
  private:
+  bool shouldSkipMerge() const;
+
   bool resolveNextBlockType();
 
   void loadNextStream();
@@ -50,15 +56,21 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
   facebook::velox::RowTypePtr rowType_;
+  uint32_t batchSize_;
   int64_t readerBufferSize_;
   VeloxMemoryManager* memoryManager_;
+  std::vector<bool> isValidityBuffer_;
+  bool hasComplexType_;
+  bool enableStreamMerge_;
 
   int64_t& deserializeTime_;
   int64_t& decompressTime_;
 
   std::shared_ptr<arrow::io::InputStream> in_{nullptr};
 
+  std::unique_ptr<InMemoryPayload> merged_{nullptr};
   bool reachedEos_{false};
+  bool blockTypeResolved_{false};
 
   std::vector<int32_t> dictionaryFields_{};
   std::vector<facebook::velox::VectorPtr> dictionaries_{};
@@ -161,7 +173,8 @@ class VeloxShuffleReaderDeserializerFactory {
       int64_t readerBufferSize,
       int64_t deserializerBufferSize,
       VeloxMemoryManager* memoryManager,
-      ShuffleWriterType shuffleWriterType);
+      ShuffleWriterType shuffleWriterType,
+      bool enableHashShuffleReaderStreamMerge = false);
 
   std::unique_ptr<ColumnarBatchIterator> createDeserializer(const 
std::shared_ptr<StreamReader>& streamReader);
 
@@ -185,6 +198,7 @@ class VeloxShuffleReaderDeserializerFactory {
   bool hasComplexType_{false};
 
   ShuffleWriterType shuffleWriterType_;
+  bool enableHashShuffleReaderStreamMerge_;
 
   int64_t deserializeTime_{0};
   int64_t decompressTime_{0};
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index bfb783f4be..18046629d4 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,6 +18,10 @@
 #include <arrow/c/bridge.h>
 #include <arrow/io/api.h>
 
+#include <cstring>
+#include <optional>
+
+#include "shuffle/Payload.h"
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "shuffle/VeloxRssSortShuffleWriter.h"
 #include "shuffle/VeloxSortShuffleWriter.h"
@@ -191,6 +195,23 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
   }
 }
 
+class MultiStreamReader : public StreamReader {
+ public:
+  explicit 
MultiStreamReader(std::vector<std::shared_ptr<arrow::io::InputStream>> streams)
+      : streams_(std::move(streams)) {}
+
+  std::shared_ptr<arrow::io::InputStream> readNextStream(arrow::MemoryPool*) 
override {
+    if (index_ >= streams_.size()) {
+      return nullptr;
+    }
+    return std::move(streams_[index_++]);
+  }
+
+ private:
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams_;
+  size_t index_{0};
+};
+
 } // namespace
 
 class VeloxShuffleWriterTestEnvironment : public ::testing::Environment {
@@ -441,6 +462,307 @@ class RoundRobinPartitioningShuffleWriterTest : public 
VeloxShuffleWriterTest {
   }
 };
 
+class VeloxShuffleReaderStreamMergeTest : public ::testing::Test, public 
VeloxShuffleWriterTestBase {
+ protected:
+  void SetUp() override {
+    VeloxShuffleWriterTestBase::setUpTestData();
+  }
+
+  std::shared_ptr<arrow::io::InputStream> writeSinglePartitionStream(
+      const RowVectorPtr& vector,
+      bool enableDictionary = false) {
+    return writeSinglePartitionStream(std::vector<RowVectorPtr>{vector}, 
enableDictionary);
+  }
+
+  std::shared_ptr<arrow::io::InputStream> writeSinglePartitionStream(
+      const std::vector<RowVectorPtr>& vectors,
+      bool enableDictionary = false) {
+    GLUTEN_ASSIGN_OR_THROW(auto dataFile, 
createTempShuffleFile(localDirs_[0]));
+
+    auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
+    shuffleWriterOptions->partitioning = Partitioning::kSingle;
+    shuffleWriterOptions->splitBufferSize = 1024;
+
+    auto partitionWriter = createPartitionWriter(
+        PartitionWriterType::kLocal, 1, dataFile, localDirs_, 
arrow::Compression::UNCOMPRESSED, 0, 0, enableDictionary);
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxShuffleWriter::create(
+            ShuffleWriterType::kHashShuffle, 1, partitionWriter, 
shuffleWriterOptions, getDefaultMemoryManager()));
+
+    for (const auto& vector : vectors) {
+      GLUTEN_THROW_NOT_OK(
+          shuffleWriter->write(std::make_shared<VeloxColumnarBatch>(vector), 
ShuffleWriter::kMinMemLimit));
+    }
+    GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
+
+    const auto& lengths = shuffleWriter->partitionLengths();
+    VELOX_CHECK_EQ(lengths.size(), 1);
+
+    std::shared_ptr<arrow::io::ReadableFile> file;
+    GLUTEN_ASSIGN_OR_THROW(file, arrow::io::ReadableFile::Open(dataFile));
+    readableFiles_.push_back(file);
+
+    GLUTEN_ASSIGN_OR_THROW(auto in, 
arrow::io::RandomAccessFile::GetStream(file, 0, lengths[0]));
+    return in;
+  }
+
+  std::shared_ptr<arrow::io::InputStream> 
makeDictionaryPayloadOnlyStream(uint32_t numRows) {
+    GLUTEN_ASSIGN_OR_THROW(
+        auto indices, arrow::AllocateResizableBuffer(sizeof(int32_t) * 
numRows, arrow::default_memory_pool()));
+    std::vector<int32_t> rawIndices(numRows, 0);
+    std::memcpy(indices->mutable_data(), rawIndices.data(), sizeof(int32_t) * 
numRows);
+    std::shared_ptr<arrow::Buffer> indicesBuffer = std::move(indices);
+
+    std::vector<std::shared_ptr<arrow::Buffer>> 
buffers{std::shared_ptr<arrow::Buffer>{}, indicesBuffer};
+    static const std::vector<bool> kStringDictionaryPayloadValidity = {true, 
false};
+    GLUTEN_ASSIGN_OR_THROW(
+        auto payload,
+        BlockPayload::fromBuffers(
+            Payload::kUncompressed,
+            numRows,
+            std::move(buffers),
+            &kStringDictionaryPayloadValidity,
+            arrow::default_memory_pool(),
+            nullptr));
+
+    GLUTEN_ASSIGN_OR_THROW(auto os, 
arrow::io::BufferOutputStream::Create(1024, arrow::default_memory_pool()));
+    static constexpr uint8_t kDictionaryPayload = 
static_cast<uint8_t>(BlockType::kDictionaryPayload);
+    GLUTEN_THROW_NOT_OK(os->Write(&kDictionaryPayload, 
sizeof(kDictionaryPayload)));
+    GLUTEN_THROW_NOT_OK(payload->serialize(os.get()));
+
+    GLUTEN_ASSIGN_OR_THROW(auto buffer, os->Finish());
+    return std::make_shared<arrow::io::BufferReader>(buffer);
+  }
+
+  std::vector<RowVectorPtr> readStreams(
+      const RowTypePtr& rowType,
+      int32_t batchSize,
+      std::vector<std::shared_ptr<arrow::io::InputStream>> streams,
+      std::optional<bool> enableStreamMerge = std::nullopt) {
+    const auto schema = toArrowSchema(rowType, 
getDefaultMemoryManager()->getLeafMemoryPool().get());
+    std::shared_ptr<arrow::util::Codec> codec =
+        createCompressionCodec(arrow::Compression::UNCOMPRESSED, 
CodecBackend::NONE);
+    std::unique_ptr<VeloxShuffleReaderDeserializerFactory> deserializerFactory;
+    if (enableStreamMerge.has_value()) {
+      deserializerFactory = 
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+          schema,
+          codec,
+          arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+          rowType,
+          batchSize,
+          kDefaultReadBufferSize,
+          kDefaultDeserializerBufferSize,
+          getDefaultMemoryManager(),
+          ShuffleWriterType::kHashShuffle,
+          enableStreamMerge.value());
+    } else {
+      deserializerFactory = 
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+          schema,
+          codec,
+          arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+          rowType,
+          batchSize,
+          kDefaultReadBufferSize,
+          kDefaultDeserializerBufferSize,
+          getDefaultMemoryManager(),
+          ShuffleWriterType::kHashShuffle);
+    }
+
+    auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+    const auto iter = 
reader->read(std::make_shared<MultiStreamReader>(std::move(streams)));
+
+    std::vector<RowVectorPtr> output;
+    while (iter->hasNext()) {
+      
output.push_back(std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector());
+    }
+    return output;
+  }
+
+  std::vector<std::shared_ptr<arrow::io::ReadableFile>> readableFiles_;
+};
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) {
+  constexpr int32_t kBatchSize = 6;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({
+          makeFlatVector<int32_t>({1, 2}),
+          makeFlatVector<bool>({true, false}),
+          makeFlatVector<StringView>({"a", "bb"}),
+          makeNullableFlatVector<int64_t>({10, std::nullopt}),
+      }),
+      makeRowVector({
+          makeFlatVector<int32_t>({3, 4}),
+          makeFlatVector<bool>({false, true}),
+          makeFlatVector<StringView>({"ccc", "dddd"}),
+          makeNullableFlatVector<int64_t>({std::nullopt, 40}),
+      }),
+      makeRowVector({
+          makeFlatVector<int32_t>({5, 6}),
+          makeFlatVector<bool>({true, true}),
+          makeFlatVector<StringView>({"eeeee", "ffffff"}),
+          makeNullableFlatVector<int64_t>({50, 60}),
+      }),
+      makeRowVector({
+          makeFlatVector<int32_t>({7, 8}),
+          makeFlatVector<bool>({false, false}),
+          makeFlatVector<StringView>({"ggggggg", "hhhhhhhh"}),
+          makeNullableFlatVector<int64_t>({std::nullopt, std::nullopt}),
+      })};
+
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams = 
{writeSinglePartitionStream(inputs)};
+
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), 2);
+  ASSERT_EQ(output[0]->size(), kBatchSize);
+  ASSERT_EQ(output[1]->size(), inputs[3]->size());
+  facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], 
inputs[1], inputs[2]}), output[0]);
+  facebook::velox::test::assertEqualVectors(inputs[3], output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeByDefault) {
+  constexpr int32_t kBatchSize = 100;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+      makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+      makeRowVector({makeFlatVector<int32_t>({5, 6})})};
+
+  const auto rowType = facebook::velox::asRowType(inputs[0]->type());
+  auto output = readStreams(rowType, kBatchSize, 
{writeSinglePartitionStream(inputs)});
+
+  ASSERT_EQ(output.size(), inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    facebook::velox::test::assertEqualVectors(inputs[i], output[i]);
+  }
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderMergesWithinEachStreamOnly) {
+  constexpr int32_t kBatchSize = 100;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+      makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+      makeRowVector({makeFlatVector<int32_t>({5, 6})}),
+      makeRowVector({makeFlatVector<int32_t>({7, 8})})};
+
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
+      writeSinglePartitionStream({inputs[0], inputs[1]}), 
writeSinglePartitionStream({inputs[2], inputs[3]})};
+
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), 2);
+  facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], 
inputs[1]}), output[0]);
+  facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[2], 
inputs[3]}), output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeAcrossStreams) 
{
+  constexpr int32_t kBatchSize = 6;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+      makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+      makeRowVector({makeFlatVector<int32_t>({5, 6})}),
+      makeRowVector({makeFlatVector<int32_t>({7, 8})})};
+
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+  streams.reserve(inputs.size());
+  for (const auto& input : inputs) {
+    streams.push_back(writeSinglePartitionStream(input));
+  }
+
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    facebook::velox::test::assertEqualVectors(inputs[i], output[i]);
+  }
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderRejectsNonPositiveBatchSize) {
+  auto input = makeRowVector({makeFlatVector<int32_t>({1})});
+  const auto rowType = facebook::velox::asRowType(input->type());
+
+  EXPECT_THROW((void)readStreams(rowType, 0, {}), GlutenException);
+  EXPECT_THROW((void)readStreams(rowType, -1, {}), GlutenException);
+}
+
+// Writes three batches of 4, 4 and 1 rows into a single stream and reads them
+// back with kBatchSize = 6. Expected output: the first batch on its own, then
+// the second and third batches merged into one.
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderCarriesOverPayloadThatWouldExceedBatchSize) {
+  constexpr int32_t kBatchSize = 6;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeFlatVector<int32_t>({1, 2, 3, 4})}),
+      makeRowVector({makeFlatVector<int32_t>({5, 6, 7, 8})}),
+      makeRowVector({makeFlatVector<int32_t>({9})})};
+  const auto rowType = facebook::velox::asRowType(inputs[0]->type());
+
+  // All three batches go into the same stream.
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+  streams.push_back(writeSinglePartitionStream(inputs));
+
+  auto output = readStreams(rowType, kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), 2);
+  facebook::velox::test::assertEqualVectors(inputs[0], output[0]);
+  auto mergedTail = mergeRowVectors({inputs[1], inputs[2]});
+  facebook::velox::test::assertEqualVectors(mergedTail, output[1]);
+}
+
+// Reads a plain string stream followed by a stream written with the helper's
+// second argument set to true (presumably dictionary encoding, judging by the
+// test name - confirm against writeSinglePartitionStream). Even with
+// kBatchSize = 100 the two inputs are expected back as two separate batches.
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderFlushesMergedRowsBeforeDictionaryStream) {
+  constexpr int32_t kBatchSize = 100;
+  auto plainInput = makeRowVector({makeFlatVector<StringView>({"plain-1", "plain-2"})});
+  auto dictionaryInput = makeRowVector({makeFlatVector<StringView>({"same", "same", "same", "same"})});
+  const auto rowType = facebook::velox::asRowType(plainInput->type());
+
+  // Plain stream first, then the second stream.
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+  streams.reserve(2);
+  streams.push_back(writeSinglePartitionStream(plainInput));
+  streams.push_back(writeSinglePartitionStream(dictionaryInput, true));
+
+  auto output = readStreams(rowType, kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), 2);
+  facebook::velox::test::assertEqualVectors(plainInput, output[0]);
+  facebook::velox::test::assertEqualVectors(dictionaryInput, output[1]);
+}
+
+// Writes two array-typed batches into separate streams and reads them back
+// with kBatchSize = 100. The reader is expected to return them unmerged: one
+// output batch per input, in order.
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeComplexTypeStreams) {
+  constexpr int32_t kBatchSize = 100;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeArrayVector<int64_t>({{1, 2}, {3}})}),
+      makeRowVector({makeArrayVector<int64_t>({{4}, {5, 6}})})};
+  const auto rowType = facebook::velox::asRowType(inputs[0]->type());
+
+  // One single-partition stream per input batch, in input order.
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+  streams.reserve(inputs.size());
+  for (size_t idx = 0; idx < inputs.size(); ++idx) {
+    streams.push_back(writeSinglePartitionStream(inputs[idx]));
+  }
+
+  auto output = readStreams(rowType, kBatchSize, std::move(streams), true);
+
+  ASSERT_EQ(output.size(), inputs.size());
+  const auto& firstExpected = inputs[0];
+  const auto& secondExpected = inputs[1];
+  facebook::velox::test::assertEqualVectors(firstExpected, output[0]);
+  facebook::velox::test::assertEqualVectors(secondExpected, output[1]);
+}
+
+// Drives VeloxShuffleReader directly over two streams: one written from
+// dictionaryInput with the helper's second argument set to true, and one built
+// by makeDictionaryPayloadOnlyStream(2) (presumably a payload without its own
+// dictionary, judging by the helper and test names - confirm against the
+// helper). The first batch must round-trip; advancing to the second stream
+// must throw GlutenException.
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotReuseDictionaryAcrossStreams) {
+  auto dictionaryInput = makeRowVector({makeFlatVector<StringView>({"same", "same", "same", "same"})});
+  std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
+      writeSinglePartitionStream(dictionaryInput, true), makeDictionaryPayloadOnlyStream(2)};
+
+  const auto rowType = facebook::velox::asRowType(dictionaryInput->type());
+  const auto schema = toArrowSchema(rowType, getDefaultMemoryManager()->getLeafMemoryPool().get());
+  std::shared_ptr<arrow::util::Codec> codec =
+      createCompressionCodec(arrow::Compression::UNCOMPRESSED, CodecBackend::NONE);
+  auto deserializerFactory = std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+      schema,
+      codec,
+      arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+      rowType,
+      kDefaultBatchSize,
+      kDefaultReadBufferSize,
+      kDefaultDeserializerBufferSize,
+      getDefaultMemoryManager(),
+      ShuffleWriterType::kHashShuffle);
+
+  auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+  const auto iter = reader->read(std::make_shared<MultiStreamReader>(std::move(streams)));
+
+  ASSERT_TRUE(iter->hasNext());
+  {
+    // Check the downcast before dereferencing: a null result should fail this
+    // test rather than crash the test binary. The scope releases the batch
+    // before the iterator is advanced again.
+    const auto batch = std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next());
+    ASSERT_TRUE(batch != nullptr) << "first batch is not a VeloxColumnarBatch";
+    facebook::velox::test::assertEqualVectors(dictionaryInput, batch->getRowVector());
+  }
+  EXPECT_THROW((void)iter->hasNext(), GlutenException);
+}
+
 TEST_P(SinglePartitioningShuffleWriterTest, single) {
   if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index a608dfbc45..a0c0691e2f 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,77 +9,78 @@ nav_order: 16
 
 ## Gluten Velox backend configurations
 
-|                                       Key                                    
    |      Default      |                                                       
                                                                                
                                                                               
Description                                                                     
                                                                                
               [...]
-|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| spark.gluten.sql.columnar.backend.velox.IOThreads                            
    | &lt;undefined&gt; | The Size of the IO thread pool in the Connector. This 
thread pool is used for split preloading and DirectBufferedInput. By default, 
the value is the same as the maximum task slots per Spark executor.             
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver                
    | 2                 | The split preload per task                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct      
    | 90                | If partial aggregation aggregationPct greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                [...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows     
    | 100000            | If partial aggregation input rows number greater than 
this value,  partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                            [...]
-| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping           
    | 30000ms           | Timeout in milliseconds when waiting for 
runtime-scoped async work to finish during teardown.                            
                                                                                
                                                                                
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.cacheEnabled                         
    | false             | Enable Velox cache, default off. It's recommended to 
enablesoft-affinity as well when enable velox cache.                            
                                                                                
                                                                                
                                                                                
               [...]
-| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct                  
    | 0                 | Set prefetch cache min pct for velox file scan        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.checkUsageLeak                       
    | true              | Enable check memory usage leak.                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.batchSize                       
    | 2147483647        | Cudf input batch size after shuffle reader            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan                 
    | false             | Enable cudf table scan                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation                
    | true              | Heuristics you can apply to validate a cuDF/GPU plan 
and only offload when the entire stage can be fully and profitably executed on 
GPU                                                                             
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent                   
    | 50                | The initial percent of GPU memory to allocate for 
memory resource for one thread.                                                 
                                                                                
                                                                                
                                                                                
                  [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | async             | GPU RMM memory resource.                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes         
    | 1028MB            | Maximum bytes to prefetch in CPU memory during GPU 
shuffle read while waiting for GPU available.                                   
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | 32KB              | Deprecated, rename to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                              [...]
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | true              | Enable validation fallback for TimestampNTZ type. 
When true (default), any plan containing TimestampNTZ will fall back to Spark 
execution. Set to false during development/testing of TimestampNTZ support to 
allow native execution.                                                         
                                                                                
                      [...]
-| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled               
    | false             | Disables caching if false. File handle cache should 
be disabled if files are mutable, i.e. file content may change while file path 
stays the same.                                                                 
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold                 
    | 1MB               | Set the file preload threshold for velox file scan, 
refer to Velox's file-preload-threshold                                         
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.floatingPointMode                    
    | loose             | Config used to control the tolerance of floating 
point operations alignment with Spark. When the mode is set to strict, flushing 
is disabled for sum(float/double)and avg(float/double). When set to loose, 
flushing will be enabled.                                                       
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation          
    | true              | Enable flushable aggregation. If true, Gluten will 
try converting regular aggregation into Velox's flushable aggregation when 
applicable. A flushable aggregation could emit intermediate result at anytime 
when memory is full / data reduction ratio is low.                              
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                  
    | 32KB              | Set the footer estimated size for velox file scan, 
refer to Velox's footer-estimated-size                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| 
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize   
 | 0b                | The maximum byte size of Bloom filter that can be 
generated from hash probe. When set to 0, no Bloom filter will be generated. To 
achieve optimal performance, this should not be too larger than the CPU cache 
size on the host.                                                               
                                                                                
                    [...]
-| 
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled 
 | true              | Whether hash probe can generate any dynamic filter 
(including Bloom filter) and push down to upstream operators.                   
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.loadQuantum                          
    | 256MB             | Set the load quantum for velox file scan, recommend 
to use the default value (256MB) for performance consideration. If Velox cache 
is enabled, it can be 8MB at most.                                              
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes                    
    | 64MB              | Set the max coalesced bytes for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance                 
    | 512KB             | Set the max coalesced distance bytes for velox file 
scan                                                                            
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes                   
    | 100               | Controls maximum number of compiled regular 
expression patterns per function instance per thread of execution.              
                                                                                
                                                                                
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory  
    | &lt;undefined&gt; | Set the max extended memory of partial aggregation in 
bytes. When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
 Note: this option only works when flushable partial aggregation is enabled. 
Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
        [...]
-| 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
 | 0.15              | Set the max extended memory of partial aggregation as 
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option 
only works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory          
    | &lt;undefined&gt; | Set the max memory of partial aggregation in bytes. 
When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: 
this option only works when flushable partial aggregation is enabled. Ignored 
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
                              [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio     
    | 0.1               | Set the max memory of partial aggregation as 
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works 
when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                            [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession       
    | 10000             | Maximum number of partitions per a single table 
writer instance.                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillBytes                        
    | 100G              | The maximum file size of a query                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize                     
    | 1GB               | The maximum size of a single spill file created       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillLevel                        
    | 4                 | The max allowed spilling level with zero being the 
initial spilling level                                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows                      
    | 3M                | The maximum row size of a single spill run            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize                    
    | 0b                | The target file size for each output file when 
writing data. 0 means no limit on target file size, and the actual file size 
will be determined by other factors such as max partition number and shuffle 
batch size.                                                                     
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.memCacheSize                         
    | 1GB               | The memory cache size                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.memInitCapacity                      
    | 8MB               | The initial memory capacity to reserve for a newly 
created Velox query memory pool.                                                
                                                                                
                                                                                
                                                                                
                 [...]
-| 
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks   
 | true              | Whether to allow memory capacity transfer between memory 
pools from different tasks.                                                     
                                                                                
                                                                                
                                                                                
           [...]
-| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files.                                               
                                                                                
                                                                                
                                                                                
                   [...]
-| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes                
    | 1MB               | The page size in bytes is for compression.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files.                                           
                                                                                
                                                                                
                                                                                
                   [...]
-| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput           
    | true              | If true, combine small columnar batches together 
before sending to shuffle. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                      [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize   
    | &lt;undefined&gt; | The minimum batch size for shuffle. If size of an 
input batch is smaller than the value, it will be combined with other batches 
before sending to shuffle. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to 
true. Default value: 0.25 * <max batch size>                                    
                                                        [...]
-| 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
 | &lt;undefined&gt; | The minimum batch size for shuffle input and output. If 
size of an input batch is smaller than the value, it will be combined with 
other batches before sending to shuffle. The same applies for batches output by 
shuffle read. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput          
    | false             | If true, combine small columnar batches together 
right after shuffle read. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                       [...]
-| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished          
    | false             | Show velox full task metrics when finished.           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.spillFileSystem                      
    | local             | The filesystem used to store spill data. local: The 
local file system. heap-over-local: Write file to JVM heap if having extra heap 
space. Otherwise write to local file system.                                    
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.spillStrategy                        
    | auto              | none: Disable spill on Velox backend; auto: Let Spark 
memory manager manage Velox's spilling                                          
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads                    
    | 1                 | The IO threads for cache promoting                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCachePath                         
    | /tmp              | The folder to store the cache files, better on SSD    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheShards                       
    | 1                 | The cache shards                                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheSize                         
    | 1GB               | The SSD cache size, will do memory caching only if 
this value = 0                                                                  
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes           
    | 0                 | Checkpoint after every 'checkpointIntervalBytes' for 
SSD cache. 0 means no checkpointing.                                            
                                                                                
                                                                                
                                                                                
               [...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled                   
    | false             | If true, checksum write to SSD is enabled.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled   
    | false             | If true, checksum read verification from SSD is 
enabled.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow                    
    | false             | True if copy on write should be disabled.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdODirect                           
    | false             | The O_DIRECT flag for cache writing                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled    
    | false             | Whether to apply dynamic filters pushed down from 
hash probe in the ValueStream (shuffle reader) operator to filter rows before 
they reach the hash join.                                                       
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.enable.enhancedFeatures                                     
    | true              | Enable some features including iceberg native write 
and other features.                                                             
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.rewrite.castArrayToString                                   
    | true              | When true, rewrite `cast(array as String)` to 
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. 
                                                                                
                                                                                
                                                                                
                      [...]
-| spark.gluten.velox.broadcast.build.targetBytesPerThread                      
    | 32MB              | It is used to calculate the number of hash table 
build threads. Based on our testing across various thresholds (1MB to 128MB), 
we recommend a value of 32MB or 64MB, as these consistently provided the most 
significant performance gains.                                                  
                                                                                
                       [...]
-| spark.gluten.velox.castFromVarcharAddTrimNode                                
    | false             | If true, will add a trim node which has the same 
semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing.         
                                                                                
                                                                                
                                                                                
                   [...]
+|                                       Key                                    
    |      Default      |                                                       
                                                                                
                                                                                
                                                                          
Description                                                                     
                    [...]
+|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| spark.gluten.sql.columnar.backend.velox.IOThreads                            
    | &lt;undefined&gt; | The size of the IO thread pool in the Connector. This 
thread pool is used for split preloading and DirectBufferedInput. By default, 
the value is the same as the maximum task slots per Spark executor.             
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver                
    | 2                 | The split preload per task                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct      
    | 90                | If partial aggregation aggregationPct is greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                [...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows     
    | 100000            | If partial aggregation input rows number is greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                            [...]
+| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping           
    | 30000ms           | Timeout in milliseconds when waiting for 
runtime-scoped async work to finish during teardown.                            
                                                                                
                                                                                
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.cacheEnabled                         
    | false             | Enable Velox cache, default off. It's recommended to 
enable soft-affinity as well when enabling the Velox cache.                     
                                                                                
                                                                                
                                                                                
               [...]
+| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct                  
    | 0                 | Set prefetch cache min pct for velox file scan        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.checkUsageLeak                       
    | true              | Enable check memory usage leak.                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.batchSize                       
    | 2147483647        | Cudf input batch size after shuffle reader            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan                 
    | false             | Enable cudf table scan                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation                
    | true              | Heuristics you can apply to validate a cuDF/GPU plan 
and only offload when the entire stage can be fully and profitably executed on 
GPU                                                                             
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent                   
    | 50                | The initial percent of GPU memory to allocate for 
memory resource for one thread.                                                 
                                                                                
                                                                                
                                                                                
                  [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | async             | GPU RMM memory resource.                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes         
    | 1028MB            | Maximum bytes to prefetch in CPU memory during GPU 
shuffle read while waiting for GPU available.                                   
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | 32KB              | Deprecated, renamed to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                              [...]
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | true              | Enable validation fallback for TimestampNTZ type. 
When true (default), any plan containing TimestampNTZ will fall back to Spark 
execution. Set to false during development/testing of TimestampNTZ support to 
allow native execution.                                                         
                                                                                
                      [...]
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled               
    | false             | Disables caching if false. File handle cache should 
be disabled if files are mutable, i.e. file content may change while file path 
stays the same.                                                                 
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold                 
    | 1MB               | Set the file preload threshold for velox file scan, 
refer to Velox's file-preload-threshold                                         
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.floatingPointMode                    
    | loose             | Config used to control the tolerance of floating 
point operations alignment with Spark. When the mode is set to strict, flushing 
is disabled for sum(float/double) and avg(float/double). When set to loose, 
flushing will be enabled.                                                       
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation          
    | true              | Enable flushable aggregation. If true, Gluten will 
try converting regular aggregation into Velox's flushable aggregation when 
applicable. A flushable aggregation could emit intermediate result at anytime 
when memory is full / data reduction ratio is low.                              
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                  
    | 32KB              | Set the footer estimated size for velox file scan, 
refer to Velox's footer-estimated-size                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize   
 | 0b                | The maximum byte size of Bloom filter that can be 
generated from hash probe. When set to 0, no Bloom filter will be generated. To 
achieve optimal performance, this should not be much larger than the CPU cache 
size on the host.                                                               
                                                                                
                    [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled 
 | true              | Whether hash probe can generate any dynamic filter 
(including Bloom filter) and push down to upstream operators.                   
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled  
 | false             | Enables a reader-side raw payload merge fast path for 
plain hash shuffle payloads within each shuffle input stream. This path merges 
payload buffers before Velox vectors are materialized, so it has lower 
per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only 
covers plain payloads. Complex types and dictionary-encoded payloads are not 
merged by this path. VeloxRes [...]
+| spark.gluten.sql.columnar.backend.velox.loadQuantum                          
    | 256MB             | Set the load quantum for velox file scan, recommend 
to use the default value (256MB) for performance consideration. If Velox cache 
is enabled, it can be 8MB at most.                                              
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes                    
    | 64MB              | Set the max coalesced bytes for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance                 
    | 512KB             | Set the max coalesced distance bytes for velox file 
scan                                                                            
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes                   
    | 100               | Controls maximum number of compiled regular 
expression patterns per function instance per thread of execution.              
                                                                                
                                                                                
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory  
    | &lt;undefined&gt; | Set the max extended memory of partial aggregation in 
bytes. When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
 Note: this option only works when flushable partial aggregation is enabled. 
Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
        [...]
+| 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
 | 0.15              | Set the max extended memory of partial aggregation as 
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option 
only works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory          
    | &lt;undefined&gt; | Set the max memory of partial aggregation in bytes. 
When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: 
this option only works when flushable partial aggregation is enabled. Ignored 
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
                              [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio     
    | 0.1               | Set the max memory of partial aggregation as 
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works 
when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                            [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession       
    | 10000             | Maximum number of partitions per single table 
writer instance.                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillBytes                        
    | 100G              | The maximum file size of a query                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize                     
    | 1GB               | The maximum size of a single spill file created       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillLevel                        
    | 4                 | The max allowed spilling level with zero being the 
initial spilling level                                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows                      
    | 3M                | The maximum row size of a single spill run            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize                    
    | 0b                | The target file size for each output file when 
writing data. 0 means no limit on target file size, and the actual file size 
will be determined by other factors such as max partition number and shuffle 
batch size.                                                                     
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.memCacheSize                         
    | 1GB               | The memory cache size                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.memInitCapacity                      
    | 8MB               | The initial memory capacity to reserve for a newly 
created Velox query memory pool.                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks   
 | true              | Whether to allow memory capacity transfer between memory 
pools from different tasks.                                                     
                                                                                
                                                                                
                                                                                
           [...]
+| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files.                                               
                                                                                
                                                                                
                                                                                
                   [...]
+| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes                
    | 1MB               | The page size in bytes is for compression.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files.                                           
                                                                                
                                                                                
                                                                                
                   [...]
+| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput           
    | true              | If true, combine small columnar batches together 
before sending to shuffle. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                      [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize   
    | &lt;undefined&gt; | The minimum batch size for shuffle. If size of an 
input batch is smaller than the value, it will be combined with other batches 
before sending to shuffle. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to 
true. Default value: 0.25 * <max batch size>                                    
                                                        [...]
+| 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
 | &lt;undefined&gt; | The minimum batch size for shuffle input and output. If 
size of an input batch is smaller than the value, it will be combined with 
other batches before sending to shuffle. The same applies for batches output by 
shuffle read. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput          
    | false             | If true, combine small columnar batches together 
right after shuffle read. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                       [...]
+| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished          
    | false             | Show velox full task metrics when finished.           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.spillFileSystem                      
    | local             | The filesystem used to store spill data. local: The 
local file system. heap-over-local: Write file to JVM heap if having extra heap 
space. Otherwise write to local file system.                                    
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.spillStrategy                        
    | auto              | none: Disable spill on Velox backend; auto: Let Spark 
memory manager manage Velox's spilling                                          
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads                    
    | 1                 | The IO threads for cache promoting                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCachePath                         
    | /tmp              | The folder to store the cache files, better on SSD    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheShards                       
    | 1                 | The cache shards                                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheSize                         
    | 1GB               | The SSD cache size. Only memory caching is used if 
this value is 0.                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes           
    | 0                 | Checkpoint after every 'checkpointIntervalBytes' for 
SSD cache. 0 means no checkpointing.                                            
                                                                                
                                                                                
                                                                                
               [...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled                   
    | false             | If true, checksum write to SSD is enabled.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled   
    | false             | If true, checksum read verification from SSD is 
enabled.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow                    
    | false             | True if copy on write should be disabled.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdODirect                           
    | false             | The O_DIRECT flag for cache writing                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled    
    | false             | Whether to apply dynamic filters pushed down from 
hash probe in the ValueStream (shuffle reader) operator to filter rows before 
they reach the hash join.                                                       
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.enable.enhancedFeatures                                     
    | true              | Enable some features including iceberg native write 
and other features.                                                             
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.rewrite.castArrayToString                                   
    | true              | When true, rewrite `cast(array as String)` to 
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. 
                                                                                
                                                                                
                                                                                
                      [...]
+| spark.gluten.velox.broadcast.build.targetBytesPerThread                      
    | 32MB              | It is used to calculate the number of hash table 
build threads. Based on our testing across various thresholds (1MB to 128MB), 
we recommend a value of 32MB or 64MB, as these consistently provided the most 
significant performance gains.                                                  
                                                                                
                       [...]
+| spark.gluten.velox.castFromVarcharAddTrimNode                                
    | false             | If true, will add a trim node which has the same 
semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing.         
                                                                                
                                                                                
                                                                                
                   [...]
 
 ## Gluten Velox backend *experimental* configurations
 
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 6a0f2130d7..449bc86558 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -42,7 +42,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
       int batchSize,
       long readerBufferSize,
       long deserializerBufferSize,
-      String shuffleWriterType);
+      String shuffleWriterType,
+      boolean enableHashShuffleReaderStreamMerge);
 
   public native long read(long shuffleReaderHandle, ShuffleStreamReader 
streamReader);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to