This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d7560eaa94 [GLUTEN-9571][VL] Support more parquet write configurations (#9572)
d7560eaa94 is described below

commit d7560eaa9401e55e7be6595f54dafcaf70b59b5e
Author: WangGuangxin <[email protected]>
AuthorDate: Wed Jul 23 16:28:54 2025 +0800

    [GLUTEN-9571][VL] Support more parquet write configurations (#9572)
    
    Support the following parquet writer configurations for Velox parquet writes:
    
    parquet.page.size
    parquet.compression.codec.zstd.level
    parquet.enable.dictionary
    parquet.writer.version
    
    A doc on parquet write support for Spark/Gluten/Velox is also added.
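    
    A minimal usage sketch of these options (illustrative only; assumes a Spark
    session running with the Gluten Velox backend, and the output path is
    hypothetical):
    
        // Route parquet writes through zstd so the level option takes effect.
        spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
        spark.range(1000)
          .write
          .option("parquet.page.size", "1048576")              // 1 MiB data pages
          .option("parquet.compression.codec.zstd.level", "3") // zstd compression level
          .option("parquet.enable.dictionary", "true")         // dictionary encoding on
          .option("parquet.writer.version", "v2")              // use parquet data page v2
          .parquet("/tmp/parquet_write_demo")                  // hypothetical path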
---
 .../velox/VeloxParquetWriterInjects.scala          |  7 ++++
 cpp/core/config/GlutenConfig.h                     |  8 ++++
 .../operators/writer/VeloxParquetDataSource.cc     | 37 ++++++++++++++---
 docs/Configuration.md                              | 46 ++++++++++++++++++++++
 .../org/apache/gluten/config/GlutenConfig.scala    |  4 ++
 5 files changed, 96 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
index a66b271cb1..14f1c6d63b 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
@@ -47,6 +47,13 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects {
     options
       .get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE)
       .foreach(sparkOptions.put(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE, _))
+
+    Seq(
+      GlutenConfig.PARQUET_ZSTD_COMPRESSION_LEVEL,
+      GlutenConfig.PARQUET_DATAPAGE_SIZE,
+      GlutenConfig.PARQUET_ENABLE_DICTIONARY,
+      GlutenConfig.PARQUET_WRITER_VERSION
+    ).foreach(key => options.get(key).foreach(sparkOptions.put(key, _)))
     sparkOptions.asJava
   }
 
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 5fb9040e32..98d992a354 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -58,6 +58,14 @@ const std::string kParquetBlockRows = "parquet.block.rows";
 const std::string kParquetGzipWindowSize = "parquet.gzip.windowSize";
 const std::string kGzipWindowSize4k = "4096";
 
+const std::string kParquetZSTDCompressionLevel = "parquet.compression.codec.zstd.level";
+
+const std::string kParquetDataPageSize = "parquet.page.size";
+
+const std::string kParquetEnableDictionary = "parquet.enable.dictionary";
+
+const std::string kParquetWriterVersion = "parquet.writer.version";
+
 const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";
 
 const std::string kColumnarToRowMemoryThreshold = "spark.gluten.sql.columnarToRowMemoryThreshold";
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.cc b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
index 4d67569d37..68215177a8 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -31,6 +31,7 @@
 #include "velox/core/QueryConfig.h"
 #include "velox/core/QueryCtx.h"
 #include "velox/dwio/common/Options.h"
+#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
 
 using namespace facebook;
 using namespace facebook::velox::dwio::common;
@@ -46,17 +47,17 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions> VeloxParquetDataSource:
     const std::unordered_map<std::string, std::string>& sparkConfs) {
   int64_t maxRowGroupBytes = 134217728; // 128MB
   int64_t maxRowGroupRows = 100000000; // 100M
-  if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
-    maxRowGroupBytes = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
+  if (auto it = sparkConfs.find(kParquetBlockSize); it != sparkConfs.end()) {
+    maxRowGroupBytes = static_cast<int64_t>(stoi(it->second));
   }
-  if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
-    maxRowGroupRows = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
+  if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) {
+    maxRowGroupRows = static_cast<int64_t>(stoi(it->second));
   }
   auto writeOption = std::make_unique<facebook::velox::parquet::WriterOptions>();
   writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
   auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
-  if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
-    auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second;
+  if (auto it = sparkConfs.find(kParquetCompressionCodec); it != sparkConfs.end()) {
+    auto compressionCodecStr = it->second;
     // spark support none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
     if (boost::iequals(compressionCodecStr, "snappy")) {
       compressionCodec = CompressionKind::CompressionKind_SNAPPY;
@@ -79,6 +80,12 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions> VeloxParquetDataSource:
       compressionCodec = CompressionKind::CompressionKind_LZ4;
     } else if (boost::iequals(compressionCodecStr, "zstd")) {
       compressionCodec = CompressionKind::CompressionKind_ZSTD;
+      if (auto it = sparkConfs.find(kParquetZSTDCompressionLevel); it != sparkConfs.end()) {
+        auto compressionLevel = std::stoi(it->second);
+        auto codecOptions = std::make_shared<facebook::velox::parquet::arrow::util::CodecOptions>();
+        codecOptions->compression_level = compressionLevel;
+        writeOption->codecOptions = std::move(codecOptions);
+      }
     } else if (boost::iequals(compressionCodecStr, "uncompressed")) {
       compressionCodec = CompressionKind::CompressionKind_NONE;
     } else if (boost::iequals(compressionCodecStr, "none")) {
@@ -91,6 +98,24 @@ std::unique_ptr<facebook::velox::parquet::WriterOptions> VeloxParquetDataSource:
         maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
   };
   writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
+  if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) {
+    auto dataPageSize = std::stoi(it->second);
+    writeOption->dataPageSize = dataPageSize;
+  }
+  if (auto it = sparkConfs.find(kParquetWriterVersion); it != sparkConfs.end()) {
+    auto parquetVersion = it->second;
+    if (boost::iequals(parquetVersion, "v2")) {
+      writeOption->useParquetDataPageV2 = true;
+    }
+  }
+  if (auto it = sparkConfs.find(kParquetEnableDictionary); it != sparkConfs.end()) {
+    auto enableDictionary = it->second;
+    if (boost::iequals(enableDictionary, "true")) {
+      writeOption->enableDictionary = true;
+    } else {
+      writeOption->enableDictionary = false;
+    }
+  }
   return writeOption;
 }
 
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 6476cbfd2f..63619aee90 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -172,3 +172,49 @@ nav_order: 15
 | spark.gluten.supported.scala.udfs || Supported scala udf names. [...]
 | spark.gluten.ui.enabled | true | Whether to enable the gluten web UI, If true, attach the gluten UI page to the Spark web UI. [...]
 
+## Parquet write configurations
+|                                                   | parquet-mr default                         | Spark default | Velox Default | Gluten Support |
+|---------------------------------------------------|--------------------------------------------|---------------|---------------|----------------|
+| -------------------Spark----------------          |                                            |               |               |                |
+| spark.sql.parquet.outputTimestampType             |                                            | int96         |               |                |
+| spark.sql.parquet.writeLegacyFormat               |                                            | false         |               |                |
+| -------------------Velox/Arrow----------------    |                                            |               |               |                |
+| write_batch_size                                  |                                            |               | 1024          | Y (batch size) |
+| rowgroup_length                                   |                                            |               | 1M            |                |
+| compression_level                                 |                                            |               | 0             |                |
+| page_index                                        |                                            |               | false         |                |
+| decimal_as_integer                                |                                            |               | false         |                |
+| statistics_enabled                                |                                            |               | false         |                |
+| -------------------parquet-mr----------------     |                                            |               |               |                |
+| parquet.summary.metadata.level                    | all                                        |               |               |                |
+| parquet.enable.summary-metadata                   | true                                       |               |               |                |
+| parquet.block.size                                | 128m                                       |               |               | Y              |
+| parquet.page.size                                 | 1m                                         |               | 1M            | Y              |
+| parquet.compression                               | uncompressed                               | snappy        | uncompressed  | Y              |
+| parquet.write.support.class                       | org.apache.parquet.hadoop.api.WriteSupport |               |               |                |
+| parquet.enable.dictionary                         | true                                       |               | true          | Y              |
+| parquet.dictionary.page.size                      | 1m                                         |               | 1m            |                |
+| parquet.validation                                | false                                      |               |               |                |
+| parquet.writer.version                            | PARQUET_1_0                                |               | PARQUET_2_6   | Y              |
+| parquet.memory.pool.ratio                         | 0.95                                       |               |               |                |
+| parquet.memory.min.chunk.size                     | 1m                                         |               |               |                |
+| parquet.writer.max-padding                        | 8m                                         |               |               |                |
+| parquet.page.size.row.check.min                   | 100                                        |               |               |                |
+| parquet.page.size.row.check.max                   | 10000                                      |               |               |                |
+| parquet.page.value.count.threshold                | Integer.MAX_VALUE / 2                      |               |               |                |
+| parquet.page.size.check.estimate                  | true                                       |               |               |                |
+| parquet.columnindex.truncate.length               | 64                                         |               |               |                |
+| parquet.statistics.truncate.length                | 2147483647                                 |               |               |                |
+| parquet.bloom.filter.enabled                      | false                                      |               |               |                |
+| parquet.bloom.filter.adaptive.enabled             | false                                      |               |               |                |
+| parquet.bloom.filter.candidates.number            | 5                                          |               |               |                |
+| parquet.bloom.filter.expected.ndv                 |                                            |               |               |                |
+| parquet.bloom.filter.fpp                          | 0.01                                       |               |               |                |
+| parquet.bloom.filter.max.bytes                    | 1m                                         |               |               |                |
+| parquet.decrypt.off-heap.buffer.enabled           | false                                      |               |               |                |
+| parquet.page.row.count.limit                      | 20000                                      |               |               |                |
+| parquet.page.write-checksum.enabled               | true                                       |               | false         |                |
+| parquet.crypto.factory.class                      | None                                       |               |               |                |
+| parquet.compression.codec.zstd.bufferPool.enabled | true                                       |               |               |                |
+| parquet.compression.codec.zstd.level              | 3                                          |               | 0             | Y              |
+| parquet.compression.codec.zstd.workers            | 0                                          |               |               |                |
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index a1b98314c7..1c094faa4c 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -369,6 +369,10 @@ object GlutenConfig {
   val PARQUET_BLOCK_SIZE: String = "parquet.block.size"
   val PARQUET_BLOCK_ROWS: String = "parquet.block.rows"
   val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize"
+  val PARQUET_ZSTD_COMPRESSION_LEVEL: String = "parquet.compression.codec.zstd.level"
+  val PARQUET_DATAPAGE_SIZE: String = "parquet.page.size"
+  val PARQUET_ENABLE_DICTIONARY: String = "parquet.enable.dictionary"
+  val PARQUET_WRITER_VERSION: String = "parquet.writer.version"
   // Hadoop config
   val HADOOP_PREFIX = "spark.hadoop."
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
