This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 33f123e60a [GLUTEN-9517][CH] Allow merge tree format without configuring disk cache (#9520)
33f123e60a is described below

commit 33f123e60ac2f15b4c8b9f5f89fc22209c03f60f
Author: Chang chen <[email protected]>
AuthorDate: Tue May 6 20:41:58 2025 +0800

    [GLUTEN-9517][CH] Allow merge tree format without configuring disk cache (#9520)
    
    * [Refactor] Add GlutenObjectStorageConfig to improve consistency and 
maintainability
    
    This commit introduces a Scala object in the ClickHouse backend module and 
refactors the C++ implementation to use defined constants instead of hardcoded 
string literals for object storage disk types. The changes include:
    
    1. Create a centralized Scala object, `GlutenObjectStorageConfig`, with S3 and HDFS disk type constants
    2. Align the C++ implementation with the Scala `GlutenObjectStorageConfig` object
    3. Replace all hardcoded string literals with the corresponding constants from `GlutenObjectStorageConfig`
    4. Improve code formatting and organization in affected files
    
    These changes make the code more maintainable by centralizing important 
constants, reducing duplication, and ensuring consistency between Scala and C++ 
implementations.
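    As a sketch, the centralized constants let call sites compare against named values instead of string literals (`isObjectStorageDisk` below is a hypothetical helper for illustration; the constant values are taken from this diff):

```scala
// From this diff: disk type constants defined once in the ClickHouse backend.
object GlutenObjectStorageConfig {
  val S3_DISK_TYPE: String = "s3_gluten"
  val HDFS_DISK_TYPE: String = "hdfs_gluten"
}

// Hypothetical call site: no more hardcoded "s3_gluten" / "hdfs_gluten".
def isObjectStorageDisk(diskType: String): Boolean =
  diskType == GlutenObjectStorageConfig.S3_DISK_TYPE ||
    diskType == GlutenObjectStorageConfig.HDFS_DISK_TYPE
```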
    
    * [Refactor] Add MergeTree configuration options for Delta optimized writer
    
    Add two important configuration options for the ClickHouse backend that 
optimize writing to Delta tables:
    
    - `RuntimeSettings.MERGE_AFTER_INSERT`: Controls whether to merge data 
after insert in each task. Should be set to false when using 
DeltaOptimizedWriterTransformer.
    
    - `RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE`: Controls whether to 
bypass local temporary storage when inserting into remote storage. Should be 
set to true when using DeltaOptimizedWriterTransformer.
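    For example, a suite that relies on DeltaOptimizedWriterTransformer would flip both settings from their defaults, as the `.set` calls later in this diff do (a sketch; `conf` is assumed to be the suite's SparkConf):

```scala
// Sketch: settings recommended above when DeltaOptimizedWriterTransformer
// is used. Defaults are merge_after_insert=true and
// insert_without_local_storage=false.
conf
  .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
  .set(RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key, "true")
```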
    
    Also removed unnecessary `RuntimeConfig.LOGGER_LEVEL` settings from various 
test files.
    
    Added a TODO comment in `MetricsUtil` to improve error handling when 
`RuntimeSettings.COLLECT_METRICS` is set to false.
    
    * [Refactor] Introduce `StoreConfigBuilder`
    
    Introduces a class to streamline and standardize the configuration of 
object storage disks for HDFS and S3 in test environments.
    
    - Adds a new class, `StoreConfigBuilder`, that implements a fluent builder pattern
    - Refactors HDFS and MinIO test helpers to use this common configuration 
approach
    - Simplifies storage configuration code and reduces duplication
    - Makes storage setup more flexible with better parameter control
    
    The builder pattern provides a cleaner interface for configuring various 
storage options like disk caching, RocksDB metadata, and access credentials.
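    A builder of this shape might be used as follows (hypothetical sketch; the method names below are illustrative and may not match the actual `StoreConfigBuilder` API):

```scala
// Hypothetical fluent usage; names are illustrative only.
new StoreConfigBuilder(conf)
  .withDiskCache(false)                  // MergeTree without disk cache
  .withRocksDBMeta(true)                 // RocksDB-backed metadata
  .withCredentials(accessKey, secretKey) // object store access credentials
  .build()
```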
    
    * [Refactor] Improve HDFS URL handling in test infrastructure
    
    The commit improves the HDFS URL management in test classes by:
    
    - Removing the global `HDFS_URL` constant and centralizing URL generation 
in the `HDFSTestHelper` class
    - Adding new helper methods: `independentHdfsURL()`, `hdfsURL()`, 
`deleteDir()`, and `countDir()`
    - Making directory paths consistent across tests by using a shared base 
directory parameter
    - Simplifying HDFS file operations with helper methods that handle path 
formatting
    - Removing redundant code for file system configuration and file operations
    - Adding constants for commonly used storage policies
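    The new helpers appear later in this diff; roughly:

```scala
// From this diff: a per-suite HDFS path derived from the helper replaces
// the removed global HDFS_URL constant.
private val remotePath: String = hdfsHelper.independentHdfsURL("test")

override protected def beforeEach(): Unit = {
  super.beforeEach()
  // Replaces manual Configuration/FileSystem.get/fs.delete boilerplate.
  hdfsHelper.deleteDir(remotePath)
}
```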
    
    * [Refactor] Clean up object storage APIs and add TPCH test suites
    
    - Refactor storage API by unifying helper methods for MinIO and HDFS
      - Replace `setHadoopFileSystemConfig` with `setFileSystem` in 
MinioTestHelper
      - Replace `setObjectStoreConfig` with `setStoreConfig` in MinioTestHelper
      - Consolidate multiple HDFS configuration methods into a single 
`setStoreConfig`
      - Simplify filesystem configuration with `setFileSystem` in HDFSTestHelper
    
    - Add new test suites for TPCH benchmarks with object storage
      - Create `MinioTCPHTestSuite` for testing with MinIO
      - Create `HDFSTCPHTestSuite` for testing with HDFS
      - Add common `WriteMergeTreeConf` trait for MergeTree configuration
    
    - Simplify code in `GlutenClickHouseWholeStageTransformerSuite` by using the
      refactored methods
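    With the unified names, the object-storage setup in `GlutenClickHouseWholeStageTransformerSuite` reduces to four symmetric calls (taken from this diff):

```scala
// From this diff: the same verbs now configure both object stores.
minioHelper.setFileSystem(conf)
minioHelper.setStoreConfig(conf, BUCKET_NAME)
hdfsHelper.setFileSystem(conf)
hdfsHelper.setStoreConfig(conf)
```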
    
    * Include backend commit 
https://github.com/ClickHouse/ClickHouse/commit/83de175e411120003fed985c6e8a07e3053ea046
    
    Fix read position setting for HDFS buffer and simplify remote FS gather 
read logic. This commit implements two main improvements:
    
    1. Added implementations for `setReadUntilPosition` and `setReadUntilEnd` 
methods in the `ReadBufferFromHDFS` class, allowing HDFS read buffers to 
properly limit reading ranges, consistent with other storage interfaces
    2. Simplified the read position setting logic in `ReadBufferFromRemoteFSGather`, removing redundant condition checks and making the code clearer:
        - Setting the read position is needed regardless of whether the object has an offset
        - Different strategies are used based on whether the read position is within the range
    
    These improvements help ensure proper control of data reading ranges when 
working with HDFS storage, particularly when the MergeTree format doesn't 
configure a disk cache.
    
    ---------
    
    Co-authored-by: Chang chen <[email protected]>
---
 .../gluten/backendsapi/clickhouse/CHConfig.scala   |   5 +
 .../backendsapi/clickhouse/RuntimeSettings.scala   |  16 +++
 .../org/apache/gluten/metrics/MetricsUtil.scala    |   2 +
 .../GlutenClickHouseDeltaParquetWriteSuite.scala   |   5 +-
 .../GlutenClickHouseExcelFormatSuite.scala         |   6 +-
 ...lutenClickHouseWholeStageTransformerSuite.scala |  12 +-
 .../cache/GlutenClickHouseHDFSSuite.scala          |   2 +-
 .../GlutenClickHouseNativeWriteTableSuite.scala    |   2 -
 .../hive/GlutenClickHouseTableAfterRestart.scala   |   5 +-
 .../GlutenClickHouseMergeTreeCacheDataSuite.scala  |  31 ++---
 .../GlutenClickHouseMergeTreeOptimizeSuite.scala   |   7 +-
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala |   2 +-
 .../GlutenClickHouseMergeTreeTPCHSuite.scala       | 117 +++++++++++++++++++
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala |  43 +++----
 ...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala |  41 +++----
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |   7 +-
 .../GlutenClickHouseMergeTreeWriteStatsSuite.scala |  29 ++---
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |   2 +-
 .../metrics/GlutenClickHouseTPCHMetricsSuite.scala |   2 -
 .../org/apache/gluten/utils/HDFSTestHelper.scala   |  87 +++++++++-----
 .../org/apache/gluten/utils/MinioTestHelper.scala  |  48 ++++----
 .../apache/gluten/utils/StoreConfigBuilder.scala   | 126 +++++++++++++++++++++
 cpp-ch/clickhouse.version                          |   2 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  14 ++-
 cpp-ch/local-engine/Common/GlutenConfig.h          |   7 +-
 .../registerGlutenDiskObjectStorage.cpp            |  66 ++++-------
 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp  |  13 ++-
 27 files changed, 469 insertions(+), 230 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
index 4720db3ad7..59f2ae3d8a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
@@ -131,3 +131,8 @@ class CHConfig(conf: SQLConf) extends GlutenConfig(conf) {
 
   def enableGlutenLocalFileCache: Boolean = 
getConf(ENABLE_GLUTEN_LOCAL_FILE_CACHE)
 }
+
+object GlutenObjectStorageConfig {
+  val S3_DISK_TYPE: String = "s3_gluten"
+  val HDFS_DISK_TYPE: String = "hdfs_gluten"
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
index bcd245eafe..1c0e83b1d3 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
@@ -102,4 +102,20 @@ object RuntimeSettings {
       .doc("Enable memory spill scheduler")
       .booleanConf
       .createWithDefault(true)
+
+  val MERGE_AFTER_INSERT =
+    buildConf(runtimeSettings("mergetree.merge_after_insert"))
+      .doc(s"""Merge after insert in each task.
+              |Set to false If DeltaOptimizedWriterTransformer is used
+              |""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
+  val INSERT_WITHOUT_LOCAL_STORAGE =
+    buildConf(runtimeSettings("mergetree.insert_without_local_storage"))
+      .doc(s"""When insert into remote storage, don't write to local temporary 
storage first.
+              |Set to true If DeltaOptimizedWriterTransformer is used
+              |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 3630205fd0..4219bdc509 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -106,6 +106,8 @@ object MetricsUtil extends Logging {
         val numNativeMetrics = metrics.metricsDataList.size()
         val relSize = relMap.values().asScala.flatMap(l => l.asScala).size
         if (numNativeMetrics == 0 || numNativeMetrics != relSize) {
+          // TODO: if `RuntimeSettings.COLLECT_METRICS` set to false, we 
should not log the warning
+          //       otherwise, we should raise an exception
           logWarning(
             s"Updating native metrics failed due to the wrong size of metrics 
data: " +
               s"$numNativeMetrics")
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index a26f3607e4..33a657820e 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
 import org.apache.gluten.config.GlutenConfig
 
 import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import java.io.File
 
 class GlutenClickHouseDeltaParquetWriteSuite extends ParquetTPCHSuite {
 
-  import org.apache.gluten.backendsapi.clickhouse.CHConfig._
-
   /** Run Gluten + ClickHouse Backend with SortShuffleManager */
   override protected def sparkConf: SparkConf = {
     super.sparkConf
@@ -45,7 +44,7 @@ class GlutenClickHouseDeltaParquetWriteSuite extends 
ParquetTPCHSuite {
       .set("spark.sql.files.maxPartitionBytes", "20000000")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, spark35.toString)
       .set("spark.sql.storeAssignmentPolicy", "legacy")
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
       .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
   }
 
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
index dcc5f39c54..761d48b0ce 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
@@ -1034,7 +1034,7 @@ class GlutenClickHouseExcelFormatSuite extends 
GlutenClickHouseWholeStageTransfo
          | from $orcFileFormat.`$filePath`
          | where long_field > 30
          |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {}, 
noFallBack = true)
+    compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {})
   }
 
   // TODO: Fix: if the field names has upper case form, it will return null 
value
@@ -1458,7 +1458,7 @@ class GlutenClickHouseExcelFormatSuite extends 
GlutenClickHouseWholeStageTransfo
      * is destroyed, but before that, the file moved by spark committer.
      */
 
-    val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/write_into_hdfs/")
+    val tablePath = hdfsHelper.independentHdfsURL("write_into_hdfs")
     val format = "parquet"
     val sql =
       s"""
@@ -1474,7 +1474,7 @@ class GlutenClickHouseExcelFormatSuite extends 
GlutenClickHouseWholeStageTransfo
   // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
   testWithMinSparkVersion("write succeed even if set wrong snappy compression 
codec level", "3.5") {
     // TODO: remove duplicated test codes
-    val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/failed_test/")
+    val tablePath = hdfsHelper.independentHdfsURL("failed_test")
     val format = "parquet"
     val sql =
       s"""
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 5dd8b0c2c4..af33880e27 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -60,8 +60,7 @@ class GlutenClickHouseWholeStageTransformerSuite
 
   val BUCKET_NAME: String = SPARK_DIR_NAME
   val minioHelper = new MinioTestHelper(TMP_PREFIX)
-  val hdfsHelper = new HDFSTestHelper(TMP_PREFIX)
-  val HDFS_URL: String = hdfsHelper.getHdfsUrl(SPARK_DIR_NAME)
+  val hdfsHelper = new HDFSTestHelper(TMP_PREFIX, SPARK_DIR_NAME)
 
   val CH_DEFAULT_STORAGE_DIR = "/data"
 
@@ -91,11 +90,10 @@ class GlutenClickHouseWholeStageTransformerSuite
       .set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath)
       .set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME")
     if (UTSystemParameters.testMergeTreeOnObjectStorage) {
-      minioHelper.setHadoopFileSystemConfig(conf)
-      minioHelper.setObjectStoreConfig(conf, BUCKET_NAME)
-      hdfsHelper.setHDFSStoreConfig(conf)
-      hdfsHelper.setHDFSStoreConfigRocksDB(conf)
-      hdfsHelper.setHdfsClientConfig(conf)
+      minioHelper.setFileSystem(conf)
+      minioHelper.setStoreConfig(conf, BUCKET_NAME)
+      hdfsHelper.setFileSystem(conf)
+      hdfsHelper.setStoreConfig(conf)
     } else {
       conf
     }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
index e7064d1edc..0cc1e0955a 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
 
 class GlutenClickHouseHDFSSuite extends GlutenClickHouseCacheBaseTestSuite {
 
-  override protected val remotePath: String = 
hdfsHelper.getHdfsUrl("tpch-data")
+  override protected val remotePath: String = hdfsHelper.hdfsURL("tpch-data")
 
   override protected def copyDataIfNeeded(): Unit = {
     val targetFile = new Path(s"$remotePath/lineitem")
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index c1dc110779..81bf35e2eb 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution.hive
 
-import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
 import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
@@ -65,7 +64,6 @@ class GlutenClickHouseNativeWriteTableSuite
       .set("spark.sql.storeAssignmentPolicy", "legacy")
       .set("spark.sql.warehouse.dir", getWarehouseDir)
       .set("spark.sql.session.timeZone", sessionTimeZone)
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .setMaster("local[1]")
   }
 
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
index bc84f060b2..60f06ce959 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.hive
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, 
RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.CreateMergeTreeSuite
 
@@ -46,14 +46,13 @@ class GlutenClickHouseTableAfterRestart extends 
CreateMergeTreeSuite with ReCrea
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .setCHConfig("user_defined_path", "/tmp/user_defined")
       .set("spark.sql.files.maxPartitionBytes", "20000000")
       .set("spark.ui.enabled", "true")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
       .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
       .setCHSettings("input_format_parquet_max_block_size", 8192)
       .setMaster("local[2]")
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index 90985f412f..712ef5efb4 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{CreateMergeTreeSuite, 
FileSourceScanExecTransformer}
 
@@ -24,9 +24,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-
 import java.io.File
 
 import scala.concurrent.duration.DurationInt
@@ -34,7 +31,6 @@ import scala.concurrent.duration.DurationInt
 class GlutenClickHouseMergeTreeCacheDataSuite extends CreateMergeTreeSuite {
 
   override protected def sparkConf: SparkConf = {
-    import org.apache.gluten.backendsapi.clickhouse.CHConfig._
 
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -42,19 +38,16 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set("spark.gluten.soft-affinity.enabled", "true")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
   }
 
+  private val remotePath: String = hdfsHelper.independentHdfsURL("test")
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    val conf = new Configuration
-    conf.set("fs.defaultFS", HDFS_URL)
-    val fs = FileSystem.get(conf)
-    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    hdfsHelper.deleteDir(remotePath)
     hdfsHelper.resetMeta()
     hdfsHelper.resetCache()
   }
@@ -97,7 +90,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
                  |)
                  |USING clickhouse
                  |PARTITIONED BY (l_shipdate)
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='l_linenumber,l_orderkey')
                  |""".stripMargin)
@@ -200,7 +193,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
                  |)
                  |USING clickhouse
                  |PARTITIONED BY (l_shipdate)
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='l_linenumber,l_orderkey')
                  |""".stripMargin)
@@ -307,7 +300,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
                  |)
                  |USING clickhouse
                  |PARTITIONED BY (l_shipdate)
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='l_linenumber,l_orderkey')
                  |""".stripMargin)
@@ -324,7 +317,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
     val res = spark
       .sql(s"""
               |cache data
-              |  select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+              |  select * from '$remotePath/lineitem_mergetree_hdfs'
               |  after l_shipdate AS OF '1995-01-10'
               |  CACHEPROPERTIES(storage_policy='__hdfs_main',
               |                aaa='ccc')""".stripMargin)
@@ -409,7 +402,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
                  | L_COMMENT       string
                  |)
                  |USING clickhouse
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main')
                  |""".stripMargin)
 
@@ -498,7 +491,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
                  |)
                  |USING clickhouse
                  |PARTITIONED BY (L_SHIPDATE)
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='L_LINENUMBER,L_ORDERKEY')
                  |""".stripMargin)
@@ -515,7 +508,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
     val res = spark
       .sql(s"""
               |cache data
-              |  select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+              |  select * from '$remotePath/lineitem_mergetree_hdfs'
               |  after L_SHIPDATE AS OF '1995-01-10'
               |  CACHEPROPERTIES(storage_policy='__hdfs_main',
               |                aaa='ccc')""".stripMargin)
@@ -576,7 +569,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends 
CreateMergeTreeSuite {
   test("test disable cache files return") {
     withSQLConf(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key -> "false") {
       runSql(
-        s"CACHE FILES select * from 
'${hdfsHelper.getHdfsUrl("tpch-data/lineitem")}'",
+        s"CACHE FILES select * from 
'${hdfsHelper.hdfsURL("tpch-data/lineitem")}'",
         noFallBack = false) {
         df =>
           val res = df.collect()
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 9d8dc08bb9..a3908274ec 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, 
RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{CreateMergeTreeSuite, 
FileSourceScanExecTransformer}
 
@@ -46,7 +46,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends 
CreateMergeTreeSuite {
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
       .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "10000")
@@ -54,7 +53,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends 
CreateMergeTreeSuite {
         "spark.databricks.delta.retentionDurationCheck.enabled",
         "false"
       ) // otherwise, RETAIN 0 HOURS will fail
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
       .setCHSettings("input_format_parquet_max_block_size", 8192)
   }
 
@@ -494,7 +493,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends 
CreateMergeTreeSuite {
   test("test mergetree insert with optimize basic") {
     withSQLConf(
       DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "200000000",
-      CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true"
+      RuntimeSettings.MERGE_AFTER_INSERT.key -> "true"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS 
lineitem_mergetree_insert_optimize_basic;
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index dc5ffd936d..ff237e9600 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -54,7 +54,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite extends 
CreateMergeTreeSuite
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
       .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
       .setCHSettings("input_format_parquet_max_block_size", 8192)
 
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
new file mode 100644
index 0000000000..67fe2c1837
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.mergetree
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+trait TPCHObjectStore extends TPCHDatabase {
+  // parquet data source
+  override protected def parquetSourceDB: String = "parquet_source"
+  val policy: String
+  val remotePath: String
+
+  override protected def createTestTables(): Unit = {
+    createTPCHTables(
+      s"$remotePath/default",
+      format = "clickhouse",
+      isNull = false,
+      props = Map("storage_policy" -> s"'$policy'") // use the policy to 
create tables
+    )
+    insertIntoTPCHTables(parquetSourceDB)
+  }
+}
+
+/**
+ * An experimental configuration trait that reduces suite configuration for the MergeTree data
+ * format in Spark tests.
+ *
+ * This trait centralizes common configuration settings needed across multiple test suites working
+ * with the MergeTree format, eliminating the need to duplicate these settings in each test class.
+ * By extending this trait, test suites gain a standardized configuration foundation while only
+ * needing to implement the `useOnePipeline` variable.
+ *
+ * The trait handles several aspects of configuration:
+ *   - Sets up the columnar shuffle manager
+ *   - Configures Delta Lake session extensions
+ *   - Establishes the ClickHouse catalog implementation
+ *   - Enables native writer optimizations with appropriate pipeline settings
+ *
+ * Test suites can customize behavior by defining the `useOnePipeline` value based on their
+ * specific testing requirements or Spark version compatibility.
+ */
+trait WriteMergeTreeConf extends SharedSparkSession {
+  val useOnePipeline: Boolean
+  override protected def sparkConf: SparkConf = {
+    import org.apache.spark.shuffle.sort.ColumnarShuffleManager
+    import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog
+    import org.apache.spark.sql.internal.SQLConf
+    import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
+    import io.delta.sql.DeltaSparkSessionExtension
+
+    super.sparkConf
+      .set("spark.shuffle.manager", classOf[ColumnarShuffleManager].getName)
+      .set(SPARK_SESSION_EXTENSIONS.key, classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[ClickHouseSparkCatalog].getName)
+      .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+      .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, useOnePipeline.toString)
+  }
+}
+
+abstract class GlutenClickHouseMergeTreeTPCHSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with TPCHMergeTreeResult
+  with TPCHObjectStore
+  with WriteMergeTreeConf {
+
+  override val useOnePipeline: Boolean = spark35
+  final override val testCases: Seq[Int] =
+    Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22)
+  setupTestCase()
+}
+
+class MinioTPCHTestSuite extends GlutenClickHouseMergeTreeTPCHSuite {
+  override val policy: String = minioHelper.STORE_POLICY_NOCACHE
+  override val remotePath: String = s"s3a://$BUCKET_NAME"
+
+  override def beforeAll(): Unit = {
+    if (minioHelper.bucketExists(BUCKET_NAME)) {
+      minioHelper.clearBucket(BUCKET_NAME)
+    }
+    minioHelper.createBucket(BUCKET_NAME)
+    minioHelper.resetMeta()
+    super.beforeAll()
+  }
+}
+
+class HDFSTPCHTestSuite extends GlutenClickHouseMergeTreeTPCHSuite {
+
+  override val policy: String = hdfsHelper.STORE_POLICY_ROCKSDB
+  override val remotePath: String = hdfsHelper.independentHdfsURL()
+
+  override def beforeAll(): Unit = {
+    hdfsHelper.deleteDir(remotePath)
+    hdfsHelper.resetMeta()
+    super.beforeAll()
+  }
+}
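The new file above wires suite behavior out of small stacked traits. As a dependency-free illustration of that pattern (not Gluten code: the key strings, `WriteConfLike`, and `OnePipelineSuite` below are invented stand-ins for `SparkConf`, the real configuration keys, and the real suites), a trait can contribute fixed settings while each concrete suite pins only the abstract `useOnePipeline` flag:

```scala
// Illustrative sketch only: key strings and names are invented stand-ins,
// not the real Gluten/Spark configuration keys or classes.
object TraitConfigSketch {
  trait WriteConfLike {
    // Each concrete suite supplies only this flag.
    val useOnePipeline: Boolean

    // Stand-in for SparkConf: an immutable key -> value map of settings.
    def sparkConf: Map[String, String] = Map(
      "native.writer.enabled" -> "true",
      "mergetree.onepipeline.write" -> useOnePipeline.toString
    )
  }

  // A concrete "suite" pins the abstract flag; everything else is inherited.
  class OnePipelineSuite extends WriteConfLike {
    override val useOnePipeline: Boolean = true
  }
}
```

The real suites do the same against `SparkConf`, with `useOnePipeline` typically tied to the Spark version (`spark35`).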
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 043b19b8f0..32876f1c10 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{CreateMergeTreeSuite, FileSourceScanExecTransformer}
 
@@ -28,15 +28,11 @@ import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types._
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
 import scala.concurrent.duration.DurationInt
 
 class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
 
   override protected def sparkConf: SparkConf = {
-    import org.apache.gluten.backendsapi.clickhouse.CHConfig._
 
     super.sparkConf
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -44,18 +40,15 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
   }
 
+  private val remotePath: String = hdfsHelper.independentHdfsURL("test")
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    val conf = new Configuration
-    conf.set("fs.defaultFS", HDFS_URL)
-    val fs = FileSystem.get(conf)
-    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    hdfsHelper.deleteDir(remotePath)
     hdfsHelper.resetMeta()
   }
 
@@ -90,7 +83,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
                  | l_comment       string
                  |)
                  |USING clickhouse
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main')
                  |""".stripMargin)
 
@@ -155,7 +148,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='l_shipdate,l_orderkey',
                  |               primaryKey='l_shipdate')
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_orderbykey_hdfs'
                  |""".stripMargin)
 
     spark.sql(s"""
@@ -222,7 +215,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='l_orderkey',
                  |               primaryKey='l_orderkey')
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_partition_hdfs'
                  |""".stripMargin)
 
     // dynamic partitions
@@ -384,7 +377,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
                  |TBLPROPERTIES (storage_policy='__hdfs_main',
                  |               orderByKey='c1',
                  |               primaryKey='c1')
-                 |LOCATION '$HDFS_URL/test/partition_escape'
+                 |LOCATION '$remotePath/partition_escape'
                  |""".stripMargin)
 
     spark.sql("insert into partition_escape select * from origin_table")
@@ -420,7 +413,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
                  |PARTITIONED BY (l_returnflag)
                  |CLUSTERED BY (l_orderkey)
                  |${if (spark32) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_bucket_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main')
                  |""".stripMargin)
 
@@ -469,7 +462,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
   }
 
   testSparkVersionLE33("test mergetree write with the path based bucket table") {
-    val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
+    val dataPath = s"$remotePath/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
                                 |select * from lineitem
@@ -525,12 +518,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
 
   test("test mergetree insert with optimize basic") {
     val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
-    val dataPath = s"$HDFS_URL/test/$tableName"
+    val dataPath = s"$remotePath/$tableName"
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true",
-      CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+      RuntimeSettings.MERGE_AFTER_INSERT.key -> "true",
+      RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
       RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000"
     ) {
       spark.sql(s"""
@@ -547,17 +540,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
 
       val ret = spark.sql(s"select count(*) from $tableName").collect()
       assertResult(600572)(ret.apply(0).get(0))
-      val conf = new Configuration
-      conf.set("fs.defaultFS", HDFS_URL)
-      val fs = FileSystem.get(conf)
 
       eventually(timeout(60.seconds), interval(2.seconds)) {
-        val it = fs.listFiles(new Path(dataPath), true)
-        var files = 0
-        while (it.hasNext) {
-          it.next()
-          files += 1
-        }
+        val files = hdfsHelper.countDir(dataPath)
         assertResult(4)(files)
       }
     }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index 18da968b30..3840d0ec32 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{CreateMergeTreeSuite, FileSourceScanExecTransformer}
 
@@ -27,15 +27,11 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
 import scala.concurrent.duration.DurationInt
 
 class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMergeTreeSuite {
 
   override protected def sparkConf: SparkConf = {
-    import org.apache.gluten.backendsapi.clickhouse.CHConfig._
 
     super.sparkConf
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -43,18 +39,15 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
   }
 
+  private val remotePath: String = hdfsHelper.independentHdfsURL("test")
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    val conf = new Configuration
-    conf.set("fs.defaultFS", HDFS_URL)
-    val fs = FileSystem.get(conf)
-    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    hdfsHelper.deleteDir(remotePath)
     hdfsHelper.resetMeta()
   }
 
@@ -89,7 +82,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
                  | l_comment       string
                  |)
                  |USING clickhouse
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
                  |""".stripMargin)
 
@@ -154,7 +147,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
                  |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
                  |               orderByKey='l_shipdate,l_orderkey',
                  |               primaryKey='l_shipdate')
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_orderbykey_hdfs'
                  |""".stripMargin)
 
     spark.sql(s"""
@@ -221,7 +214,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
                  |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
                  |               orderByKey='l_orderkey',
                  |               primaryKey='l_orderkey')
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_partition_hdfs'
                  |""".stripMargin)
 
     // dynamic partitions
@@ -370,7 +363,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
                  |PARTITIONED BY (l_returnflag)
                  |CLUSTERED BY (l_orderkey)
                  |${if (spark32) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
-                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+                 |LOCATION '$remotePath/lineitem_mergetree_bucket_hdfs'
                  |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
                  |""".stripMargin)
 
@@ -419,7 +412,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
   }
 
   testSparkVersionLE33("test mergetree write with the path based bucket table") {
-    val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
+    val dataPath = s"$remotePath/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
                                 |select * from lineitem
@@ -475,12 +468,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
 
   test("test mergetree insert with optimize basic") {
     val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
-    val dataPath = s"$HDFS_URL/test/$tableName"
+    val dataPath = s"$remotePath/$tableName"
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true",
-      CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+      RuntimeSettings.MERGE_AFTER_INSERT.key -> "true",
+      RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
       RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000"
     ) {
       spark.sql(s"""
@@ -497,17 +490,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
 
       val ret = spark.sql(s"select count(*) from $tableName").collect()
       assertResult(600572)(ret.apply(0).get(0))
-      val conf = new Configuration
-      conf.set("fs.defaultFS", HDFS_URL)
-      val fs = FileSystem.get(conf)
 
       eventually(timeout(60.seconds), interval(2.seconds)) {
-        val it = fs.listFiles(new Path(dataPath), true)
-        var files = 0
-        while (it.hasNext) {
-          it.next()
-          files += 1
-        }
+        val files = hdfsHelper.countDir(dataPath)
         assertResult(4)(files)
       }
     }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 79b759c1ae..c68c51e50d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BasicScanExecTransformer, CreateMergeTreeSuite, FileSourceScanExecTransformer}
 
@@ -40,7 +40,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite extends CreateMergeTreeSuite {
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
   }
@@ -506,8 +505,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite extends CreateMergeTreeSuite {
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
-      CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true"
+      RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
+      RuntimeSettings.MERGE_AFTER_INSERT.key -> "true"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS $tableName;
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
index 43e8b25134..df4dae03c9 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
-import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCDSAbstractSuite}
 
@@ -29,9 +28,6 @@ import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions.input_file_name
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-
 class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbstractSuite {
 
   override protected def sparkConf: SparkConf = {
@@ -47,18 +43,15 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
       .set("spark.databricks.delta.stats.enabled", "true")
       .set("spark.databricks.delta.optimizeWrite.enabled", "true")
       .set("spark.sql.storeAssignmentPolicy", "LEGACY")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
-      .setCHSettings("mergetree.merge_after_insert", false)
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
   }
 
+  private val remotePath: String = hdfsHelper.independentHdfsURL("stats")
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    val conf = new Configuration
-    conf.set("fs.defaultFS", HDFS_URL)
-    val fs = FileSystem.get(conf)
-    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    hdfsHelper.deleteDir(remotePath)
     hdfsHelper.resetMeta()
   }
 
@@ -98,7 +91,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     ss_sold_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/store_sales'
+           |LOCATION '$remotePath/store_sales'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "store_returns" ->
@@ -127,7 +120,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     sr_returned_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/store_returns'
+           |LOCATION '$remotePath/store_returns'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "catalog_sales" ->
@@ -170,7 +163,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     cs_sold_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/catalog_sales'
+           |LOCATION '$remotePath/catalog_sales'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "catalog_returns" ->
@@ -206,7 +199,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     cr_returned_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/catalog_returns'
+           |LOCATION '$remotePath/catalog_returns'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "web_sales" ->
@@ -249,7 +242,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     ws_sold_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/web_sales'
+           |LOCATION '$remotePath/web_sales'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "web_returns" ->
@@ -282,7 +275,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     wr_returned_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/web_returns'
+           |LOCATION '$remotePath/web_returns'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin,
       "inventory" ->
@@ -295,7 +288,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbst
            |     inv_date_sk INT
            |)
            |USING clickhouse
-           |LOCATION '$HDFS_URL/stats/inventory'
+           |LOCATION '$remotePath/inventory'
            |TBLPROPERTIES (storage_policy='__hdfs_main')
            |""".stripMargin
     )
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index ed516162cf..336c98333d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -51,7 +51,7 @@ class GlutenClickHouseMergeTreeWriteSuite extends CreateMergeTreeSuite {
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
       .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
-      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
       .setCHSettings("input_format_parquet_max_block_size", 8192)
   }
 
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index e08bd280c3..4a5a923162 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution.metrics
 
-import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
 import org.apache.gluten.execution._
 import org.apache.gluten.execution.GlutenPlan
 
@@ -44,7 +43,6 @@ class GlutenClickHouseTPCHMetricsSuite extends ParquetTPCHSuite {
       .set("spark.io.compression.codec", "LZ4")
       .set("spark.sql.shuffle.partitions", "1")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .setCHSettings("input_format_parquet_max_block_size", parquetMaxBlockSize)
       .setCHConfig("enable_pre_projection_for_join_conditions", "false")
       .setCHConfig("enable_streaming_aggregating", true)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
index 6aa872146e..366d53e4d2 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
@@ -21,49 +21,80 @@ import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
 import org.apache.spark.SparkConf
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import java.io.File
 
-class HDFSTestHelper(TMP_PREFIX: String) {
+class HDFSTestHelper(TMP_PREFIX: String, independentDir: String) {
 
   // HDFS parameters
   val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
   private val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
   private val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
 
-  def getHdfsUrl(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+  def hdfsURL(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+  def independentHdfsURL(dirName: String = ""): String =
+    if (dirName.isEmpty) {
+      s"$HDFS_URL_ENDPOINT/$independentDir"
+    } else {
+      s"$HDFS_URL_ENDPOINT/$independentDir/$dirName"
+    }
   def metaPath(dirName: String): String = s"$HDFS_METADATA_PATH/$dirName"
 
-  def setHDFSStoreConfig(conf: SparkConf): SparkConf = {
-    conf.setCHConfig(
-      "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
-      "storage_configuration.disks.hdfs.endpoint" -> s"$HDFS_URL_ENDPOINT/",
-      "storage_configuration.disks.hdfs.metadata_path" -> HDFS_METADATA_PATH,
-      "storage_configuration.disks.hdfs_cache.type" -> "cache",
-      "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
-      "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
-      "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
-      "storage_configuration.policies.__hdfs_main.volumes" -> "main",
-      "storage_configuration.policies.__hdfs_main.volumes.main.disk" -> "hdfs_cache"
-    )
+  def builder(policyName: String): StoreConfigBuilder = new StoreConfigBuilder(policyName)
+
+  val STORE_POLICY = "__hdfs_main"
+  val STORE_POLICY_ROCKSDB = "__hdfs_main_rocksdb"
+
+  def deleteDir(dirName: String): Unit = {
+    val hdfs_dir = hdfsDir(dirName)
+    val conf = new Configuration
+    conf.set("fs.defaultFS", hdfs_dir)
+    val fs = FileSystem.get(conf)
+    fs.delete(new Path(hdfs_dir), true)
   }
 
-  def setHDFSStoreConfigRocksDB(conf: SparkConf): SparkConf = {
-    conf.setCHConfig(
-      "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
-      "storage_configuration.disks.hdfs2.endpoint" -> s"$HDFS_URL_ENDPOINT/",
-      "storage_configuration.disks.hdfs2.metadata_path" -> HDFS_METADATA_PATH,
-      "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
-      "storage_configuration.disks.hdfs_cache2.type" -> "cache",
-      "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
-      "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
-      "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
-      "storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> "main",
-      "storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" -> "hdfs_cache2"
-    )
+  def countDir(dirName: String): Int = {
+    val hdfs_dir = hdfsDir(dirName)
+
+    val conf = new Configuration
+    conf.set("fs.defaultFS", hdfs_dir)
+    val fs = FileSystem.get(conf)
+    val it = fs.listFiles(new Path(hdfs_dir), true)
+    var count = 0
+    while (it.hasNext) {
+      it.next()
+      count += 1
+    }
+    count
+  }
+
+  private def hdfsDir(dirName: String) = {
+    dirName match {
+      case d if d.startsWith("/") => s"$HDFS_URL_ENDPOINT$d"
+      case d if d.startsWith(HDFS_URL_ENDPOINT) =>
+        d // Keep paths that already start with the endpoint
+      case d => s"$HDFS_URL_ENDPOINT/$d"
+    }
+  }
+
+  def setStoreConfig(conf: SparkConf): SparkConf = {
+    builder(STORE_POLICY)
+      .withEndpoint(s"$HDFS_URL_ENDPOINT/")
+      .withMetadataPath(HDFS_METADATA_PATH)
+      .withCachePath(HDFS_CACHE_PATH)
+      .build(conf)
+
+    builder(STORE_POLICY_ROCKSDB)
+      .withEndpoint(s"$HDFS_URL_ENDPOINT/")
+      .withMetadataPath(HDFS_METADATA_PATH)
+      .withDiskcache(false)
+      .withRocksDB(true)
+      .build(conf)
   }
 
-  def setHdfsClientConfig(conf: SparkConf): SparkConf = {
+  def setFileSystem(conf: SparkConf): SparkConf = {
     conf.setCHConfig(
       "hdfs.dfs_client_read_shortcircuit" -> "false",
       "hdfs.dfs_default_replica" -> "1"
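The helper changes above replace per-suite `FileSystem` boilerplate with `deleteDir`/`countDir`, both routed through the private `hdfsDir` normalizer. That normalization is pure string logic, so it can be exercised standalone; the sketch below copies it from the diff (with the fixture's endpoint value) into a dependency-free object, alongside a standalone version of `independentHdfsURL`:

```scala
// Dependency-free copy of the path handling in the refactored HDFSTestHelper.
object HdfsPathSketch {
  val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"

  // Accepts a bare dir name, an absolute path, or an already-qualified URL,
  // and always returns a fully qualified HDFS URL.
  def hdfsDir(dirName: String): String = dirName match {
    case d if d.startsWith("/") => s"$HDFS_URL_ENDPOINT$d"
    case d if d.startsWith(HDFS_URL_ENDPOINT) => d // already fully qualified
    case d => s"$HDFS_URL_ENDPOINT/$d"
  }

  // Builds a URL under a suite-private directory, as independentHdfsURL does.
  def independentHdfsURL(independentDir: String, dirName: String = ""): String =
    if (dirName.isEmpty) s"$HDFS_URL_ENDPOINT/$independentDir"
    else s"$HDFS_URL_ENDPOINT/$independentDir/$dirName"
}
```

This is why the suites can pass either `remotePath` (a full URL) or a bare name to `deleteDir` and `countDir` interchangeably.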
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
index 0667c2c843..f82c09adb6 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.GlutenObjectStorageConfig
 
 import org.apache.spark.SparkConf
 
@@ -27,7 +27,7 @@ import org.apache.commons.io.FileUtils
 import java.io.File
 import java.util
 
-import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class MinioTestHelper(TMP_PREFIX: String) {
 
@@ -41,6 +41,9 @@ class MinioTestHelper(TMP_PREFIX: String) {
   val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
   val S3A_ENDPOINT = "s3a://"
 
+  val STORE_POLICY = "__s3_main"
+  val STORE_POLICY_NOCACHE = "__s3_main_2"
+
   private lazy val client = MinioClient
     .builder()
     .endpoint(MINIO_ENDPOINT)
@@ -77,12 +80,12 @@ class MinioTestHelper(TMP_PREFIX: String) {
 
   def listObjects(bucketName: String, prefix: String): Iterable[String] = {
     val args = ListObjectsArgs.builder().bucket(bucketName).recursive(true).prefix(prefix).build()
-    val objectNames = new util.ArrayList[String]()
-    client.listObjects(args).forEach(obj => objectNames.add(obj.get().objectName()))
-    objectNames.asScala
+    val objectNames = mutable.ArrayBuffer[String]()
+    client.listObjects(args).forEach(obj => objectNames.append(obj.get().objectName()))
+    objectNames
   }
 
-  def setHadoopFileSystemConfig(conf: SparkConf): SparkConf = {
+  def setFileSystem(conf: SparkConf): SparkConf = {
     conf
       .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
       .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
@@ -92,22 +95,23 @@ class MinioTestHelper(TMP_PREFIX: String) {
       .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
   }
 
-  def setObjectStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
-    val wholePath: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
-    conf
-      .setCHConfig(
-        "storage_configuration.disks.s3.type" -> "s3_gluten",
-        "storage_configuration.disks.s3.endpoint" -> wholePath,
-        "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
-        "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
-        "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
-        "storage_configuration.disks.s3_cache.type" -> "cache",
-        "storage_configuration.disks.s3_cache.disk" -> "s3",
-        "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
-        "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
-        "storage_configuration.policies.__s3_main.volumes" -> "main",
-        "storage_configuration.policies.__s3_main.volumes.main.disk" -> "s3_cache"
-      )
+  def builder(policyName: String): StoreConfigBuilder =
+    new StoreConfigBuilder(policyName, GlutenObjectStorageConfig.S3_DISK_TYPE)
+
+  def setStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
+    builder(STORE_POLICY)
+      .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
+      .withMetadataPath(S3_METADATA_PATH)
+      .withCachePath(S3_CACHE_PATH)
+      .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
+      .build(conf)
+
+    builder(STORE_POLICY_NOCACHE)
+      .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
+      .withMetadataPath(S3_METADATA_PATH)
+      .withDiskcache(false)
+      .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
+      .build(conf)
   }
 
   def resetMeta(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala
new file mode 100644
index 0000000000..cb9678f88b
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.GlutenObjectStorageConfig
+
+import org.apache.spark.SparkConf
+
+import scala.collection.mutable
+
+class StoreConfigBuilder(
+    val policyName: String,
+    val diskType: String = GlutenObjectStorageConfig.HDFS_DISK_TYPE) {
+  private var useDiskcache: Boolean = true
+  private var useRocksDB: Boolean = false
+
+  private var CACHE_PATH: String = _
+  private var METADATA_PATH: String = _
+  private var ENDPOINT: String = _
+
+  private var ak: String = _
+  private var sk: String = _
+
+  def withEndpoint(endpoint: String): StoreConfigBuilder = {
+    this.ENDPOINT = endpoint
+    this
+  }
+
+  def withMetadataPath(metadataPath: String): StoreConfigBuilder = {
+    this.METADATA_PATH = metadataPath
+    this
+  }
+
+  def withCachePath(cachePath: String): StoreConfigBuilder = {
+    this.CACHE_PATH = cachePath
+    this
+  }
+
+  def withDiskcache(useDiskcache: Boolean): StoreConfigBuilder = {
+    this.useDiskcache = useDiskcache
+    this
+  }
+
+  def withAKSK(ak: String, sk: String): StoreConfigBuilder = {
+    require(ak != null && sk != null, "ak and sk must not be null")
+    this.ak = ak
+    this.sk = sk
+    this
+  }
+
+  def withRocksDB(useRocksDB: Boolean): StoreConfigBuilder = {
+    this.useRocksDB = useRocksDB
+    this
+  }
+
+  private def extractStorageType(typeString: String): String = {
+    if (typeString.contains("_")) {
+      typeString.split("_").head
+    } else {
+      typeString
+    }
+  }
+
+  def build(conf: SparkConf): SparkConf = {
+    val prefix = extractStorageType(diskType)
+    val disk = if (useRocksDB) s"${prefix}2" else prefix
+    val disk_cache = if (useRocksDB) s"${prefix}_cache2" else s"${prefix}_cache"
+    val main_disk = if (useDiskcache) disk_cache else disk
+
+    require(ENDPOINT != null, "ENDPOINT is null")
+    require(METADATA_PATH != null, "METADATA_PATH is null")
+
+    val settings = mutable.ArrayBuffer[(String, String)]()
+    settings.appendAll(
+      Seq(
+        s"storage_configuration.disks.$disk.type" -> diskType,
+        s"storage_configuration.disks.$disk.endpoint" -> s"$ENDPOINT",
+        s"storage_configuration.disks.$disk.metadata_path" -> METADATA_PATH
+      ))
+
+    if (ak != null && sk != null) {
+      settings.appendAll(
+        Seq(
+          s"storage_configuration.disks.$disk.access_key_id" -> ak,
+          s"storage_configuration.disks.$disk.secret_access_key" -> sk
+        ))
+    }
+
+    if (useDiskcache) {
+      require(CACHE_PATH != null, "CACHE_PATH is null")
+      settings.appendAll(
+        Seq(
+          s"storage_configuration.disks.$disk_cache.type" -> "cache",
+          s"storage_configuration.disks.$disk_cache.disk" -> disk,
+          s"storage_configuration.disks.$disk_cache.path" -> CACHE_PATH,
+          s"storage_configuration.disks.$disk_cache.max_size" -> "10Gi"
+        ))
+    }
+    settings.appendAll(
+      Seq(
+        s"storage_configuration.policies.$policyName.volumes" -> "main",
+        s"storage_configuration.policies.$policyName.volumes.main.disk" -> main_disk
+      ))
+
+    if (useRocksDB) {
+      settings.append(s"storage_configuration.disks.$disk.metadata_type" -> "rocksdb")
+    }
+    settings.foreach(setting => conf.setCHConfig(setting))
+    conf
+  }
+}
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index be6fb587e6..f16df23d38 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20250502
-CH_COMMIT=376bf9d0d91
+CH_COMMIT=83de175e411
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 84eb55f379..33f9a62f64 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -35,8 +35,8 @@
 #include <Core/ServerSettings.h>
 #include <Core/Settings.h>
 #include <DataTypes/DataTypeArray.h>
-#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypeMap.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypeString.h>
 #include <DataTypes/DataTypeTuple.h>
 #include <DataTypes/DataTypesDecimal.h>
@@ -468,7 +468,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
     };
 
     Poco::Util::AbstractConfiguration::Keys disks;
-    std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
+    std::unordered_set<String> disk_types = {GlutenObjectStorageConfig::S3_DISK_TYPE, GlutenObjectStorageConfig::HDFS_DISK_TYPE, "cache"};
     config.keys("storage_configuration.disks", disks);
 
     std::ranges::for_each(
@@ -481,7 +481,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
                 return;
             if (disk_type == "cache")
                 change_func(disk_prefix + ".path");
-            else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
+            else if (disk_type == GlutenObjectStorageConfig::S3_DISK_TYPE || disk_type == GlutenObjectStorageConfig::HDFS_DISK_TYPE)
                 change_func(disk_prefix + ".metadata_path");
         });
 
@@ -545,7 +545,8 @@ DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(const SparkConf
     // FIXMEX: workaround for https://github.com/ClickHouse/ClickHouse/pull/75452#pullrequestreview-2625467710
     // entry in DiskSelector::initialize
     // Bug in FileCacheSettings::loadFromConfig
-    auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config) {
+    auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config)
+    {
         const std::string config_prefix = "storage_configuration.disks";
         Poco::Util::AbstractConfiguration::Keys keys;
         config.keys(config_prefix, keys);
@@ -671,7 +672,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
     /// We currently do not support lazy materialization.
     /// "test 'order by' two keys" will failed if we enable it.
     settings[Setting::query_plan_optimize_lazy_materialization] = false;
-    
+
     for (const auto & [key, value] : spark_conf_map)
     {
         // Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
@@ -762,7 +763,8 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
         {
             auto mem_gb = task_memory / static_cast<double>(1_GiB);
             // 2.8x+5, Heuristics calculate the block size of external sort, [8,16]
-            settings[Setting::prefer_external_sort_block_bytes] = std::max(std::min(static_cast<size_t>(2.8 * mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
+            settings[Setting::prefer_external_sort_block_bytes]
+                = std::max(std::min(static_cast<size_t>(2.8 * mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
         }
     }
 }
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h
index 62e0d228e0..7b2a33a9b6 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -31,7 +31,6 @@ struct ReadSettings;
 }
 namespace local_engine
 {
-
 struct SparkConfigs
 {
     using ConfigMap = google::protobuf::Map<std::string, std::string>;
@@ -196,4 +195,10 @@ struct GlutenCacheConfig
     inline static const String ENABLED = "enable.gluten_cache.local";
 };
 
+struct GlutenObjectStorageConfig
+{
+    inline static const String S3_DISK_TYPE = "s3_gluten";
+    inline static const String HDFS_DISK_TYPE = "hdfs_gluten";
+};
+
 }
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
index b4073d0f39..60a5795bc3 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 #include "config.h"
+
 #include <Core/Settings.h>
 #include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#include <Interpreters/Context.h>
+#include <Common/GlutenConfig.h>
+#include <Common/Macros.h>
+
 #if USE_AWS_S3
-#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
 #include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
 #include <Disks/ObjectStorages/S3/diskSettings.h>
 #endif
@@ -27,10 +31,6 @@
 #include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
 #endif
 
-#include <Interpreters/Context.h>
-#include <Common/Macros.h>
-
-
 namespace DB
 {
 namespace Setting
@@ -50,10 +50,7 @@ namespace local_engine
 using namespace DB;
 
 #if USE_AWS_S3
-static S3::URI getS3URI(
-    const Poco::Util::AbstractConfiguration & config,
-    const std::string & config_prefix,
-    const ContextPtr & context)
+static S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
 {
     String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
     S3::URI uri(endpoint);
@@ -65,41 +62,31 @@ static S3::URI getS3URI(
     return uri;
 }
 
-static std::string getEndpoint(
-        const Poco::Util::AbstractConfiguration & config,
-        const std::string & config_prefix,
-        const ContextPtr & context)
-static std::string getEndpoint(
-        const Poco::Util::AbstractConfiguration & config,
-        const std::string & config_prefix,
-        const ContextPtr & context)
+static std::string
+getEndpoint(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
 {
     return context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
 }
 
 void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
 {
-    static constexpr auto disk_type = "s3_gluten";
-
     factory.registerObjectStorageType(
-        disk_type,
-        [](
-        const std::string & name,
-        const Poco::Util::AbstractConfiguration & config,
-        const std::string & config_prefix,
-        const ContextPtr & context,
-        bool /*skip_access_check*/) -> ObjectStoragePtr
+        GlutenObjectStorageConfig::S3_DISK_TYPE,
+        [](const std::string & name,
+           const Poco::Util::AbstractConfiguration & config,
+           const std::string & config_prefix,
+           const ContextPtr & context,
+           bool /*skip_access_check*/) -> ObjectStoragePtr
         {
             auto uri = getS3URI(config, config_prefix, context);
             auto s3_capabilities = getCapabilitiesFromConfig(config, 
config_prefix);
             auto endpoint = getEndpoint(config, config_prefix, context);
-            auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
-            auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
+            auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */ true);
+            auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */ true);
             auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
 
-            auto object_storage = std::make_shared<S3ObjectStorage>(
-                std::move(client),
-                std::move(settings),
-                uri,
-                s3_capabilities,
-                key_generator,
-                name);
+            auto object_storage
+                = std::make_shared<S3ObjectStorage>(std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
             return object_storage;
         });
 }
@@ -110,13 +97,12 @@ void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
 void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
 {
     factory.registerObjectStorageType(
-        "hdfs_gluten",
-        [](
-            const std::string & /* name */,
-            const Poco::Util::AbstractConfiguration & config,
-            const std::string & config_prefix,
-            const ContextPtr & context,
-            bool /* skip_access_check */) -> ObjectStoragePtr
+        GlutenObjectStorageConfig::HDFS_DISK_TYPE,
+        [](const std::string & /* name */,
+           const Poco::Util::AbstractConfiguration & config,
+           const std::string & config_prefix,
+           const ContextPtr & context,
+           bool /* skip_access_check */) -> ObjectStoragePtr
         {
             auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
             checkHDFSURL(uri);
@@ -124,9 +110,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
                 throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
 
             std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
-                config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
-                context->getSettingsRef()[Setting::hdfs_replication]
-            );
+                config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), context->getSettingsRef()[Setting::hdfs_replication]);
             return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
         });
 }
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index ce78afa169..20029a4121 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "registerGlutenDisks.h"
 #include "config.h"
+
 #include <Disks/DiskFactory.h>
-#include <Interpreters/Context.h>
 #include <Disks/ObjectStorages/DiskObjectStorage.h>
 #include <Disks/ObjectStorages/MetadataStorageFactory.h>
 #include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#include <Common/GlutenConfig.h>
 
 #if USE_HDFS
 #include <Disks/ObjectStorages/GlutenDiskHDFS.h>
@@ -33,8 +35,6 @@
 #include <Disks/ObjectStorages/MetadataStorageFromRocksDB.h>
 #endif
 
-#include "registerGlutenDisks.h"
-
 namespace local_engine
 {
 #if USE_AWS_S3
@@ -93,7 +93,8 @@ void registerGlutenDisks(bool global_skip_access_check)
 
 
     registerGlutenS3ObjectStorage(object_factory);
-    factory.registerDiskType("s3_gluten", creator); /// For compatibility
+
+    factory.registerDiskType(GlutenObjectStorageConfig::S3_DISK_TYPE, creator); /// For compatibility
 #endif
 
 #if USE_HDFS
@@ -112,7 +113,7 @@ void registerGlutenDisks(bool global_skip_access_check)
         { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
         auto object_storage = object_storage_creator(config, context);
         DB::MetadataStoragePtr metadata_storage;
-        auto metadata_type = DB::MetadataStorageFactory::getMetadataType(config, config_prefix,  "local");
+        auto metadata_type = DB::MetadataStorageFactory::getMetadataType(config, config_prefix, "local");
         if (metadata_type == "rocksdb")
         {
 #if USE_ROCKSDB
@@ -138,7 +139,7 @@ void registerGlutenDisks(bool global_skip_access_check)
     };
 
     registerGlutenHDFSObjectStorage(object_factory);
-    factory.registerDiskType("hdfs_gluten", hdfs_creator); /// For compatibility
+    factory.registerDiskType(GlutenObjectStorageConfig::HDFS_DISK_TYPE, hdfs_creator); /// For compatibility
 #endif
 }
 }

