This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d7560eaa94 [GLUTEN-9571][VL] Support more parquet write configurations (#9572)
d7560eaa94 is described below
commit d7560eaa9401e55e7be6595f54dafcaf70b59b5e
Author: WangGuangxin <[email protected]>
AuthorDate: Wed Jul 23 16:28:54 2025 +0800
[GLUTEN-9571][VL] Support more parquet write configurations (#9572)
Support the following Parquet writer configurations for Velox Parquet writes:
parquet.page.size
parquet.compression.codec.zstd.level
parquet.enable.dictionary
parquet.writer.version
Also add a document describing Parquet write support in Spark, Gluten, and Velox.
---
.../velox/VeloxParquetWriterInjects.scala | 7 ++++
cpp/core/config/GlutenConfig.h | 8 ++++
.../operators/writer/VeloxParquetDataSource.cc | 37 ++++++++++++++---
docs/Configuration.md | 46 ++++++++++++++++++++++
.../org/apache/gluten/config/GlutenConfig.scala | 4 ++
5 files changed, 96 insertions(+), 6 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
index a66b271cb1..14f1c6d63b 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
@@ -47,6 +47,13 @@ class VeloxParquetWriterInjects extends
VeloxFormatWriterInjects {
options
.get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE)
.foreach(sparkOptions.put(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE, _))
+
+ Seq(
+ GlutenConfig.PARQUET_ZSTD_COMPRESSION_LEVEL,
+ GlutenConfig.PARQUET_DATAPAGE_SIZE,
+ GlutenConfig.PARQUET_ENABLE_DICTIONARY,
+ GlutenConfig.PARQUET_WRITER_VERSION
+ ).foreach(key => options.get(key).foreach(sparkOptions.put(key, _)))
sparkOptions.asJava
}
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 5fb9040e32..98d992a354 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -58,6 +58,14 @@ const std::string kParquetBlockRows = "parquet.block.rows";
const std::string kParquetGzipWindowSize = "parquet.gzip.windowSize";
const std::string kGzipWindowSize4k = "4096";
+const std::string kParquetZSTDCompressionLevel =
"parquet.compression.codec.zstd.level";
+
+const std::string kParquetDataPageSize = "parquet.page.size";
+
+const std::string kParquetEnableDictionary = "parquet.enable.dictionary";
+
+const std::string kParquetWriterVersion = "parquet.writer.version";
+
const std::string kParquetCompressionCodec =
"spark.sql.parquet.compression.codec";
const std::string kColumnarToRowMemoryThreshold =
"spark.gluten.sql.columnarToRowMemoryThreshold";
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
index 4d67569d37..68215177a8 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -31,6 +31,7 @@
#include "velox/core/QueryConfig.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/common/Options.h"
+#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
using namespace facebook;
using namespace facebook::velox::dwio::common;
@@ -46,17 +47,17 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions>
VeloxParquetDataSource:
const std::unordered_map<std::string, std::string>& sparkConfs) {
int64_t maxRowGroupBytes = 134217728; // 128MB
int64_t maxRowGroupRows = 100000000; // 100M
- if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
- maxRowGroupBytes =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
+ if (auto it = sparkConfs.find(kParquetBlockSize); it != sparkConfs.end()) {
+ maxRowGroupBytes = static_cast<int64_t>(stoi(it->second));
}
- if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
- maxRowGroupRows =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
+ if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) {
+ maxRowGroupRows = static_cast<int64_t>(stoi(it->second));
}
auto writeOption =
std::make_unique<facebook::velox::parquet::WriterOptions>();
writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds
/*micro*/;
auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
- if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
- auto compressionCodecStr =
sparkConfs.find(kParquetCompressionCodec)->second;
+ if (auto it = sparkConfs.find(kParquetCompressionCodec); it !=
sparkConfs.end()) {
+ auto compressionCodecStr = it->second;
// spark support none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
if (boost::iequals(compressionCodecStr, "snappy")) {
compressionCodec = CompressionKind::CompressionKind_SNAPPY;
@@ -79,6 +80,12 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions>
VeloxParquetDataSource:
compressionCodec = CompressionKind::CompressionKind_LZ4;
} else if (boost::iequals(compressionCodecStr, "zstd")) {
compressionCodec = CompressionKind::CompressionKind_ZSTD;
+ if (auto it = sparkConfs.find(kParquetZSTDCompressionLevel); it !=
sparkConfs.end()) {
+ auto compressionLevel = std::stoi(it->second);
+ auto codecOptions =
std::make_shared<facebook::velox::parquet::arrow::util::CodecOptions>();
+ codecOptions->compression_level = compressionLevel;
+ writeOption->codecOptions = std::move(codecOptions);
+ }
} else if (boost::iequals(compressionCodecStr, "uncompressed")) {
compressionCodec = CompressionKind::CompressionKind_NONE;
} else if (boost::iequals(compressionCodecStr, "none")) {
@@ -91,6 +98,24 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions>
VeloxParquetDataSource:
maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
};
writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs,
kSessionTimezone, std::nullopt);
+ if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end())
{
+ auto dataPageSize = std::stoi(it->second);
+ writeOption->dataPageSize = dataPageSize;
+ }
+ if (auto it = sparkConfs.find(kParquetWriterVersion); it !=
sparkConfs.end()) {
+ auto parquetVersion = it->second;
+ if (boost::iequals(parquetVersion, "v2")) {
+ writeOption->useParquetDataPageV2 = true;
+ }
+ }
+ if (auto it = sparkConfs.find(kParquetEnableDictionary); it !=
sparkConfs.end()) {
+ auto enableDictionary = it->second;
+ if (boost::iequals(enableDictionary, "true")) {
+ writeOption->enableDictionary = true;
+ } else {
+ writeOption->enableDictionary = false;
+ }
+ }
return writeOption;
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 6476cbfd2f..63619aee90 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -172,3 +172,49 @@ nav_order: 15
| spark.gluten.supported.scala.udfs
|| Supported scala udf names.
[...]
| spark.gluten.ui.enabled | true
| Whether to enable the gluten web UI, If true, attach the gluten UI
page to the Spark web UI.
[...]
+## Parquet write configurations
+| | parquet-mr default | Spark default | Velox default | Gluten support |
+|---------------------------------------------------|--------------------------------------------|---------------|---------------|----------------|
+| -------------------Spark---------------- | | | | |
+| spark.sql.parquet.outputTimestampType | | int96 | | |
+| spark.sql.parquet.writeLegacyFormat | | false | | |
+| -------------------Velox/Arrow---------------- | | | | |
+| write_batch_size | | | 1024 | Y (batch size) |
+| rowgroup_length | | | 1M | |
+| compression_level | | | 0 | |
+| page_index | | | false | |
+| decimal_as_integer | | | false | |
+| statistics_enabled | | | false | |
+| -------------------parquet-mr---------------- | | | | |
+| parquet.summary.metadata.level | all | | | |
+| parquet.enable.summary-metadata | true | | | |
+| parquet.block.size | 128m | | | Y |
+| parquet.page.size | 1m | | 1M | Y |
+| parquet.compression | uncompressed | snappy | uncompressed | Y |
+| parquet.write.support.class | org.apache.parquet.hadoop.api.WriteSupport | | | |
+| parquet.enable.dictionary | true | | true | Y |
+| parquet.dictionary.page.size | 1m | | 1m | |
+| parquet.validation | false | | | |
+| parquet.writer.version | PARQUET_1_0 | | PARQUET_2_6 | Y (only `v2` enables DataPage V2) |
+| parquet.memory.pool.ratio | 0.95 | | | |
+| parquet.memory.min.chunk.size | 1m | | | |
+| parquet.writer.max-padding | 8m | | | |
+| parquet.page.size.row.check.min | 100 | | | |
+| parquet.page.size.row.check.max | 10000 | | | |
+| parquet.page.value.count.threshold | Integer.MAX_VALUE / 2 | | | |
+| parquet.page.size.check.estimate | true | | | |
+| parquet.columnindex.truncate.length | 64 | | | |
+| parquet.statistics.truncate.length | 2147483647 | | | |
+| parquet.bloom.filter.enabled | false | | | |
+| parquet.bloom.filter.adaptive.enabled | false | | | |
+| parquet.bloom.filter.candidates.number | 5 | | | |
+| parquet.bloom.filter.expected.ndv | | | | |
+| parquet.bloom.filter.fpp | 0.01 | | | |
+| parquet.bloom.filter.max.bytes | 1m | | | |
+| parquet.decrypt.off-heap.buffer.enabled | false | | | |
+| parquet.page.row.count.limit | 20000 | | | |
+| parquet.page.write-checksum.enabled | true | | false | |
+| parquet.crypto.factory.class | None | | | |
+| parquet.compression.codec.zstd.bufferPool.enabled | true | | | |
+| parquet.compression.codec.zstd.level | 3 | | 0 | Y |
+| parquet.compression.codec.zstd.workers | 0 | | | |
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index a1b98314c7..1c094faa4c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -369,6 +369,10 @@ object GlutenConfig {
val PARQUET_BLOCK_SIZE: String = "parquet.block.size"
val PARQUET_BLOCK_ROWS: String = "parquet.block.rows"
val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize"
+ val PARQUET_ZSTD_COMPRESSION_LEVEL: String =
"parquet.compression.codec.zstd.level"
+ val PARQUET_DATAPAGE_SIZE: String = "parquet.page.size"
+ val PARQUET_ENABLE_DICTIONARY: String = "parquet.enable.dictionary"
+ val PARQUET_WRITER_VERSION: String = "parquet.writer.version"
// Hadoop config
val HADOOP_PREFIX = "spark.hadoop."
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]