This is an automated email from the ASF dual-hosted git repository. marong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c0d633dd5 [VL] Row-based sort shuffle follow-up (minor) (#6628)
c0d633dd5 is described below
commit c0d633dd57423d84634b38bcbf9248241d8f5f8a
Author: Rong Ma <[email protected]>
AuthorDate: Tue Jul 30 12:44:01 2024 +0800
[VL] Row-based sort shuffle follow-up (minor) (#6628)
---
cpp/velox/shuffle/RadixSort.h | 10 ++-
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 7 +-
docs/Configuration.md | 120 ++++++++++++++--------------
3 files changed, 68 insertions(+), 69 deletions(-)
diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h
index 17f05d349..90921962a 100644
--- a/cpp/velox/shuffle/RadixSort.h
+++ b/cpp/velox/shuffle/RadixSort.h
@@ -21,7 +21,9 @@
namespace gluten {
-template <typename Element>
+// Spark radix sort implementation. This implementation is for shuffle sort
only as it removes unused
+// params (desc, signed) in shuffle.
+//
https://github.com/apache/spark/blob/308669fc301916837bacb7c3ec1ecef93190c094/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RadixSort.java#L25
class RadixSort {
public:
/**
@@ -39,7 +41,7 @@ class RadixSort {
* @return The starting index of the sorted data within the given array. We
return this instead
* of always copying the data back to position zero for efficiency.
*/
- static int32_t sort(Element* array, size_t size, int64_t numRecords, int32_t
startByteIndex, int32_t endByteIndex) {
+ static int32_t sort(uint64_t* array, size_t size, int64_t numRecords,
int32_t startByteIndex, int32_t endByteIndex) {
assert(startByteIndex >= 0 && "startByteIndex should >= 0");
assert(endByteIndex <= 7 && "endByteIndex should <= 7");
assert(endByteIndex > startByteIndex);
@@ -75,7 +77,7 @@ class RadixSort {
* @param outIndex the starting index where sorted output data should be
written.
*/
static void sortAtByte(
- Element* array,
+ uint64_t* array,
int64_t numRecords,
std::vector<int64_t>& counts,
int32_t byteIdx,
@@ -103,7 +105,7 @@ class RadixSort {
* significant byte. If the byte does not need sorting the vector
entry will be empty.
*/
static std::vector<std::vector<int64_t>>
- getCounts(Element* array, int64_t numRecords, int32_t startByteIndex,
int32_t endByteIndex) {
+ getCounts(uint64_t* array, int64_t numRecords, int32_t startByteIndex,
int32_t endByteIndex) {
std::vector<std::vector<int64_t>> counts;
counts.resize(8);
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 0015ba9d3..d7db69659 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -220,12 +220,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions()
{
{
ScopedTimer timer(&sortTime_);
if (options_.useRadixSort) {
- begin = RadixSort<uint64_t>::sort(
- arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex,
kPartitionIdEndByteIndex);
+ begin = RadixSort::sort(arrayPtr_, arraySize_, numRecords,
kPartitionIdStartByteIndex, kPartitionIdEndByteIndex);
} else {
- auto ptr = arrayPtr_;
- qsort(ptr, numRecords, sizeof(uint64_t), compare);
- (void)ptr;
+ std::sort(arrayPtr_, arrayPtr_ + numRecords);
}
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 2c2bd4de1..4e47564a6 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -11,65 +11,66 @@ You can add these configurations into spark-defaults.conf
to enable or disable t
## Spark parameters
-| Parameters | Description
[...]
-|------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| spark.driver.extraClassPath | To add Gluten
Plugin jar file in Spark Driver
[...]
-| spark.executor.extraClassPath | To add Gluten
Plugin jar file in Spark Executor
[...]
-| spark.executor.memory | To set up how
much memory to be used for Spark Executor.
[...]
-| spark.memory.offHeap.size | To set up how
much memory to be used for Java OffHeap.<br /> Please notice Gluten Plugin will
leverage this setting to allocate memory space for native usage even offHeap is
disabled. <br /> The value is based on your system and it is recommended to set
it larger if you are facing Out of Memory issue in Gluten Plugin
[...]
-| spark.sql.sources.useV1SourceList | Choose to use
V1 source
[...]
-| spark.sql.join.preferSortMergeJoin | To turn off
preferSortMergeJoin in Spark
[...]
-| spark.plugins | To load
Gluten's components by Spark's plug-in loader
[...]
-| spark.shuffle.manager | To turn on
Gluten Columnar Shuffle Plugin
[...]
-| spark.gluten.enabled | Enable Gluten,
default is true. Just an experimental property. Recommend to enable/disable
Gluten through the setting for `spark.plugins`.
[...]
-| spark.gluten.memory.isolation | (Experimental)
Enable isolated memory mode. If true, Gluten controls the maximum off-heap
memory can be used by each task to X, X = executor memory / max task slots.
It's recommended to set true if Gluten serves concurrent queries within a
single session, since not all memory Gluten allocated is guaranteed to be
spillable. In the case, the feature should be enabled to avoid OOM. Note when
true, setting spark.memory.storageFra [...]
-| spark.gluten.ras.enabled | Experimental:
Enables RAS (relation algebra selector) during physical planning to generate
more efficient query plan. Note, this feature is still in development and may
not bring performance profits.
[...]
-| spark.gluten.sql.columnar.maxBatchSize | Number of rows
to be processed in each batch. Default value is 4096.
[...]
-| spark.gluten.sql.columnar.scanOnly | When enabled,
this config will overwrite all other operators' enabling, and only Scan and
Filter pushdown will be offloaded to native.
[...]
-| spark.gluten.sql.columnar.batchscan | Enable or
Disable Columnar BatchScan, default is true
[...]
-| spark.gluten.sql.columnar.hashagg | Enable or
Disable Columnar Hash Aggregate, default is true
[...]
-| spark.gluten.sql.columnar.project | Enable or
Disable Columnar Project, default is true
[...]
-| spark.gluten.sql.columnar.filter | Enable or
Disable Columnar Filter, default is true
[...]
-| spark.gluten.sql.columnar.sort | Enable or
Disable Columnar Sort, default is true
[...]
-| spark.gluten.sql.columnar.window | Enable or
Disable Columnar Window, default is true
[...]
-| spark.gluten.sql.columnar.shuffledHashJoin | Enable or
Disable ShuffledHashJoin, default is true
[...]
-| spark.gluten.sql.columnar.forceShuffledHashJoin | Force to use
ShuffledHashJoin over SortMergeJoin, default is true. For queries that can
benefit from storaged patitioned join, please set it to false.
[...]
-| spark.gluten.sql.columnar.sortMergeJoin | Enable or
Disable Columnar Sort Merge Join, default is true
[...]
-| spark.gluten.sql.columnar.union | Enable or
Disable Columnar Union, default is true
[...]
-| spark.gluten.sql.columnar.expand | Enable or
Disable Columnar Expand, default is true
[...]
-| spark.gluten.sql.columnar.generate | Enable or
Disable Columnar Generate, default is true
[...]
-| spark.gluten.sql.columnar.limit | Enable or
Disable Columnar Limit, default is true
[...]
-| spark.gluten.sql.columnar.tableCache | Enable or
Disable Columnar Table Cache, default is false
[...]
-| spark.gluten.sql.columnar.broadcastExchange | Enable or
Disable Columnar Broadcast Exchange, default is true
[...]
-| spark.gluten.sql.columnar.broadcastJoin | Enable or
Disable Columnar BroadcastHashJoin, default is true
[...]
-| spark.gluten.sql.columnar.shuffle.sort.threshold | The threshold
to determine whether to use sort-based columnar shuffle. Sort-based shuffle
will be used if the number of partitions is greater than this threshold.
[...]
-| spark.gluten.sql.columnar.shuffle.codec | Set up the
codec to be used for Columnar Shuffle. If this configuration is not set, will
check the value of spark.io.compression.codec. By default, Gluten use software
compression. Valid options for software compression are lz4, zstd. Valid
options for QAT and IAA is gzip.
[...]
-| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using
hardware accelerators for shuffle de/compression. Valid options are QAT and
IAA.
[...]
-| spark.gluten.sql.columnar.shuffle.compressionMode | Setting
different compression mode in shuffle, Valid options are buffer and rowvector,
buffer option compress each buffer of RowVector individually into one
pre-allocated large buffer, rowvector option first copies each buffer of
RowVector to a large buffer and then compress the entire buffer in one go.
[...]
-| spark.gluten.sql.columnar.shuffle.compression.threshold | If number of
rows in a batch falls below this threshold, will copy all buffers into one
buffer to compress.
[...]
-| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the
threshold to dynamically adjust the size of shuffle split buffers. The size of
each split buffer is recalculated for each incoming batch of data. If the new
size deviates from the current partition buffer size by a factor outside the
range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated
using the newly calculated size
[...]
-| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the
threshold control the minimum merged size. When a partition buffer is full, and
the number of rows is below (`threshold *
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.
[...]
-| spark.gluten.sql.columnar.numaBinding | Set up
NUMABinding, default is false
[...]
-| spark.gluten.sql.columnar.coreRange | Set up the core
range for NUMABinding, only works when numaBinding set to true. <br /> The
setting is based on the number of cores in your system. Use 72 cores as an
example.
[...]
-| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure the
threshold for whether whole stage will fall back in AQE supported case by
counting the number of ColumnarToRow & vanilla leaf node
[...]
-| spark.gluten.sql.columnar.query.fallback.threshold | Configure the
threshold for whether query will fall back by counting the number of
ColumnarToRow & vanilla leaf node
[...]
-| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true, the
fallback policy ignores the RowToColumnar when counting fallback number.
[...]
-| spark.gluten.sql.columnar.fallback.preferColumnar | When true, the
fallback policy prefers to use Gluten plan rather than vanilla Spark plan if
the both of them contains ColumnarToRow and the vanilla Spark plan
ColumnarToRow number is not smaller than Gluten plan.
[...]
-| spark.gluten.sql.columnar.force.hashagg | Force to use
hash agg to replace sort agg.
[...]
-| spark.gluten.sql.columnar.vanillaReaders | Enable vanilla
spark's vectorized reader. Please note it may bring perf. overhead due to extra
data transition. We recommend to disable it if most queries can be fully
offloaded to gluten.
[...]
-| spark.gluten.sql.native.bloomFilter | Enable or
Disable native runtime bloom filter.
[...]
-| spark.gluten.sql.native.arrow.reader.enabled | Enable or
Disable native arrow read CSV file format
[...]
-| spark.gluten.shuffleWriter.bufferSize | Set the number
of buffer rows for the shuffle writer
[...]
-| spark.gluten.loadLibFromJar | Controls
whether to load dynamic link library from a packed jar for gluten/cpp. Not
applicable to static build and clickhouse backend.
[...]
-| spark.gluten.loadLibOS | When
`spark.gluten.loadLibFromJar` is true. Manually specify the system os to load
library, e.g., CentOS
[...]
-| spark.gluten.loadLibOSVersion | Manually
specify the system os version to load library, e.g., if
`spark.gluten.loadLibOS` is CentOS, this config can be 7
[...]
-| spark.gluten.expression.blacklist | A black list of
expression to skip transform, multiple values separated by commas.
[...]
-| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back
filter/project if the height of expression tree reaches this threshold,
considering Spark codegen can bring better performance for such case.
[...]
-| spark.gluten.sql.cartesianProductTransformerEnabled | Config to
enable CartesianProductExecTransformer.
[...]
-| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to
enable BroadcastNestedLoopJoinExecTransformer.
[...]
-| spark.gluten.sql.cacheWholeStageTransformerContext | When true,
`WholeStageTransformer` will cache the `WholeStageTransformerContext` when
executing. It is used to get substrait plan node and native plan string.
[...]
-| spark.gluten.sql.injectNativePlanStringToExplain | When true,
Gluten will inject native plan tree to explain string inside
`WholeStageTransformerContext`.
[...]
-| spark.gluten.sql.fallbackRegexpExpressions | When true,
Gluten will fall back all regexp expressions to avoid any incompatibility risk.
[...]
+| Parameters | Description
[...]
+|-------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| spark.driver.extraClassPath | To add Gluten
Plugin jar file in Spark Driver
[...]
+| spark.executor.extraClassPath | To add Gluten
Plugin jar file in Spark Executor
[...]
+| spark.executor.memory | To set up how
much memory to be used for Spark Executor.
[...]
+| spark.memory.offHeap.size | To set up how
much memory to be used for Java OffHeap.<br /> Please notice Gluten Plugin will
leverage this setting to allocate memory space for native usage even offHeap is
disabled. <br /> The value is based on your system and it is recommended to set
it larger if you are facing Out of Memory issue in Gluten Plugin
[...]
+| spark.sql.sources.useV1SourceList | Choose to use
V1 source
[...]
+| spark.sql.join.preferSortMergeJoin | To turn off
preferSortMergeJoin in Spark
[...]
+| spark.plugins | To load
Gluten's components by Spark's plug-in loader
[...]
+| spark.shuffle.manager | To turn on
Gluten Columnar Shuffle Plugin
[...]
+| spark.gluten.enabled | Enable Gluten,
default is true. Just an experimental property. Recommend to enable/disable
Gluten through the setting for `spark.plugins`.
[...]
+| spark.gluten.memory.isolation | (Experimental)
Enable isolated memory mode. If true, Gluten controls the maximum off-heap
memory can be used by each task to X, X = executor memory / max task slots.
It's recommended to set true if Gluten serves concurrent queries within a
single session, since not all memory Gluten allocated is guaranteed to be
spillable. In the case, the feature should be enabled to avoid OOM. Note when
true, setting spark.memory.storageFr [...]
+| spark.gluten.ras.enabled | Experimental:
Enables RAS (relation algebra selector) during physical planning to generate
more efficient query plan. Note, this feature is still in development and may
not bring performance profits.
[...]
+| spark.gluten.sql.columnar.maxBatchSize | Number of rows
to be processed in each batch. Default value is 4096.
[...]
+| spark.gluten.sql.columnar.scanOnly | When enabled,
this config will overwrite all other operators' enabling, and only Scan and
Filter pushdown will be offloaded to native.
[...]
+| spark.gluten.sql.columnar.batchscan | Enable or
Disable Columnar BatchScan, default is true
[...]
+| spark.gluten.sql.columnar.hashagg | Enable or
Disable Columnar Hash Aggregate, default is true
[...]
+| spark.gluten.sql.columnar.project | Enable or
Disable Columnar Project, default is true
[...]
+| spark.gluten.sql.columnar.filter | Enable or
Disable Columnar Filter, default is true
[...]
+| spark.gluten.sql.columnar.sort | Enable or
Disable Columnar Sort, default is true
[...]
+| spark.gluten.sql.columnar.window | Enable or
Disable Columnar Window, default is true
[...]
+| spark.gluten.sql.columnar.shuffledHashJoin | Enable or
Disable ShuffledHashJoin, default is true
[...]
+| spark.gluten.sql.columnar.forceShuffledHashJoin | Force to use
ShuffledHashJoin over SortMergeJoin, default is true. For queries that can
benefit from storaged patitioned join, please set it to false.
[...]
+| spark.gluten.sql.columnar.sortMergeJoin | Enable or
Disable Columnar Sort Merge Join, default is true
[...]
+| spark.gluten.sql.columnar.union | Enable or
Disable Columnar Union, default is true
[...]
+| spark.gluten.sql.columnar.expand | Enable or
Disable Columnar Expand, default is true
[...]
+| spark.gluten.sql.columnar.generate | Enable or
Disable Columnar Generate, default is true
[...]
+| spark.gluten.sql.columnar.limit | Enable or
Disable Columnar Limit, default is true
[...]
+| spark.gluten.sql.columnar.tableCache | Enable or
Disable Columnar Table Cache, default is false
[...]
+| spark.gluten.sql.columnar.broadcastExchange | Enable or
Disable Columnar Broadcast Exchange, default is true
[...]
+| spark.gluten.sql.columnar.broadcastJoin | Enable or
Disable Columnar BroadcastHashJoin, default is true
[...]
+| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | The threshold
to determine whether to use sort-based columnar shuffle. Sort-based shuffle
will be used if the number of partitions is greater than this threshold.
[...]
+| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | The threshold
to determine whether to use sort-based columnar shuffle. Sort-based shuffle
will be used if the number of columns is greater than this threshold.
[...]
+| spark.gluten.sql.columnar.shuffle.codec | Set up the
codec to be used for Columnar Shuffle. If this configuration is not set, will
check the value of spark.io.compression.codec. By default, Gluten use software
compression. Valid options for software compression are lz4, zstd. Valid
options for QAT and IAA is gzip.
[...]
+| spark.gluten.sql.columnar.shuffle.codecBackend | Enable using
hardware accelerators for shuffle de/compression. Valid options are QAT and
IAA.
[...]
+| spark.gluten.sql.columnar.shuffle.compressionMode | Setting
different compression mode in shuffle, Valid options are buffer and rowvector,
buffer option compress each buffer of RowVector individually into one
pre-allocated large buffer, rowvector option first copies each buffer of
RowVector to a large buffer and then compress the entire buffer in one go.
[...]
+| spark.gluten.sql.columnar.shuffle.compression.threshold | If number of
rows in a batch falls below this threshold, will copy all buffers into one
buffer to compress.
[...]
+| spark.gluten.sql.columnar.shuffle.realloc.threshold | Set the
threshold to dynamically adjust the size of shuffle split buffers. The size of
each split buffer is recalculated for each incoming batch of data. If the new
size deviates from the current partition buffer size by a factor outside the
range of [1 - threshold, 1 + threshold], the split buffer will be re-allocated
using the newly calculated size
[...]
+| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the
threshold control the minimum merged size. When a partition buffer is full, and
the number of rows is below (`threshold *
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.
[...]
+| spark.gluten.sql.columnar.numaBinding | Set up
NUMABinding, default is false
[...]
+| spark.gluten.sql.columnar.coreRange | Set up the
core range for NUMABinding, only works when numaBinding set to true. <br /> The
setting is based on the number of cores in your system. Use 72 cores as an
example.
[...]
+| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure the
threshold for whether whole stage will fall back in AQE supported case by
counting the number of ColumnarToRow & vanilla leaf node
[...]
+| spark.gluten.sql.columnar.query.fallback.threshold | Configure the
threshold for whether query will fall back by counting the number of
ColumnarToRow & vanilla leaf node
[...]
+| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | When true, the
fallback policy ignores the RowToColumnar when counting fallback number.
[...]
+| spark.gluten.sql.columnar.fallback.preferColumnar | When true, the
fallback policy prefers to use Gluten plan rather than vanilla Spark plan if
the both of them contains ColumnarToRow and the vanilla Spark plan
ColumnarToRow number is not smaller than Gluten plan.
[...]
+| spark.gluten.sql.columnar.force.hashagg | Force to use
hash agg to replace sort agg.
[...]
+| spark.gluten.sql.columnar.vanillaReaders | Enable vanilla
spark's vectorized reader. Please note it may bring perf. overhead due to extra
data transition. We recommend to disable it if most queries can be fully
offloaded to gluten.
[...]
+| spark.gluten.sql.native.bloomFilter | Enable or
Disable native runtime bloom filter.
[...]
+| spark.gluten.sql.native.arrow.reader.enabled | Enable or
Disable native arrow read CSV file format
[...]
+| spark.gluten.shuffleWriter.bufferSize | Set the number
of buffer rows for the shuffle writer
[...]
+| spark.gluten.loadLibFromJar | Controls
whether to load dynamic link library from a packed jar for gluten/cpp. Not
applicable to static build and clickhouse backend.
[...]
+| spark.gluten.loadLibOS | When
`spark.gluten.loadLibFromJar` is true. Manually specify the system os to load
library, e.g., CentOS
[...]
+| spark.gluten.loadLibOSVersion | Manually
specify the system os version to load library, e.g., if
`spark.gluten.loadLibOS` is CentOS, this config can be 7
[...]
+| spark.gluten.expression.blacklist | A black list
of expression to skip transform, multiple values separated by commas.
[...]
+| spark.gluten.sql.columnar.fallback.expressions.threshold | Fall back
filter/project if the height of expression tree reaches this threshold,
considering Spark codegen can bring better performance for such case.
[...]
+| spark.gluten.sql.cartesianProductTransformerEnabled | Config to
enable CartesianProductExecTransformer.
[...]
+| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | Config to
enable BroadcastNestedLoopJoinExecTransformer.
[...]
+| spark.gluten.sql.cacheWholeStageTransformerContext | When true,
`WholeStageTransformer` will cache the `WholeStageTransformerContext` when
executing. It is used to get substrait plan node and native plan string.
[...]
+| spark.gluten.sql.injectNativePlanStringToExplain | When true,
Gluten will inject native plan tree to explain string inside
`WholeStageTransformerContext`.
[...]
+| spark.gluten.sql.fallbackRegexpExpressions | When true,
Gluten will fall back all regexp expressions to avoid any incompatibility risk.
[...]
## Velox Parameters
@@ -94,7 +95,6 @@ The following configurations are related to Velox settings.
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled |
Enable velox orc scan. If disabled, vanilla spark orc scan will be used.
| true
|
| spark.gluten.sql.complexType.scan.fallback.enabled | Force
fallback for complex type scan, including struct, map, array.
| true
|
-
Additionally, you can control the configurations of gluten at thread level by
local property.
| Parameters | Description
| Recommend Setting |
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
