This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0d5db62aa7 [VL] Extract common createHiveConnectorSessionConfig method and use it for Iceberg write (#11227)
0d5db62aa7 is described below
commit 0d5db62aa792690ec61d64bcfddffc0767621781
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Dec 1 19:54:48 2025 +0800
[VL] Extract common createHiveConnectorSessionConfig method and use it for Iceberg write (#11227)
---
.../execution/enhanced/VeloxIcebergSuite.scala | 28 ++++++++++++++++++++++
cpp/velox/compute/VeloxBackend.cc | 2 +-
cpp/velox/compute/VeloxRuntime.cc | 2 +-
cpp/velox/compute/WholeStageResultIterator.cc | 25 +++----------------
cpp/velox/compute/WholeStageResultIterator.h | 7 ++----
cpp/velox/compute/iceberg/IcebergWriter.cc | 8 ++-----
.../operators/writer/VeloxParquetDataSourceABFS.h | 2 +-
.../operators/writer/VeloxParquetDataSourceHDFS.h | 2 +-
.../operators/writer/VeloxParquetDataSourceS3.h | 2 +-
cpp/velox/utils/ConfigExtractor.cc | 26 ++++++++++++++++++--
cpp/velox/utils/ConfigExtractor.h | 10 +++++---
11 files changed, 71 insertions(+), 43 deletions(-)
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 5ff593a3ba..7b91b12b75 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -73,6 +73,34 @@ class VeloxIcebergSuite extends IcebergSuite {
}
}
+ test("iceberg insert partition table with uppercase partition name") {
+ withTable("iceberg_tb2") {
+ spark.sql("""
+ |create table if not exists iceberg_tb2(A int, b int)
+ |using iceberg
+ |partitioned by (A);
+ |""".stripMargin)
+ val df = spark.sql("""
+ |insert into table iceberg_tb2 values(1, 1)
+ |""".stripMargin)
+ assert(
+ df.queryExecution.executedPlan
+ .asInstanceOf[CommandResultExec]
+ .commandPhysicalPlan
+ .isInstanceOf[VeloxIcebergAppendDataExec])
+ checkAnswer(spark.sql("select * from iceberg_tb2"), Seq(Row(1, 1)))
+
+ val filePath = spark
+ .sql("select * from default.iceberg_tb2.files")
+ .select("file_path")
+ .collect()
+ .apply(0)
+ .getString(0)
+ val partitionPath = filePath.split('/').init.last
+ assert(partitionPath == "A=1")
+ }
+ }
+
test("iceberg read cow table - delete") {
withTable("iceberg_cow_tb") {
spark.sql("""
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index a7ab9cef7c..a99cac823b 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -147,7 +147,7 @@ void VeloxBackend::init(
// Set cache_prefetch_min_pct default as 0 to force all loads are prefetched
in DirectBufferInput.
FLAGS_cache_prefetch_min_pct = backendConf_->get<int>(kCachePrefetchMinPct,
0);
- auto hiveConf = getHiveConfig(backendConf_);
+ auto hiveConf = createHiveConnectorConfig(backendConf_);
// Setup and register.
velox::filesystems::registerLocalFileSystem();
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index c8290276a9..966f2af6af 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -181,7 +181,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
scanInfos,
streamIds,
spillDir,
- veloxCfg_.get(),
+ veloxCfg_,
taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
}
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 16a865ac31..da5d1c4c44 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -18,6 +18,7 @@
#include "VeloxBackend.h"
#include "VeloxRuntime.h"
#include "config/VeloxConfig.h"
+#include "utils/ConfigExtractor.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"
@@ -71,7 +72,7 @@ WholeStageResultIterator::WholeStageResultIterator(
const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
const std::string spillDir,
- const facebook::velox::config::ConfigBase* veloxCfg,
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg,
const SparkTaskInfo& taskInfo)
: memoryManager_(memoryManager),
veloxCfg_(veloxCfg),
@@ -209,7 +210,7 @@ WholeStageResultIterator::WholeStageResultIterator(
std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQueryCtx() {
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
- connectorConfigs[kHiveConnectorId] = createConnectorConfig();
+ connectorConfigs[kHiveConnectorId] =
createHiveConnectorSessionConfig(veloxCfg_);
std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
nullptr,
facebook::velox::core::QueryConfig{getQueryContextConf()},
@@ -669,24 +670,4 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
return configs;
}
-std::shared_ptr<velox::config::ConfigBase>
WholeStageResultIterator::createConnectorConfig() {
- // The configs below are used at session level.
- std::unordered_map<std::string, std::string> configs = {};
- // The semantics of reading as lower case is opposite with case-sensitive.
-
configs[velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession]
=
- !veloxCfg_->get<bool>(kCaseSensitive, false) ? "true" : "false";
-
configs[velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession] =
"false";
- configs[velox::parquet::WriterOptions::kParquetSessionWriteTimestampUnit] =
"6";
- configs[velox::connector::hive::HiveConfig::kReadTimestampUnitSession] = "6";
- configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession]
=
- std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
- configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
- std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
- configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] =
- std::to_string(veloxCfg_->get<bool>(kParquetUseColumnNames, true));
- configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
- std::to_string(veloxCfg_->get<bool>(kOrcUseColumnNames, true));
- return std::make_shared<velox::config::ConfigBase>(std::move(configs));
-}
-
} // namespace gluten
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index bca377cf19..358b153c7a 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -42,7 +42,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
const std::string spillDir,
- const facebook::velox::config::ConfigBase* veloxCfg,
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg,
const SparkTaskInfo& taskInfo);
virtual ~WholeStageResultIterator() {
@@ -89,9 +89,6 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
const std::shared_ptr<const facebook::velox::core::PlanNode>&,
std::vector<facebook::velox::core::PlanNodeId>& nodeIds);
- /// Create connector config.
- std::shared_ptr<facebook::velox::config::ConfigBase> createConnectorConfig();
-
/// Construct partition columns.
void constructPartitionColumns(
std::unordered_map<std::string, std::optional<std::string>>&,
@@ -113,7 +110,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
VeloxMemoryManager* memoryManager_;
/// Config, task and plan.
- const config::ConfigBase* veloxCfg_;
+ const std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_;
#ifdef GLUTEN_ENABLE_GPU
const bool enableCudf_;
#endif
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 96a8561eca..0e671f1773 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -103,12 +103,8 @@ IcebergWriter::IcebergWriter(
: rowType_(rowType), field_(convertToIcebergNestedField(field)),
pool_(memoryPool), connectorPool_(connectorPool),
createTimeNs_(getCurrentTimeNano()) {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs));
- connectorSessionProperties_ =
std::make_shared<facebook::velox::config::ConfigBase>(
- std::unordered_map<std::string, std::string>(), true);
- connectorSessionProperties_->set(
-
facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession,
- std::to_string(veloxCfg->get<int32_t>(kMaxPartitions, 10000)));
- connectorConfig_ =
std::make_shared<facebook::velox::connector::hive::HiveConfig>(getHiveConfig(veloxCfg));
+ connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
+ connectorConfig_ =
std::make_shared<facebook::velox::connector::hive::HiveConfig>(createHiveConnectorConfig(veloxCfg));
connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
pool_.get(),
connectorPool_.get(),
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
index 7b8b6d2cab..50632fdd55 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
@@ -44,7 +44,7 @@ class VeloxParquetDataSourceABFS final : public VeloxParquetDataSource {
: VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
- auto hiveConf = getHiveConfig(
+ auto hiveConf = createHiveConnectorConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs)),
FileSystemType::kAbfs);
facebook::velox::filesystems::registerAzureClientProvider(*hiveConf);
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
index 2e7b313118..ba9d392ed2 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
@@ -43,7 +43,7 @@ class VeloxParquetDataSourceHDFS final : public VeloxParquetDataSource {
: VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
- auto hiveConf = getHiveConfig(
+ auto hiveConf = createHiveConnectorConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs)),
FileSystemType::kHdfs);
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties =
hiveConf, .pool = sinkPool_.get()});
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
index f366953422..9fa170ec94 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
@@ -43,7 +43,7 @@ class VeloxParquetDataSourceS3 final : public VeloxParquetDataSource {
: VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
- auto hiveConf = getHiveConfig(
+ auto hiveConf = createHiveConnectorConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs)),
FileSystemType::kS3);
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties =
hiveConf, .pool = sinkPool_.get()});
diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc
index 1d4cd7f859..5e4fa4ef2c 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -25,6 +25,7 @@
#include "utils/Macros.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
+#include "velox/dwio/parquet/writer/Writer.h"
namespace gluten {
@@ -218,6 +219,27 @@ void getAbfsHiveConfig(
} // namespace
+std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorSessionConfig(
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& conf) {
+ // The configs below are used at session level.
+ std::unordered_map<std::string, std::string> configs = {};
+ // The semantics of reading as lower case is opposite with case-sensitive.
+
configs[facebook::velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession]
=
+ !conf->get<bool>(kCaseSensitive, false) ? "true" : "false";
+
configs[facebook::velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession]
= "false";
+
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWriteTimestampUnit]
= std::string("6");
+
configs[facebook::velox::connector::hive::HiveConfig::kReadTimestampUnitSession]
= std::string("6");
+
configs[facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession]
=
+ conf->get<std::string>(kMaxPartitions, "10000");
+
configs[facebook::velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession]
=
+ conf->get<bool>(kIgnoreMissingFiles, false) ? "true" : "false";
+
configs[facebook::velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession]
=
+ conf->get<bool>(kParquetUseColumnNames, true) ? "true" : "false";
+
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession]
=
+ conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";
+ return
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
+}
+
std::string getConfigValue(
const std::unordered_map<std::string, std::string>& confMap,
const std::string& key,
@@ -232,8 +254,8 @@ std::string getConfigValue(
return got->second;
}
-std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
- std::shared_ptr<facebook::velox::config::ConfigBase> conf,
+std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorConfig(
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
FileSystemType fsType) {
std::unordered_map<std::string, std::string> hiveConfMap;
diff --git a/cpp/velox/utils/ConfigExtractor.h b/cpp/velox/utils/ConfigExtractor.h
index 5b10c714f2..cc7529c140 100644
--- a/cpp/velox/utils/ConfigExtractor.h
+++ b/cpp/velox/utils/ConfigExtractor.h
@@ -23,20 +23,24 @@
#include <string>
#include <unordered_map>
-#include "config/GlutenConfig.h"
#include "velox/common/config/Config.h"
namespace gluten {
enum class FileSystemType : uint8_t { kHdfs, kS3, kAbfs, kGcs, kAll };
+/// Create hive connector session config.
+std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorSessionConfig(
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& conf);
+
std::string getConfigValue(
const std::unordered_map<std::string, std::string>& confMap,
const std::string& key,
const std::optional<std::string>& fallbackValue);
-std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
- std::shared_ptr<facebook::velox::config::ConfigBase> conf,
+/// Create hive connector config.
+std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorConfig(
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
FileSystemType fsType = FileSystemType::kAll);
} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]