This is an automated email from the ASF dual-hosted git repository. zhli1142015 pushed a commit to branch vl-hash-shuffle-reader-stream-merge in repository https://gitbox.apache.org/repos/asf/gluten.git
commit 418503b6c8a764caefb87a5ee7aeeb9ad61f704b Author: zhli1142015 <[email protected]> AuthorDate: Fri May 15 23:13:47 2026 +0800 [VL] Add hash shuffle reader stream merge config Gate the reader-side raw payload merge fast path behind a Velox config and document how it complements VeloxResizeBatchesExec. Co-authored-by: Copilot <[email protected]> --- .../VeloxCelebornColumnarBatchSerializer.scala | 6 +- .../org/apache/gluten/config/VeloxConfig.scala | 17 +++ .../vectorized/ColumnarBatchSerializer.scala | 7 +- cpp/core/jni/JniWrapper.cc | 4 +- cpp/core/shuffle/Options.h | 4 + cpp/velox/compute/VeloxRuntime.cc | 3 +- cpp/velox/shuffle/VeloxShuffleReader.cc | 17 ++- cpp/velox/shuffle/VeloxShuffleReader.h | 6 +- cpp/velox/tests/VeloxShuffleWriterTest.cc | 67 +++++++--- docs/velox-configuration.md | 143 +++++++++++---------- .../gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +- 11 files changed, 177 insertions(+), 100 deletions(-) diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index f377ea99f6..2c36c773f4 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -17,7 +17,7 @@ package org.apache.spark.shuffle import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType} +import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig} import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.runtime.Runtimes import org.apache.gluten.utils.ArrowAbiUtil @@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance( val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = 
GlutenConfig.get.columnarShuffleReaderBufferSize val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize + val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge val handle = jniWrapper .make( cSchema.memoryAddress(), @@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance( batchSize, readerBufferSize, deserializerBufferSize, - shuffleWriterType.name + shuffleWriterType.name, + enableHashShuffleReaderStreamMerge ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index dbc833f046..9ea203d42b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def veloxResizeBatchesShuffleOutput: Boolean = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT) + def enableHashShuffleReaderStreamMerge: Boolean = + getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED) + case class ResizeRange(min: Int, max: Int) { assert(max >= min) assert(min > 0, "Min batch size should be larger than 0") @@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled") + .doc( + "Enables a reader-side raw payload merge fast path for plain hash shuffle payloads " + + "within each shuffle input stream. 
This path merges payload buffers before Velox " + + "vectors are materialized, so it has lower per-batch overhead than generic " + + "VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types " + + "and dictionary-encoded payloads are not merged by this path. " + + "VeloxResizeBatchesExec can still be enabled separately as a generic complement " + + "for types and encodings not covered by this fast path. If false, each hash " + + "shuffle payload is returned as its own columnar batch.") + .booleanConf + .createWithDefault(false) + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 3b5fce63f8..284d931ecc 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -17,7 +17,7 @@ package org.apache.gluten.vectorized import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType} +import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig} import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.runtime.Runtimes @@ -104,6 +104,7 @@ private class ColumnarBatchSerializerInstanceImpl( val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize + val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge val shuffleReaderHandle = jniWrapper.make( cSchema.memoryAddress(), 
compressionCodec, @@ -111,7 +112,9 @@ private class ColumnarBatchSerializerInstanceImpl( batchSize, readerBufferSize, deserializerBufferSize, - shuffleWriterType.name) + shuffleWriterType.name, + enableHashShuffleReaderStreamMerge + ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that // was used to create all buffers read from shuffle reader. The pool diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea..70689c950a 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe jint batchSize, jlong readerBufferSize, jlong deserializerBufferSize, - jstring shuffleWriterType) { + jstring shuffleWriterType, + jboolean enableHashShuffleReaderStreamMerge) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); @@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe options.batchSize = batchSize; options.readerBufferSize = readerBufferSize; options.deserializerBufferSize = deserializerBufferSize; + options.enableHashShuffleReaderStreamMerge = enableHashShuffleReaderStreamMerge; options.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType)); std::shared_ptr<arrow::Schema> schema = diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 1d7f9ad9f9..ea3aff10cf 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -63,6 +63,10 @@ struct ShuffleReaderOptions { // Buffer size when deserializing rows into columnar batches. Only used for sort-based shuffle. int64_t deserializerBufferSize = kDefaultDeserializerBufferSize; + + // Whether to enable the reader-side raw payload merge fast path for plain hash shuffle payloads within one input + // stream. 
+ bool enableHashShuffleReaderStreamMerge = false; }; struct ShuffleWriterOptions { diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 1e2cc6f308..e3eac17a22 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -585,7 +585,8 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader( options.readerBufferSize, options.deserializerBufferSize, memoryManager(), - options.shuffleWriterType); + options.shuffleWriterType, + options.enableHashShuffleReaderStreamMerge); return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory)); } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 2e5f1f625a..a469d5c770 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -471,6 +471,7 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer( VeloxMemoryManager* memoryManager, std::vector<bool> isValidityBuffer, bool hasComplexType, + bool enableStreamMerge, int64_t& deserializeTime, int64_t& decompressTime) : streamReader_(streamReader), @@ -482,12 +483,17 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer( memoryManager_(memoryManager), isValidityBuffer_(std::move(isValidityBuffer)), hasComplexType_(hasComplexType), + enableStreamMerge_(enableStreamMerge), deserializeTime_(deserializeTime), decompressTime_(decompressTime) {} bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const { - // Complex type or dictionary encodings do not support merging. - return hasComplexType_ || !dictionaryFields_.empty(); + // Stream merge is a reader-side raw payload fast path: for plain payloads it + // concatenates buffers before Velox vectors are materialized, avoiding the generic + // RowVector append cost paid by VeloxResizeBatchesExec. 
Keep complex and dictionary + // payloads on the existing per-payload path; VeloxResizeBatchesExec can be enabled + // separately as the generic complement for those cases. + return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty(); } bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() { @@ -912,7 +918,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( int64_t readerBufferSize, int64_t deserializerBufferSize, VeloxMemoryManager* memoryManager, - ShuffleWriterType shuffleWriterType) + ShuffleWriterType shuffleWriterType, + bool enableHashShuffleReaderStreamMerge) : schema_(schema), codec_(codec), veloxCompressionType_(veloxCompressionType), @@ -921,7 +928,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( readerBufferSize_(readerBufferSize), deserializerBufferSize_(deserializerBufferSize), memoryManager_(memoryManager), - shuffleWriterType_(shuffleWriterType) { + shuffleWriterType_(shuffleWriterType), + enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) { initFromSchema(); } @@ -952,6 +960,7 @@ std::unique_ptr<ColumnarBatchIterator> VeloxShuffleReaderDeserializerFactory::cr memoryManager_, isValidityBuffer_, hasComplexType_, + enableHashShuffleReaderStreamMerge_, deserializeTime_, decompressTime_); case ShuffleWriterType::kSortShuffle: diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 0b08fe675a..f92f0a2cc3 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -39,6 +39,7 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { VeloxMemoryManager* memoryManager, std::vector<bool> isValidityBuffer, bool hasComplexType, + bool enableStreamMerge, int64_t& deserializeTime, int64_t& decompressTime); @@ -60,6 +61,7 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { VeloxMemoryManager* memoryManager_; 
std::vector<bool> isValidityBuffer_; bool hasComplexType_; + bool enableStreamMerge_; int64_t& deserializeTime_; int64_t& decompressTime_; @@ -171,7 +173,8 @@ class VeloxShuffleReaderDeserializerFactory { int64_t readerBufferSize, int64_t deserializerBufferSize, VeloxMemoryManager* memoryManager, - ShuffleWriterType shuffleWriterType); + ShuffleWriterType shuffleWriterType, + bool enableHashShuffleReaderStreamMerge = false); std::unique_ptr<ColumnarBatchIterator> createDeserializer(const std::shared_ptr<StreamReader>& streamReader); @@ -195,6 +198,7 @@ class VeloxShuffleReaderDeserializerFactory { bool hasComplexType_{false}; ShuffleWriterType shuffleWriterType_; + bool enableHashShuffleReaderStreamMerge_; int64_t deserializeTime_{0}; int64_t decompressTime_{0}; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index e0d900b2b1..18046629d4 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -19,6 +19,7 @@ #include <arrow/io/api.h> #include <cstring> +#include <optional> #include "shuffle/Payload.h" #include "shuffle/VeloxHashShuffleWriter.h" @@ -537,20 +538,36 @@ class VeloxShuffleReaderStreamMergeTest : public ::testing::Test, public VeloxSh std::vector<RowVectorPtr> readStreams( const RowTypePtr& rowType, int32_t batchSize, - std::vector<std::shared_ptr<arrow::io::InputStream>> streams) { + std::vector<std::shared_ptr<arrow::io::InputStream>> streams, + std::optional<bool> enableStreamMerge = std::nullopt) { const auto schema = toArrowSchema(rowType, getDefaultMemoryManager()->getLeafMemoryPool().get()); std::shared_ptr<arrow::util::Codec> codec = createCompressionCodec(arrow::Compression::UNCOMPRESSED, CodecBackend::NONE); - auto deserializerFactory = std::make_unique<VeloxShuffleReaderDeserializerFactory>( - schema, - codec, - arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), - rowType, - batchSize, - kDefaultReadBufferSize, - 
kDefaultDeserializerBufferSize, - getDefaultMemoryManager(), - ShuffleWriterType::kHashShuffle); + std::unique_ptr<VeloxShuffleReaderDeserializerFactory> deserializerFactory; + if (enableStreamMerge.has_value()) { + deserializerFactory = std::make_unique<VeloxShuffleReaderDeserializerFactory>( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + batchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle, + enableStreamMerge.value()); + } else { + deserializerFactory = std::make_unique<VeloxShuffleReaderDeserializerFactory>( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + batchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle); + } auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory)); const auto iter = reader->read(std::make_shared<MultiStreamReader>(std::move(streams))); @@ -595,7 +612,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) { std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {writeSinglePartitionStream(inputs)}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); ASSERT_EQ(output[0]->size(), kBatchSize); @@ -604,6 +621,22 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) { facebook::velox::test::assertEqualVectors(inputs[3], output[1]); } +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeByDefault) { + constexpr int32_t kBatchSize = 100; + std::vector<RowVectorPtr> inputs = { + makeRowVector({makeFlatVector<int32_t>({1, 2})}), + makeRowVector({makeFlatVector<int32_t>({3, 4})}), + 
makeRowVector({makeFlatVector<int32_t>({5, 6})})}; + + const auto rowType = facebook::velox::asRowType(inputs[0]->type()); + auto output = readStreams(rowType, kBatchSize, {writeSinglePartitionStream(inputs)}); + + ASSERT_EQ(output.size(), inputs.size()); + for (size_t i = 0; i < inputs.size(); ++i) { + facebook::velox::test::assertEqualVectors(inputs[i], output[i]); + } +} + TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinEachStreamOnly) { constexpr int32_t kBatchSize = 100; std::vector<RowVectorPtr> inputs = { @@ -615,7 +648,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinEachStreamOnly) std::vector<std::shared_ptr<arrow::io::InputStream>> streams = { writeSinglePartitionStream({inputs[0], inputs[1]}), writeSinglePartitionStream({inputs[2], inputs[3]})}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], inputs[1]}), output[0]); @@ -636,7 +669,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeAcrossStreams) { streams.push_back(writeSinglePartitionStream(input)); } - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), inputs.size()); for (size_t i = 0; i < inputs.size(); ++i) { @@ -661,7 +694,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderCarriesOverPayloadThatWouldE std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {writeSinglePartitionStream(inputs)}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(inputs[0], output[0]); @@ -675,7 +708,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderFlushesMergedRowsBeforeDicti std::vector<std::shared_ptr<arrow::io::InputStream>> streams = { writeSinglePartitionStream(plainInput), writeSinglePartitionStream(dictionaryInput, true)}; - auto output = readStreams(facebook::velox::asRowType(plainInput->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(plainInput->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(plainInput, output[0]); @@ -694,7 +727,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeComplexTypeStrea streams.push_back(writeSinglePartitionStream(input)); } - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), inputs.size()); facebook::velox::test::assertEqualVectors(inputs[0], output[0]); diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index a608dfbc45..a0c0691e2f 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -9,77 +9,78 @@ nav_order: 16 ## Gluten Velox backend configurations -| Key | Default | Description [...] -|----------------------------------------------------------------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] 
-| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. [...] -| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task [...] -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. [...] -| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. [...] -| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan [...] -| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. [...] -| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader [...] -| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan [...] 
-| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU [...] -| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. [...] -| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. [...] -| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. [...] -| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize [...] -| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. [...] -| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. [...] -| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold [...] -| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. [...] -| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. 
If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. [...] -| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size [...] -| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be too larger than the CPU cache size on the host. [...] -| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. [...] -| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. [...] -| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan [...] -| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan [...] -| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. [...] -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. 
Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] -| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. [...] -| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query [...] -| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created [...] -| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level [...] -| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run [...] 
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. [...] -| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size [...] -| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. [...] -| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. [...] -| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. [...] -| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. [...] -| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. [...] -| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. [...] -| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. [...] -| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan [...] -| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. [...] -| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. [...] -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. 
The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize [...] -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * <max batch size> [...] -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...] -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize [...] -| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. [...] -| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. [...] -| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling [...] -| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting [...] 
-| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD [...] -| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards [...] -| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 [...] -| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing. [...] -| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. [...] -| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. [...] -| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. [...] -| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing [...] -| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. [...] -| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. [...] -| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. [...] -| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. [...] 
-| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. [...] +| Key | Default | Description [...] +|----------------------------------------------------------------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. [...] +| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task [...] +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] 
+| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. [...] +| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enable soft-affinity as well when enabling Velox cache. [...] +| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan [...] +| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. [...] +| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader [...] +| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan [...] +| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU [...] +| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. [...] +| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. [...] +| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. [...] +| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize [...] +| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. [...] 
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. [...] +| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold [...] +| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double) and avg(float/double). When set to loose, flushing will be enabled. [...] +| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate results at any time when memory is full / data reduction ratio is low. [...] +| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size [...] +| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be much larger than the CPU cache size on the host. [...] +| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. [...] +| spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled | false | Enables a reader-side raw payload merge fast path for plain hash shuffle payloads within each shuffle input stream.
This path merges payload buffers before Velox vectors are materialized, so it has lower per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types and dictionary-encoded payloads are not merged by this path. VeloxRes [...] +| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. [...] +| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan [...] +| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan [...] +| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. [...] +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. 
When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. [...] +| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. [...] +| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query [...] +| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created [...] +| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level [...] +| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run [...] +| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. [...] +| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size [...] +| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. [...] 
+| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. [...] +| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. [...] +| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. [...] +| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. [...] +| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. [...] +| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. [...] +| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan [...] +| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. [...] +| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. [...] +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize [...] +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * <max batch size> [...] 
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...] +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize [...] +| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. [...] +| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. [...] +| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling [...] +| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting [...] +| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD [...] +| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards [...] +| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 [...] +| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing. [...] 
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. [...] +| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. [...] +| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. [...] +| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing [...] +| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. [...] +| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. [...] +| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. [...] +| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. [...] +| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing. [...] 
## Gluten Velox backend *experimental* configurations diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java index 6a0f2130d7..449bc86558 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java @@ -42,7 +42,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware { int batchSize, long readerBufferSize, long deserializerBufferSize, - String shuffleWriterType); + String shuffleWriterType, + boolean enableHashShuffleReaderStreamMerge); public native long read(long shuffleReaderHandle, ShuffleStreamReader streamReader); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
