This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 97c74c2d9f [VL] Add support for some Parquet write options to 3.4 / 3.5 to align with 3.2 / 3.3 (#8816)
97c74c2d9f is described below

commit 97c74c2d9f1e27e4698434553021e7c18e370b69
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 25 14:38:37 2025 +0800

    [VL] Add support for some Parquet write options to 3.4 / 3.5 to align with 3.2 / 3.3 (#8816)
---
 .../backendsapi/velox/VeloxTransformerApi.scala    | 41 ++++++++++-------
 .../operators/writer/VeloxParquetDataSource.cc     | 50 ++++++++++++---------
 .../operators/writer/VeloxParquetDataSource.h      |  7 +--
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 52 +++++++++++++---------
 4 files changed, 86 insertions(+), 64 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index d156fffa8b..9949f8822a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -17,8 +17,11 @@
 package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.{BackendsApiManager, TransformerApi}
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.proto.ConfigMap
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
 import org.apache.gluten.utils.InputPartitionsUtil
@@ -28,12 +31,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet
 
-import com.google.protobuf.{Any, Message, StringValue}
+import com.google.protobuf.{Any, Message}
 
 import java.util.{Map => JMap}
 
@@ -97,21 +101,24 @@ class VeloxTransformerApi extends TransformerApi with Logging {
   override def packPBMessage(message: Message): Any = Any.pack(message, "")
 
   override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
-    val fileFormatStr = write.fileFormat match {
-      case register: DataSourceRegister =>
-        register.shortName
-      case _ => "UnknownFileFormat"
+    write.fileFormat match {
+      case _ @(_: ParquetFileFormat | _: HiveFileFormat) =>
+        // Only Parquet is supported. It's safe to set a fixed "parquet" here
+        // because other formats have already fallen back via WriteFilesExecTransformer's validation.
+        val shortName = "parquet"
+        val nativeConf =
+          GlutenFormatFactory(shortName)
+            .nativeConf(
+              write.caseInsensitiveOptions,
+              WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
+        packPBMessage(
+          ConfigMap
+            .newBuilder()
+            .putAllConfigs(nativeConf)
+            .putConfigs("format", shortName)
+            .build())
+      case _ =>
+        throw new GlutenException("Unsupported file write format: " + write.fileFormat)
     }
-    val compressionCodec =
-      WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions).capitalize
-    val writeParametersStr = new StringBuffer("WriteParameters:")
-    writeParametersStr.append("is").append(compressionCodec).append("=1")
-    writeParametersStr.append(";format=").append(fileFormatStr).append("\n")
-
-    packPBMessage(
-      StringValue
-        .newBuilder()
-        .setValue(writeParametersStr.toString)
-        .build())
   }
 }
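
Note on the wire format: genWriteParameters previously serialized an ad-hoc string of the shape "WriteParameters:is<Codec>=1;format=<fmt>\n" packed into a protobuf StringValue; it now packs a gluten ConfigMap message carrying the format name plus the file format's nativeConf entries. A minimal round-trip sketch in C++, assuming config.pb.h defines gluten::ConfigMap with a map<string, string> "configs" field (consistent with both sides of this diff); the "parquet.compression" key is purely illustrative:

    #include <google/protobuf/any.pb.h>
    #include <iostream>
    #include "config.pb.h"

    int main() {
      // Producer side (done in Scala by genWriteParameters above).
      gluten::ConfigMap confMap;
      (*confMap.mutable_configs())["format"] = "parquet";
      (*confMap.mutable_configs())["parquet.compression"] = "zstd"; // illustrative key

      // Mirrors Any.pack(message, "") on the Scala side: empty type-url prefix.
      google::protobuf::Any packed;
      packed.PackFrom(confMap, /*type_url_prefix=*/"");

      // Consumer side (done in SubstraitToVeloxPlan.cc further below).
      gluten::ConfigMap unpacked;
      if (packed.UnpackTo(&unpacked)) {
        std::cout << unpacked.configs().at("format") << std::endl; // prints "parquet"
      }
      return 0;
    }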
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.cc b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
index 15082b8b82..4d67569d37 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -42,25 +42,18 @@ namespace {
 const int32_t kGzipWindowBits4k = 12;
 }
 
-void VeloxParquetDataSource::initSink(const std::unordered_map<std::string, std::string>& /* sparkConfs */) {
-  if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
-    sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
-  } else {
-    throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
-  }
-}
-
-void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
-  initSink(sparkConfs);
-
+std::unique_ptr<facebook::velox::parquet::WriterOptions> VeloxParquetDataSource::makeParquetWriteOption(
+    const std::unordered_map<std::string, std::string>& sparkConfs) {
+  int64_t maxRowGroupBytes = 134217728; // 128MB
+  int64_t maxRowGroupRows = 100000000; // 100M
   if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
-    maxRowGroupBytes_ = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
+    maxRowGroupBytes = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
   }
   if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
-    maxRowGroupRows_ = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
+    maxRowGroupRows = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
   }
-  velox::parquet::WriterOptions writeOption;
-  writeOption.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
+  auto writeOption = std::make_unique<facebook::velox::parquet::WriterOptions>();
+  writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
   auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
   if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
    auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second;
@@ -74,7 +67,7 @@ void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::str
         if (parquetGzipWindowSizeStr == kGzipWindowSize4k) {
          auto codecOptions = std::make_shared<facebook::velox::parquet::arrow::util::GZipCodecOptions>();
           codecOptions->window_bits = kGzipWindowBits4k;
-          writeOption.codecOptions = std::move(codecOptions);
+          writeOption->codecOptions = std::move(codecOptions);
         }
       }
     } else if (boost::iequals(compressionCodecStr, "lzo")) {
@@ -92,15 +85,28 @@ void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::str
       compressionCodec = CompressionKind::CompressionKind_NONE;
     }
   }
-  writeOption.compressionKind = compressionCodec;
-  writeOption.flushPolicyFactory = [&]() {
+  writeOption->compressionKind = compressionCodec;
+  writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
     return std::make_unique<velox::parquet::LambdaFlushPolicy>(
-        maxRowGroupRows_, maxRowGroupBytes_, [&]() { return false; });
+        maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
   };
-  writeOption.parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
-  auto schema = gluten::fromArrowSchema(schema_);
+  writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
+  return writeOption;
+}
+
+void VeloxParquetDataSource::initSink(const std::unordered_map<std::string, std::string>& /* sparkConfs */) {
+  if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
+    sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
+  } else {
+    throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
+  }
+}
 
-  parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_), writeOption, pool_, asRowType(schema));
+void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
+  initSink(sparkConfs);
+  auto schema = gluten::fromArrowSchema(schema_);
+  const auto writeOption = makeParquetWriteOption(sparkConfs);
+  parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_), *writeOption, pool_, asRowType(schema));
 }
 
 void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
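
One behavioral detail worth calling out: the flush-policy lambda used to capture the member fields maxRowGroupRows_/maxRowGroupBytes_ with [&], which is only safe while the factory cannot outlive the data source; now that the limits are locals of a static function, they are captured by value. A minimal sketch of the pattern, reusing the Velox types this file already includes (the limit values are hypothetical):

    facebook::velox::parquet::WriterOptions makeOptions() {
      const int64_t maxRowGroupBytes = int64_t(64) << 20; // hypothetical 64 MiB cap
      const int64_t maxRowGroupRows = 1000000;            // hypothetical row cap
      facebook::velox::parquet::WriterOptions options;
      // Capture by value: the factory is stored inside WriterOptions and invoked
      // after this function returns, so reference captures would dangle.
      options.flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
        return std::make_unique<facebook::velox::parquet::LambdaFlushPolicy>(
            maxRowGroupRows, maxRowGroupBytes, []() { return false; });
      };
      return options;
    }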
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.h b/cpp/velox/operators/writer/VeloxParquetDataSource.h
index a94ab00b9e..0c9c168330 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -31,6 +31,7 @@
 #include "memory/VeloxColumnarBatch.h"
 #include "operators/writer/VeloxDataSource.h"
 
+#include "velox/common/compression/Compression.h"
 #include "velox/common/file/FileSystems.h"
 #ifdef ENABLE_S3
 #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
@@ -88,6 +89,9 @@ class VeloxParquetDataSource : public VeloxDataSource {
       std::shared_ptr<arrow::Schema> schema)
      : VeloxDataSource(filePath, schema), filePath_(filePath), schema_(schema), pool_(std::move(veloxPool)) {}
 
+  static std::unique_ptr<facebook::velox::parquet::WriterOptions> makeParquetWriteOption(
+      const std::unordered_map<std::string, std::string>& sparkConfs);
+
   void init(const std::unordered_map<std::string, std::string>& sparkConfs) override;
   virtual void initSink(const std::unordered_map<std::string, std::string>& sparkConfs);
   void inspectSchema(struct ArrowSchema* out) override;
@@ -103,9 +107,6 @@ class VeloxParquetDataSource : public VeloxDataSource {
   std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
 
  private:
-  int64_t maxRowGroupBytes_ = 134217728; // 128MB
-  int64_t maxRowGroupRows_ = 100000000; // 100M
-
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 3661b76084..4e0e36cefb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -26,9 +26,11 @@
 
 #include "utils/ConfigExtractor.h"
 
+#include "config.pb.h"
 #include "config/GlutenConfig.h"
 #include "config/VeloxConfig.h"
 #include "operators/plannodes/RowVectorStream.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 
 namespace gluten {
 namespace {
@@ -533,6 +535,7 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
     const std::vector<std::string>& partitionedBy,
     const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
     const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
+    const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
    const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
     const std::optional<common::CompressionKind>& compressionKind = {}) {
  std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
@@ -578,7 +581,13 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandl
   VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
   VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
   return std::make_shared<connector::hive::HiveInsertTableHandle>(
-      columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind);
+      columnHandles,
+      locationHandle,
+      tableStorageFormat,
+      bucketProperty,
+      compressionKind,
+      std::unordered_map<std::string, std::string>{},
+      writerOptions);
 }
 
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
@@ -646,30 +655,28 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     writePath = "";
   }
 
-  // spark default compression code is snappy.
-  common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
-  if (writeRel.named_table().has_advanced_extension()) {
-    if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_GZIP;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLzo=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_LZO;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLz4=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_LZ4;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isZstd=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_ZSTD;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isNone=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_NONE;
-    } else if (SubstraitParser::configSetInOptimization(
-                   writeRel.named_table().advanced_extension(), "isUncompressed=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_NONE;
-    }
+  GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced extension not found in WriteRel");
+  const auto& ext = writeRel.named_table().advanced_extension();
+  GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in WriteRel");
+  const auto& opt = ext.optimization();
+  gluten::ConfigMap confMap;
+  opt.UnpackTo(&confMap);
+  std::unordered_map<std::string, std::string> writeConfs;
+  for (const auto& item : *(confMap.mutable_configs())) {
+    writeConfs.emplace(item.first, item.second);
   }
 
   // Currently only support parquet format.
+  const std::string& formatShortName = writeConfs["format"];
+  GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: " + formatShortName);
   dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
 
+  const std::shared_ptr<facebook::velox::parquet::WriterOptions> writerOptions =
+      VeloxParquetDataSource::makeParquetWriteOption(writeConfs);
+  // Spark's default compression codec is snappy.
+  const auto& compressionKind =
+      writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
+
   return std::make_shared<core::TableWriteNode>(
       nextPlanNodeId(),
       inputType,
@@ -682,9 +689,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
               inputType->children(),
               partitionedKey,
               bucketProperty,
-              makeLocationHandle(writePath, fileFormat, compressionCodec, bucketProperty != nullptr),
+              makeLocationHandle(writePath, fileFormat, compressionKind, bucketProperty != nullptr),
+              writerOptions,
               fileFormat,
-              compressionCodec)),
+              compressionKind)),
       (!partitionedKey.empty()),
       exec::TableWriteTraits::outputType(nullptr),
       connector::CommitStrategy::kNoCommit,
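
On the consumer side, the compression kind is no longer recovered from "isSnappy="-style markers in the optimization string; it is read from the WriterOptions built by makeParquetWriteOption, whose compressionKind field is a std::optional. A short sketch of the fallback (writerOptions is assumed to come from makeParquetWriteOption, which in the current code always sets the field, so value_or is a defensive default matching Spark's snappy):

    using facebook::velox::common::CompressionKind;
    // Fall back to Spark's default codec when no compression was configured.
    const CompressionKind kind =
        writerOptions->compressionKind.value_or(CompressionKind::CompressionKind_SNAPPY);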


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
