This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 97c74c2d9f [VL] Add support for some Parquet write options to 3.4 /
3.5 to align with 3.2 / 3.3 (#8816)
97c74c2d9f is described below
commit 97c74c2d9f1e27e4698434553021e7c18e370b69
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 25 14:38:37 2025 +0800
[VL] Add support for some Parquet write options to 3.4 / 3.5 to align with
3.2 / 3.3 (#8816)
---
.../backendsapi/velox/VeloxTransformerApi.scala | 41 ++++++++++-------
.../operators/writer/VeloxParquetDataSource.cc | 50 ++++++++++++---------
.../operators/writer/VeloxParquetDataSource.h | 7 +--
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 52 +++++++++++++---------
4 files changed, 86 insertions(+), 64 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index d156fffa8b..9949f8822a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -17,8 +17,11 @@
package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.{BackendsApiManager, TransformerApi}
+import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.proto.ConfigMap
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
import org.apache.gluten.utils.InputPartitionsUtil
@@ -28,12 +31,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
-import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.task.TaskResources
import org.apache.spark.util.collection.BitSet
-import com.google.protobuf.{Any, Message, StringValue}
+import com.google.protobuf.{Any, Message}
import java.util.{Map => JMap}
@@ -97,21 +101,24 @@ class VeloxTransformerApi extends TransformerApi with
Logging {
override def packPBMessage(message: Message): Any = Any.pack(message, "")
override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
- val fileFormatStr = write.fileFormat match {
- case register: DataSourceRegister =>
- register.shortName
- case _ => "UnknownFileFormat"
+ write.fileFormat match {
+ case _ @(_: ParquetFileFormat | _: HiveFileFormat) =>
+ // Only Parquet is supported. It's safe to set a fixed "parquet" here
+ // because others already fell back by WriteFilesExecTransformer's validation.
+ val shortName = "parquet"
+ val nativeConf =
+ GlutenFormatFactory(shortName)
+ .nativeConf(
+ write.caseInsensitiveOptions,
+ WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
+ packPBMessage(
+ ConfigMap
+ .newBuilder()
+ .putAllConfigs(nativeConf)
+ .putConfigs("format", shortName)
+ .build())
+ case _ =>
+ throw new GlutenException("Unsupported file write format: " + write.fileFormat)
}
- val compressionCodec =
-
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions).capitalize
- val writeParametersStr = new StringBuffer("WriteParameters:")
- writeParametersStr.append("is").append(compressionCodec).append("=1")
- writeParametersStr.append(";format=").append(fileFormatStr).append("\n")
-
- packPBMessage(
- StringValue
- .newBuilder()
- .setValue(writeParametersStr.toString)
- .build())
}
}
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
index 15082b8b82..4d67569d37 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -42,25 +42,18 @@ namespace {
const int32_t kGzipWindowBits4k = 12;
}
-void VeloxParquetDataSource::initSink(const std::unordered_map<std::string,
std::string>& /* sparkConfs */) {
- if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
- sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
- } else {
- throw std::runtime_error("The file path is not local when writing data
with parquet format in velox runtime!");
- }
-}
-
-void VeloxParquetDataSource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
- initSink(sparkConfs);
-
+std::unique_ptr<facebook::velox::parquet::WriterOptions>
VeloxParquetDataSource::makeParquetWriteOption(
+ const std::unordered_map<std::string, std::string>& sparkConfs) {
+ int64_t maxRowGroupBytes = 134217728; // 128MB
+ int64_t maxRowGroupRows = 100000000; // 100M
if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
- maxRowGroupBytes_ =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
+ maxRowGroupBytes =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
}
if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
- maxRowGroupRows_ =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
+ maxRowGroupRows =
static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
}
- velox::parquet::WriterOptions writeOption;
- writeOption.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds
/*micro*/;
+ auto writeOption =
std::make_unique<facebook::velox::parquet::WriterOptions>();
+ writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds
/*micro*/;
auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
auto compressionCodecStr =
sparkConfs.find(kParquetCompressionCodec)->second;
@@ -74,7 +67,7 @@ void VeloxParquetDataSource::init(const
std::unordered_map<std::string, std::str
if (parquetGzipWindowSizeStr == kGzipWindowSize4k) {
auto codecOptions =
std::make_shared<facebook::velox::parquet::arrow::util::GZipCodecOptions>();
codecOptions->window_bits = kGzipWindowBits4k;
- writeOption.codecOptions = std::move(codecOptions);
+ writeOption->codecOptions = std::move(codecOptions);
}
}
} else if (boost::iequals(compressionCodecStr, "lzo")) {
@@ -92,15 +85,28 @@ void VeloxParquetDataSource::init(const
std::unordered_map<std::string, std::str
compressionCodec = CompressionKind::CompressionKind_NONE;
}
}
- writeOption.compressionKind = compressionCodec;
- writeOption.flushPolicyFactory = [&]() {
+ writeOption->compressionKind = compressionCodec;
+ writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
return std::make_unique<velox::parquet::LambdaFlushPolicy>(
- maxRowGroupRows_, maxRowGroupBytes_, [&]() { return false; });
+ maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
};
- writeOption.parquetWriteTimestampTimeZone = getConfigValue(sparkConfs,
kSessionTimezone, std::nullopt);
- auto schema = gluten::fromArrowSchema(schema_);
+ writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs,
kSessionTimezone, std::nullopt);
+ return writeOption;
+}
+
+void VeloxParquetDataSource::initSink(const std::unordered_map<std::string,
std::string>& /* sparkConfs */) {
+ if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
+ sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
+ } else {
+ throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
+ }
+}
- parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_),
writeOption, pool_, asRowType(schema));
+void VeloxParquetDataSource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
+ initSink(sparkConfs);
+ auto schema = gluten::fromArrowSchema(schema_);
+ const auto writeOption = makeParquetWriteOption(sparkConfs);
+ parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_),
*writeOption, pool_, asRowType(schema));
}
void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.h
b/cpp/velox/operators/writer/VeloxParquetDataSource.h
index a94ab00b9e..0c9c168330 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -31,6 +31,7 @@
#include "memory/VeloxColumnarBatch.h"
#include "operators/writer/VeloxDataSource.h"
+#include "velox/common/compression/Compression.h"
#include "velox/common/file/FileSystems.h"
#ifdef ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
@@ -88,6 +89,9 @@ class VeloxParquetDataSource : public VeloxDataSource {
std::shared_ptr<arrow::Schema> schema)
: VeloxDataSource(filePath, schema), filePath_(filePath),
schema_(schema), pool_(std::move(veloxPool)) {}
+ static std::unique_ptr<facebook::velox::parquet::WriterOptions>
makeParquetWriteOption(
+ const std::unordered_map<std::string, std::string>& sparkConfs);
+
void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override;
virtual void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs);
void inspectSchema(struct ArrowSchema* out) override;
@@ -103,9 +107,6 @@ class VeloxParquetDataSource : public VeloxDataSource {
std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
private:
- int64_t maxRowGroupBytes_ = 134217728; // 128MB
- int64_t maxRowGroupRows_ = 100000000; // 100M
-
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 3661b76084..4e0e36cefb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -26,9 +26,11 @@
#include "utils/ConfigExtractor.h"
+#include "config.pb.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/RowVectorStream.h"
+#include "operators/writer/VeloxParquetDataSource.h"
namespace gluten {
namespace {
@@ -533,6 +535,7 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandl
const std::vector<std::string>& partitionedBy,
const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
+ const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
const dwio::common::FileFormat& tableStorageFormat =
dwio::common::FileFormat::PARQUET,
const std::optional<common::CompressionKind>& compressionKind = {}) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
columnHandles;
@@ -578,7 +581,13 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandl
VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
return std::make_shared<connector::hive::HiveInsertTableHandle>(
- columnHandles, locationHandle, tableStorageFormat, bucketProperty,
compressionKind);
+ columnHandles,
+ locationHandle,
+ tableStorageFormat,
+ bucketProperty,
+ compressionKind,
+ std::unordered_map<std::string, std::string>{},
+ writerOptions);
}
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const
::substrait::WriteRel& writeRel) {
@@ -646,30 +655,28 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
writePath = "";
}
- // spark default compression code is snappy.
- common::CompressionKind compressionCodec =
common::CompressionKind::CompressionKind_SNAPPY;
- if (writeRel.named_table().has_advanced_extension()) {
- if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isSnappy=")) {
- compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
- } else if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isGzip=")) {
- compressionCodec = common::CompressionKind::CompressionKind_GZIP;
- } else if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isLzo=")) {
- compressionCodec = common::CompressionKind::CompressionKind_LZO;
- } else if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isLz4=")) {
- compressionCodec = common::CompressionKind::CompressionKind_LZ4;
- } else if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isZstd=")) {
- compressionCodec = common::CompressionKind::CompressionKind_ZSTD;
- } else if
(SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(),
"isNone=")) {
- compressionCodec = common::CompressionKind::CompressionKind_NONE;
- } else if (SubstraitParser::configSetInOptimization(
- writeRel.named_table().advanced_extension(),
"isUncompressed=")) {
- compressionCodec = common::CompressionKind::CompressionKind_NONE;
- }
+ GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced extension not found in WriteRel");
+ const auto& ext = writeRel.named_table().advanced_extension();
+ GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in WriteRel");
+ const auto& opt = ext.optimization();
+ gluten::ConfigMap confMap;
+ opt.UnpackTo(&confMap);
+ std::unordered_map<std::string, std::string> writeConfs;
+ for (const auto& item : *(confMap.mutable_configs())) {
+ writeConfs.emplace(item.first, item.second);
}
// Currently only support parquet format.
+ const std::string& formatShortName = writeConfs["format"];
+ GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: " + formatShortName);
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
+ const std::shared_ptr<facebook::velox::parquet::WriterOptions> writerOptions
=
+ VeloxParquetDataSource::makeParquetWriteOption(writeConfs);
+ // Spark's default compression code is snappy.
+ const auto& compressionKind =
+
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
+
return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
inputType,
@@ -682,9 +689,10 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
inputType->children(),
partitionedKey,
bucketProperty,
- makeLocationHandle(writePath, fileFormat, compressionCodec,
bucketProperty != nullptr),
+ makeLocationHandle(writePath, fileFormat, compressionKind,
bucketProperty != nullptr),
+ writerOptions,
fileFormat,
- compressionCodec)),
+ compressionKind)),
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(nullptr),
connector::CommitStrategy::kNoCommit,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]