This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d5db62aa7  [VL] Extract common createHiveConnectorSessionConfig method and use it for Iceberg write (#11227)
0d5db62aa7 is described below

commit 0d5db62aa792690ec61d64bcfddffc0767621781
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Dec 1 19:54:48 2025 +0800

     [VL] Extract common createHiveConnectorSessionConfig method and use it for Iceberg write (#11227)
---
 .../execution/enhanced/VeloxIcebergSuite.scala     | 28 ++++++++++++++++++++++
 cpp/velox/compute/VeloxBackend.cc                  |  2 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  2 +-
 cpp/velox/compute/WholeStageResultIterator.cc      | 25 +++----------------
 cpp/velox/compute/WholeStageResultIterator.h       |  7 ++----
 cpp/velox/compute/iceberg/IcebergWriter.cc         |  8 ++-----
 .../operators/writer/VeloxParquetDataSourceABFS.h  |  2 +-
 .../operators/writer/VeloxParquetDataSourceHDFS.h  |  2 +-
 .../operators/writer/VeloxParquetDataSourceS3.h    |  2 +-
 cpp/velox/utils/ConfigExtractor.cc                 | 26 ++++++++++++++++++--
 cpp/velox/utils/ConfigExtractor.h                  | 10 +++++---
 11 files changed, 71 insertions(+), 43 deletions(-)

diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 5ff593a3ba..7b91b12b75 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -73,6 +73,34 @@ class VeloxIcebergSuite extends IcebergSuite {
     }
   }
 
+  test("iceberg insert partition table with uppercase partition name") {
+    withTable("iceberg_tb2") {
+      spark.sql("""
+                  |create table if not exists iceberg_tb2(A int, b int)
+                  |using iceberg
+                  |partitioned by (A);
+                  |""".stripMargin)
+      val df = spark.sql("""
+                           |insert into table iceberg_tb2 values(1, 1)
+                           |""".stripMargin)
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergAppendDataExec])
+      checkAnswer(spark.sql("select * from iceberg_tb2"), Seq(Row(1, 1)))
+
+      val filePath = spark
+        .sql("select * from default.iceberg_tb2.files")
+        .select("file_path")
+        .collect()
+        .apply(0)
+        .getString(0)
+      val partitionPath = filePath.split('/').init.last
+      assert(partitionPath == "A=1")
+    }
+  }
+
   test("iceberg read cow table - delete") {
     withTable("iceberg_cow_tb") {
       spark.sql("""
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index a7ab9cef7c..a99cac823b 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -147,7 +147,7 @@ void VeloxBackend::init(
   // Set cache_prefetch_min_pct default as 0 to force all loads are prefetched 
in DirectBufferInput.
   FLAGS_cache_prefetch_min_pct = backendConf_->get<int>(kCachePrefetchMinPct, 
0);
 
-  auto hiveConf = getHiveConfig(backendConf_);
+  auto hiveConf = createHiveConnectorConfig(backendConf_);
 
   // Setup and register.
   velox::filesystems::registerLocalFileSystem();
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index c8290276a9..966f2af6af 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -181,7 +181,7 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
       scanInfos,
       streamIds,
       spillDir,
-      veloxCfg_.get(),
+      veloxCfg_,
       taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
   return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
 }
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 16a865ac31..da5d1c4c44 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -18,6 +18,7 @@
 #include "VeloxBackend.h"
 #include "VeloxRuntime.h"
 #include "config/VeloxConfig.h"
+#include "utils/ConfigExtractor.h"
 #include "velox/connectors/hive/HiveConfig.h"
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PlanNodeStats.h"
@@ -71,7 +72,7 @@ WholeStageResultIterator::WholeStageResultIterator(
     const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
     const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
     const std::string spillDir,
-    const facebook::velox::config::ConfigBase* veloxCfg,
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg,
     const SparkTaskInfo& taskInfo)
     : memoryManager_(memoryManager),
       veloxCfg_(veloxCfg),
@@ -209,7 +210,7 @@ WholeStageResultIterator::WholeStageResultIterator(
 
 std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQueryCtx() {
   std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> 
connectorConfigs;
-  connectorConfigs[kHiveConnectorId] = createConnectorConfig();
+  connectorConfigs[kHiveConnectorId] = 
createHiveConnectorSessionConfig(veloxCfg_);
   std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
       nullptr,
       facebook::velox::core::QueryConfig{getQueryContextConf()},
@@ -669,24 +670,4 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
   return configs;
 }
 
-std::shared_ptr<velox::config::ConfigBase> 
WholeStageResultIterator::createConnectorConfig() {
-  // The configs below are used at session level.
-  std::unordered_map<std::string, std::string> configs = {};
-  // The semantics of reading as lower case is opposite with case-sensitive.
-  
configs[velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession]
 =
-      !veloxCfg_->get<bool>(kCaseSensitive, false) ? "true" : "false";
-  
configs[velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession] = 
"false";
-  configs[velox::parquet::WriterOptions::kParquetSessionWriteTimestampUnit] = 
"6";
-  configs[velox::connector::hive::HiveConfig::kReadTimestampUnitSession] = "6";
-  configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] 
=
-      std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
-  configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
-      std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
-  configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] =
-      std::to_string(veloxCfg_->get<bool>(kParquetUseColumnNames, true));
-  configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
-      std::to_string(veloxCfg_->get<bool>(kOrcUseColumnNames, true));
-  return std::make_shared<velox::config::ConfigBase>(std::move(configs));
-}
-
 } // namespace gluten
diff --git a/cpp/velox/compute/WholeStageResultIterator.h 
b/cpp/velox/compute/WholeStageResultIterator.h
index bca377cf19..358b153c7a 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -42,7 +42,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator 
{
       const std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
       const std::vector<facebook::velox::core::PlanNodeId>& streamIds,
       const std::string spillDir,
-      const facebook::velox::config::ConfigBase* veloxCfg,
+      const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg,
       const SparkTaskInfo& taskInfo);
 
   virtual ~WholeStageResultIterator() {
@@ -89,9 +89,6 @@ class WholeStageResultIterator : public ColumnarBatchIterator 
{
       const std::shared_ptr<const facebook::velox::core::PlanNode>&,
       std::vector<facebook::velox::core::PlanNodeId>& nodeIds);
 
-  /// Create connector config.
-  std::shared_ptr<facebook::velox::config::ConfigBase> createConnectorConfig();
-
   /// Construct partition columns.
   void constructPartitionColumns(
       std::unordered_map<std::string, std::optional<std::string>>&,
@@ -113,7 +110,7 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
   VeloxMemoryManager* memoryManager_;
 
   /// Config, task and plan.
-  const config::ConfigBase* veloxCfg_;
+  const std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_;
 #ifdef GLUTEN_ENABLE_GPU
   const bool enableCudf_;
 #endif
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc 
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 96a8561eca..0e671f1773 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -103,12 +103,8 @@ IcebergWriter::IcebergWriter(
     : rowType_(rowType), field_(convertToIcebergNestedField(field)), 
pool_(memoryPool), connectorPool_(connectorPool), 
createTimeNs_(getCurrentTimeNano()) {
   auto veloxCfg =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs));
-  connectorSessionProperties_ = 
std::make_shared<facebook::velox::config::ConfigBase>(
-      std::unordered_map<std::string, std::string>(), true);
-  connectorSessionProperties_->set(
-      
facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession,
-      std::to_string(veloxCfg->get<int32_t>(kMaxPartitions, 10000)));
-  connectorConfig_ = 
std::make_shared<facebook::velox::connector::hive::HiveConfig>(getHiveConfig(veloxCfg));
+  connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
+  connectorConfig_ = 
std::make_shared<facebook::velox::connector::hive::HiveConfig>(createHiveConnectorConfig(veloxCfg));
   connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
       pool_.get(),
       connectorPool_.get(),
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
index 7b8b6d2cab..50632fdd55 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
@@ -44,7 +44,7 @@ class VeloxParquetDataSourceABFS final : public 
VeloxParquetDataSource {
       : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
-    auto hiveConf = getHiveConfig(
+    auto hiveConf = createHiveConnectorConfig(
         
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs)),
         FileSystemType::kAbfs);
     facebook::velox::filesystems::registerAzureClientProvider(*hiveConf);
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
index 2e7b313118..ba9d392ed2 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
@@ -43,7 +43,7 @@ class VeloxParquetDataSourceHDFS final : public 
VeloxParquetDataSource {
       : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
-    auto hiveConf = getHiveConfig(
+    auto hiveConf = createHiveConnectorConfig(
         
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs)),
         FileSystemType::kHdfs);
     sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = 
hiveConf, .pool = sinkPool_.get()});
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
index f366953422..9fa170ec94 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
@@ -43,7 +43,7 @@ class VeloxParquetDataSourceS3 final : public 
VeloxParquetDataSource {
       : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
-    auto hiveConf = getHiveConfig(
+    auto hiveConf = createHiveConnectorConfig(
         
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs)),
         FileSystemType::kS3);
     sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = 
hiveConf, .pool = sinkPool_.get()});
diff --git a/cpp/velox/utils/ConfigExtractor.cc 
b/cpp/velox/utils/ConfigExtractor.cc
index 1d4cd7f859..5e4fa4ef2c 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -25,6 +25,7 @@
 #include "utils/Macros.h"
 #include "velox/connectors/hive/HiveConfig.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
+#include "velox/dwio/parquet/writer/Writer.h"
 
 namespace gluten {
 
@@ -218,6 +219,27 @@ void getAbfsHiveConfig(
 
 } // namespace
 
+std::shared_ptr<facebook::velox::config::ConfigBase> 
createHiveConnectorSessionConfig(
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& conf) {
+  // Session-level Hive connector properties, each either derived from `conf` or fixed to a constant.
+  std::unordered_map<std::string, std::string> configs = {};
+  // Reading file column names as lower case is the opposite of case-sensitive matching, so the flag is negated.
+  
configs[facebook::velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession]
 =
+      !conf->get<bool>(kCaseSensitive, false) ? "true" : "false";
+  
configs[facebook::velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession]
 = "false";
+  
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWriteTimestampUnit]
 = std::string("6");
+  
configs[facebook::velox::connector::hive::HiveConfig::kReadTimestampUnitSession]
 = std::string("6");
+  
configs[facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession]
 =
+      conf->get<std::string>(kMaxPartitions, "10000");
+  
configs[facebook::velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession]
 =
+      conf->get<bool>(kIgnoreMissingFiles, false) ? "true" : "false";
+  
configs[facebook::velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession]
 =
+      conf->get<bool>(kParquetUseColumnNames, true) ? "true" : "false";
+  
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession]
 =
+      conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";
+  return 
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
+}
+
 std::string getConfigValue(
     const std::unordered_map<std::string, std::string>& confMap,
     const std::string& key,
@@ -232,8 +254,8 @@ std::string getConfigValue(
   return got->second;
 }
 
-std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
-    std::shared_ptr<facebook::velox::config::ConfigBase> conf,
+std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorConfig(
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
     FileSystemType fsType) {
   std::unordered_map<std::string, std::string> hiveConfMap;
 
diff --git a/cpp/velox/utils/ConfigExtractor.h 
b/cpp/velox/utils/ConfigExtractor.h
index 5b10c714f2..cc7529c140 100644
--- a/cpp/velox/utils/ConfigExtractor.h
+++ b/cpp/velox/utils/ConfigExtractor.h
@@ -23,20 +23,24 @@
 #include <string>
 #include <unordered_map>
 
-#include "config/GlutenConfig.h"
 #include "velox/common/config/Config.h"
 
 namespace gluten {
 
 enum class FileSystemType : uint8_t { kHdfs, kS3, kAbfs, kGcs, kAll };
 
+/// Create hive connector session config.
+std::shared_ptr<facebook::velox::config::ConfigBase> 
createHiveConnectorSessionConfig(
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& conf);
+
 std::string getConfigValue(
     const std::unordered_map<std::string, std::string>& confMap,
     const std::string& key,
     const std::optional<std::string>& fallbackValue);
 
-std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
-    std::shared_ptr<facebook::velox::config::ConfigBase> conf,
+/// Create hive connector config.
+std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorConfig(
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
     FileSystemType fsType = FileSystemType::kAll);
 
 } // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to