This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c275b1ee1f [GLUTEN-6867][CH] Fix Bug that can't read file on minio 
(#9332)
c275b1ee1f is described below

commit c275b1ee1fb1220fa77a8bb120372d398865ef1c
Author: Chang chen <[email protected]>
AuthorDate: Wed Apr 16 21:41:30 2025 +0800

    [GLUTEN-6867][CH] Fix Bug that can't read file on minio (#9332)
    
    [CH] Fix Bug that can't read file on minio
    * Introduce GlutenClickHouseCacheBaseTestSuite for testing Minio
    * create bucket before creating table
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   2 -
 .../GlutenClickHouseExcelFormatSuite.scala         |   5 +-
 ...lutenClickHouseWholeStageTransformerSuite.scala |  74 ++-----------
 .../GlutenClickHouseCacheBaseTestSuite.scala}      |  88 ++++------------
 .../cache/GlutenClickHouseHDFSSuite.scala          |  45 ++++++++
 .../cache/GlutenClickHouseMINIOSuite.scala         |  52 +++++++++
 .../GlutenClickHouseMergeTreeCacheDataSuite.scala  |  44 ++++----
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala |  10 +-
 ...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala |  10 +-
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |  82 ++++-----------
 .../GlutenClickHouseMergetreeWriteStatsSuite.scala |   8 +-
 .../org/apache/gluten/utils/CacheTestHelper.scala  |  66 ++++++++++++
 .../org/apache/gluten/utils/HDFSTestHelper.scala   |  82 +++++++++++++++
 .../org/apache/gluten/utils/MinioTestHelper.scala  | 117 +++++++++++++++++++++
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp | 115 ++++++++++----------
 15 files changed, 501 insertions(+), 299 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7de4e1e832..edf354b0c3 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -201,8 +201,6 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
                 fileSizes.add(JLong.valueOf(size))
                 modificationTimes.add(JLong.valueOf(time))
               case _ =>
-                fileSizes.add(0)
-                modificationTimes.add(0)
             }
 
             val otherConstantMetadataColumnValues =
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
index eb9e13e451..1c5b359dc4 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
@@ -1471,7 +1471,8 @@ class GlutenClickHouseExcelFormatSuite
      * close the hdfs file, and hence the file is not flushed.HDFS file is 
closed when LocalExecutor
      * is destroyed, but before that, the file moved by spark committer.
      */
-    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"
+
+    val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/write_into_hdfs/")
     val format = "parquet"
     val sql =
       s"""
@@ -1487,7 +1488,7 @@ class GlutenClickHouseExcelFormatSuite
   // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
   testWithMinSparkVersion("write succeed even if set wrong snappy compression 
codec level", "3.5") {
     // TODO: remove duplicated test codes
-    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/"
+    val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/failed_test/")
     val format = "parquet"
     val sql =
       s"""
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 0e557bd26b..cc8b758c5b 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
-import org.apache.gluten.utils.UTSystemParameters
+import org.apache.gluten.utils.{HDFSTestHelper, MinioTestHelper, 
UTSystemParameters}
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
 
@@ -41,27 +41,15 @@ class GlutenClickHouseWholeStageTransformerSuite extends 
WholeStageTransformerSu
   }
   val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
 
-  private val TMP_PREFIX = s"/tmp/gluten/$SPARK_DIR_NAME"
+  protected val TMP_PREFIX = s"/tmp/gluten/$SPARK_DIR_NAME"
 
-  val S3_METADATA_PATH = s"$TMP_PREFIX/s3/metadata"
-  val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
-  val S3_ENDPOINT = "s3://127.0.0.1:9000/"
-  val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
   val BUCKET_NAME: String = SPARK_DIR_NAME
-  val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
-
-  val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
-  val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
-  val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
-  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
-
-  val S3_ACCESS_KEY = "minioadmin"
-  val S3_SECRET_KEY = "minioadmin"
+  val minioHelper = new MinioTestHelper(TMP_PREFIX)
+  val hdfsHelper = new HDFSTestHelper(TMP_PREFIX)
+  val HDFS_URL: String = hdfsHelper.getHdfsUrl(SPARK_DIR_NAME)
 
   val CH_DEFAULT_STORAGE_DIR = "/data"
 
-  val LOCAL_CACHE_PATH = s"$TMP_PREFIX/local/cache"
-
   protected def spark32: Boolean = sparkVersion.equals("3.2")
   protected def spark33: Boolean = sparkVersion.equals("3.3")
   protected def spark35: Boolean = sparkVersion.equals("3.5")
@@ -88,53 +76,11 @@ class GlutenClickHouseWholeStageTransformerSuite extends 
WholeStageTransformerSu
       .set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath)
       .set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME")
     if (UTSystemParameters.testMergeTreeOnObjectStorage) {
-      conf
-        .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
-        .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
-        .set("spark.hadoop.fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
-        .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
-        .set("spark.hadoop.fs.s3a.path.style.access", "true")
-        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
-        .setCHConfig(
-          "storage_configuration.disks.s3.type" -> "s3_gluten",
-          "storage_configuration.disks.s3.endpoint" -> WHOLE_PATH,
-          "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
-          "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
-          "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
-          "storage_configuration.disks.s3_cache.type" -> "cache",
-          "storage_configuration.disks.s3_cache.disk" -> "s3",
-          "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
-          "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
-          "storage_configuration.policies.__s3_main.volumes" -> "main",
-          "storage_configuration.policies.__s3_main.volumes.main.disk" -> 
"s3_cache"
-        )
-        .setCHConfig(
-          "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
-          "storage_configuration.disks.hdfs.endpoint" -> 
s"$HDFS_URL_ENDPOINT/",
-          "storage_configuration.disks.hdfs.metadata_path" -> 
HDFS_METADATA_PATH,
-          "storage_configuration.disks.hdfs_cache.type" -> "cache",
-          "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
-          "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
-          "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
-          "storage_configuration.policies.__hdfs_main.volumes" -> "main",
-          "storage_configuration.policies.__hdfs_main.volumes.main.disk" -> 
"hdfs_cache"
-        )
-        .setCHConfig(
-          "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
-          "storage_configuration.disks.hdfs2.endpoint" -> 
s"$HDFS_URL_ENDPOINT/",
-          "storage_configuration.disks.hdfs2.metadata_path" -> 
HDFS_METADATA_PATH,
-          "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
-          "storage_configuration.disks.hdfs_cache2.type" -> "cache",
-          "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
-          "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
-          "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
-          "storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> 
"main",
-          
"storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" -> 
"hdfs_cache2"
-        )
-        .setCHConfig(
-          "hdfs.dfs_client_read_shortcircuit" -> "false",
-          "hdfs.dfs_default_replica" -> "1"
-        )
+      minioHelper.setHadoopFileSystemConfig(conf)
+      minioHelper.setObjectStoreConfig(conf, BUCKET_NAME)
+      hdfsHelper.setHDFSStoreConfig(conf)
+      hdfsHelper.setHDFSStoreConfigRocksDB(conf)
+      hdfsHelper.setHdfsClientConfig(conf)
     } else {
       conf
     }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
similarity index 67%
rename from 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
rename to 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
index 31dbe38f10..bb72e34e3e 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
@@ -14,34 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution.tpch
+package org.apache.gluten.execution.cache
 
-import org.apache.gluten.backendsapi.clickhouse.CHConfig
 import org.apache.gluten.backendsapi.clickhouse.CHConfig._
-import org.apache.gluten.execution.{CHNativeCacheManager, 
FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
GlutenClickHouseTPCHAbstractSuite}
+import org.apache.gluten.utils.CacheTestHelper
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.Path
 
 import java.nio.charset.Charset
 
-class GlutenClickHouseHDFSSuite
+abstract class GlutenClickHouseCacheBaseTestSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
-
-  override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
+  // Common paths
   override protected val tpchQueries: String =
     rootPath + 
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
   override protected val queriesResults: String = rootPath + "queries-output"
 
-  private val cache_name = "gluten_cache"
+  // Abstract methods to be implemented by subclasses
+  protected def cleanupCache(): Unit = cacheHelper.deleteCache(spark, 
tablesPath)
+  protected def copyDataIfNeeded(): Unit
+
+  // Initialize the cache helper - accessible to subclasses
+  protected val cacheHelper = new CacheTestHelper(TMP_PREFIX)
 
   /** Run Gluten + ClickHouse Backend with SortShuffleManager */
   override protected def sparkConf: SparkConf = {
-    super.sparkConf
+    val conf = super.sparkConf
       .set("spark.shuffle.manager", "sort")
       .set("spark.io.compression.codec", "snappy")
       .set("spark.sql.shuffle.partitions", "5")
@@ -49,19 +53,10 @@ class GlutenClickHouseHDFSSuite
       .set("spark.sql.adaptive.enabled", "true")
       .setCHConfig("use_local_format", true)
       .set(prefixOf("shuffle.hash.algorithm"), "sparkMurmurHash3_32")
-      .set(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key, "true")
-      .setCHConfig("gluten_cache.local.name", cache_name)
-      .setCHConfig("gluten_cache.local.path", LOCAL_CACHE_PATH)
-      .setCHConfig("gluten_cache.local.max_size", "10Gi")
-      // If reuse_disk_cache is set to false,the cache will be deleted in 
JNI_OnUnload
-      // but CacheManager and JobScheduler of backend are static global 
variables
-      // and is destroyed at the end of the program which causes backend 
reporting logical errors.
-      // TODO: fix reuse_disk_cache
-      .setCHConfig("reuse_disk_cache", "true")
       .set("spark.sql.adaptive.enabled", "false")
 
-    // TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm =>
-    //     CHConf.prefixOf("shuffle.hash.algorithm")
+    // Apply cache configuration using the helper
+    cacheHelper.setCacheConfig(conf)
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
@@ -70,55 +65,20 @@ class GlutenClickHouseHDFSSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    val targetFile = new Path(s"$tablesPath/lineitem")
-    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
-    val existed = fs.exists(targetFile)
-    // If the 'lineitem' directory doesn't exist in HDFS,
-    // upload the 'lineitem' data from the local system.
-    if (!existed) {
-      val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
-      val localFs = 
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
-      FileUtil.copy(
-        localFs,
-        localDataDir,
-        fs,
-        targetFile,
-        false,
-        true,
-        spark.sessionState.newHadoopConf())
-    }
+    copyDataIfNeeded()
   }
 
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    deleteCache()
+    cleanupCache()
   }
 
   override protected def afterAll(): Unit = {
-    deleteCache()
-    super.afterEach()
-  }
-
-  private def deleteCache(): Unit = {
-    val targetFile = new Path(tablesPath)
-    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
-    fs.listStatus(targetFile)
-      .foreach(
-        table => {
-          if (table.isDirectory) {
-            fs.listStatus(table.getPath)
-              .foreach(
-                data => {
-                  if (data.isFile) {
-                    CHNativeCacheManager
-                      .removeFiles(data.getPath.toUri.getPath.substring(1), 
cache_name)
-                  }
-                })
-          }
-        })
+    cleanupCache()
+    super.afterAll()
   }
 
-  val runWithoutCache: () => Unit = () => {
+  def runWithoutCache(): Unit = {
     runTPCHQuery(6) {
       df =>
         val plans = df.queryExecution.executedPlan.collect {
@@ -129,7 +89,7 @@ class GlutenClickHouseHDFSSuite
     }
   }
 
-  val runWithCache: () => Unit = () => {
+  def runWithCache(): Unit = {
     runTPCHQuery(6) {
       df =>
         val plans = df.queryExecution.executedPlan.collect {
@@ -141,15 +101,13 @@ class GlutenClickHouseHDFSSuite
     }
   }
 
-  test("test hdfs cache") {
+  test("test cache") {
     runWithoutCache()
     runWithCache()
   }
 
   test("test cache file command") {
-    runSql(
-      s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
-      noFallBack = false) { _ => }
+    runSql(s"CACHE FILES select * from '$tablesPath/lineitem'", noFallBack = 
false) { _ => }
     runWithCache()
   }
 
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
new file mode 100644
index 0000000000..f07fd61678
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.cache
+
+import org.apache.hadoop.fs.FileUtil
+import org.apache.hadoop.fs.Path
+
+class GlutenClickHouseHDFSSuite extends GlutenClickHouseCacheBaseTestSuite {
+
+  override protected val tablesPath: String = 
hdfsHelper.getHdfsUrl("tpch-data")
+
+  override protected def copyDataIfNeeded(): Unit = {
+    val targetFile = new Path(s"$tablesPath/lineitem")
+    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+    val existed = fs.exists(targetFile)
+    // If the 'lineitem' directory doesn't exist in HDFS,
+    // upload the 'lineitem' data from the local system.
+    if (!existed) {
+      val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+      val localFs = 
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+      FileUtil.copy(
+        localFs,
+        localDataDir,
+        fs,
+        targetFile,
+        false,
+        true,
+        spark.sessionState.newHadoopConf())
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
new file mode 100644
index 0000000000..cf4ecf6fd0
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.cache
+
+import org.apache.hadoop.fs.{FileUtil, Path}
+
+class GlutenClickHouseMINIOSuite extends GlutenClickHouseCacheBaseTestSuite {
+
+  private val BUCKET = "tpch-data"
+  override protected val tablesPath: String = s"s3a://$BUCKET"
+
+  override protected def copyDataIfNeeded(): Unit = {
+    val targetFile = new Path(s"$tablesPath/lineitem")
+    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+    val existed = fs.exists(targetFile)
+    // If the 'lineitem' directory doesn't exist in Minio,
+    // upload the 'lineitem' data from the local system.
+    if (!existed) {
+      val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+      val localFs = 
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+      FileUtil.copy(
+        localFs,
+        localDataDir,
+        fs,
+        targetFile,
+        false,
+        true,
+        spark.sessionState.newHadoopConf())
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    if (!minioHelper.bucketExists(BUCKET)) {
+      minioHelper.createBucket(BUCKET)
+    }
+    super.beforeAll()
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index 7db265de80..16f565637b 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 
@@ -69,10 +68,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
     conf.set("fs.defaultFS", HDFS_URL)
     val fs = FileSystem.get(conf)
     fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
-    FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+    hdfsHelper.resetMeta()
+    hdfsHelper.resetCache()
   }
 
   def countFiles(directory: File): Int = {
@@ -123,9 +120,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
                  | select * from lineitem a
                  | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    val dataPath = new File(HDFS_CACHE_PATH)
+    hdfsHelper.resetMeta()
+    val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
     val res = spark
@@ -137,7 +133,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new 
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+    val metaPath = new 
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -227,9 +223,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
                  | select * from lineitem a
                  | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    val dataPath = new File(HDFS_CACHE_PATH)
+    hdfsHelper.resetMeta()
+    val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
     val res = spark
@@ -241,7 +236,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new 
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+    val metaPath = new 
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     eventually(timeout(60.seconds), interval(2.seconds)) {
       assertResult(22)(metaPath.list().length)
@@ -335,9 +330,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
                  | select * from lineitem a
                  | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    val dataPath = new File(HDFS_CACHE_PATH)
+    hdfsHelper.resetMeta()
+    val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
     val res = spark
@@ -349,7 +343,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new 
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+    val metaPath = new 
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -437,12 +431,11 @@ class GlutenClickHouseMergeTreeCacheDataSuite
                  | select * from lineitem a
                  | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    val dataPath = new File(HDFS_CACHE_PATH)
+    hdfsHelper.resetMeta()
+    val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
-    val metaPath = new 
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+    val metaPath = new 
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
     val res1 = spark.sql(s"cache data select * from 
lineitem_mergetree_hdfs").collect()
     assertResult(true)(res1(0).getBoolean(0))
     assertResult(1)(metaPath.list().length)
@@ -528,9 +521,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
                  | select * from lineitem a
                  | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-    val dataPath = new File(HDFS_CACHE_PATH)
+    hdfsHelper.resetMeta()
+    val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
     val res = spark
@@ -542,7 +534,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new 
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+    val metaPath = new 
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -597,7 +589,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
   test("test disable cache files return") {
     withSQLConf(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key -> "false") {
       runSql(
-        s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
+        s"CACHE FILES select * from 
'${hdfsHelper.getHdfsUrl("tpch-data/lineitem")}'",
         noFallBack = false) {
         df =>
           val res = df.collect()
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index faffe19136..7da18478ba 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -29,12 +29,9 @@ import 
org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types._
 
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import java.io.File
-
 import scala.concurrent.duration.DurationInt
 
 class GlutenClickHouseMergeTreeWriteOnHDFSSuite
@@ -72,13 +69,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     conf.set("fs.defaultFS", HDFS_URL)
     val fs = FileSystem.get(conf)
     fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   override protected def afterEach(): Unit = {
     super.afterEach()
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   test("test mergetree table write") {
@@ -115,7 +111,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
                  | insert into table lineitem_mergetree_hdfs
                  | select * from lineitem
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
 
     runTPCHQueryBySQL(1, q1("lineitem_mergetree_hdfs")) {
       df =>
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index bc93cdb0f4..cc20ddc5f8 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -28,12 +28,9 @@ import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import java.io.File
-
 import scala.concurrent.duration.DurationInt
 
 class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
@@ -71,13 +68,12 @@ class 
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
     conf.set("fs.defaultFS", HDFS_URL)
     val fs = FileSystem.get(conf)
     fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   override protected def afterEach(): Unit = {
     super.afterEach()
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   test("test mergetree table write") {
@@ -114,7 +110,7 @@ class 
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
                  | insert into table lineitem_mergetree_hdfs
                  | select * from lineitem
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
 
     runTPCHQueryBySQL(1, q1("lineitem_mergetree_hdfs")) {
       df =>
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index b840f03eee..50a32e3e44 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -28,12 +28,7 @@ import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
-import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs, 
MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
-import io.minio.messages.DeleteObject
-import org.apache.commons.io.FileUtils
-
 import java.io.File
-import java.util
 
 import scala.concurrent.duration.DurationInt
 
@@ -47,12 +42,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
   override protected val queriesResults: String = rootPath + 
"mergetree-queries-output"
 
-  private val client = MinioClient
-    .builder()
-    .endpoint(MINIO_ENDPOINT)
-    .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
-    .build()
-
   override protected def createTPCHNotNullTables(): Unit = {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
@@ -71,27 +60,16 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
 
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    if 
(client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
-      val results =
-        
client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
-      val objects = new util.LinkedList[DeleteObject]()
-      results.forEach(
-        obj => {
-          objects.add(new DeleteObject(obj.get().objectName()))
-        })
-      val removeResults = client.removeObjects(
-        
RemoveObjectsArgs.builder().bucket(BUCKET_NAME).objects(objects).build())
-      removeResults.forEach(result => result.get().message())
-      
client.removeBucket(RemoveBucketArgs.builder().bucket(BUCKET_NAME).build())
+    if (minioHelper.bucketExists(BUCKET_NAME)) {
+      minioHelper.clearBucket(BUCKET_NAME)
     }
-    client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build())
-    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
-    FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+    minioHelper.createBucket(BUCKET_NAME)
+    minioHelper.resetMeta()
   }
 
   override protected def afterEach(): Unit = {
     super.afterEach()
-    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    minioHelper.resetMeta()
   }
 
   test("test mergetree table write") {
@@ -128,7 +106,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
                  | insert into table lineitem_mergetree_s3
                  | select * from lineitem
                  |""".stripMargin)
-    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    minioHelper.resetMeta()
 
     runTPCHQueryBySQL(1, q1("lineitem_mergetree_s3")) {
       df =>
@@ -160,41 +138,25 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   }
 
   private def verifyS3CompactFileExist(table: String): Unit = {
-    val args = ListObjectsArgs
-      .builder()
-      .bucket(BUCKET_NAME)
-      .recursive(true)
-      .prefix(table)
-      .build()
-    var objectCount: Int = 0
-    var metadataGlutenExist: Boolean = false
-    var metadataBinExist: Boolean = false
-    var dataBinExist: Boolean = false
+    val objectNames = minioHelper.listObjects(BUCKET_NAME, table)
+    var metadataGlutenExist = false
+    var metadataBinExist = false
+    var dataBinExist = false
     var hasCommits = false
-    client
-      .listObjects(args)
-      .forEach(
-        obj => {
-          objectCount += 1
-          val objectName = obj.get().objectName()
-          if (objectName.contains("metadata.gluten")) {
-            metadataGlutenExist = true
-          } else if (objectName.contains("part_meta.gluten")) {
-            metadataBinExist = true
-          } else if (objectName.contains("part_data.gluten")) {
-            dataBinExist = true
-          } else if (objectName.contains("_commits")) {
-            // Spark 35 has _commits directory
-            // table/_delta_log/_commits/
-            hasCommits = true
-          }
-        })
+
+    objectNames.foreach {
+      objectName =>
+        if (objectName.contains("metadata.gluten")) metadataGlutenExist = true
+        else if (objectName.contains("part_meta.gluten")) metadataBinExist = 
true
+        else if (objectName.contains("part_data.gluten")) dataBinExist = true
+        else if (objectName.contains("_commits")) hasCommits = true
+    }
 
     if (isSparkVersionGE("3.5")) {
-      assertResult(6)(objectCount)
+      assertResult(6)(objectNames.size)
       assert(hasCommits)
     } else {
-      assertResult(5)(objectCount)
+      assertResult(5)(objectNames.size)
     }
 
     assert(metadataGlutenExist)
@@ -615,7 +577,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
                  | select * from lineitem
                  |""".stripMargin)
 
-    FileUtils.forceDelete(new File(S3_METADATA_PATH))
+    minioHelper.resetMeta()
 
     
withSQLConf(CHConfig.runtimeSettings("enabled_driver_filter_mergetree_index") 
-> "true") {
       runTPCHQueryBySQL(6, q6(tableName)) {
@@ -675,7 +637,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
                  | select /*+ REPARTITION(3) */ * from lineitem
                  |""".stripMargin)
 
-    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    minioHelper.resetMeta()
     spark.sql("optimize lineitem_mergetree_bucket_s3")
     spark.sql("drop table lineitem_mergetree_bucket_s3")
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
index cd8e07dd01..5a918ae694 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -30,12 +30,9 @@ import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.input_file_name
 
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 
-import java.io.File
-
 class GlutenClickHouseMergetreeWriteStatsSuite
   extends GlutenClickHouseTPCDSAbstractSuite
   with AdaptiveSparkPlanHelper {
@@ -65,13 +62,12 @@ class GlutenClickHouseMergetreeWriteStatsSuite
     conf.set("fs.defaultFS", HDFS_URL)
     val fs = FileSystem.get(conf)
     fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   override protected def afterEach(): Unit = {
     super.afterEach()
-    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    hdfsHelper.resetMeta()
   }
 
   def tpcdsMergetreeTables: Map[String, String] = {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
new file mode 100644
index 0000000000..6cdc16e2a1
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+import org.apache.gluten.execution.CHNativeCacheManager
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+import org.apache.hadoop.fs.Path
+
+class CacheTestHelper(val TMP_PREFIX: String) {
+
+  val LOCAL_CACHE_PATH = s"$TMP_PREFIX/local/cache"
+  val CACHE_NAME = "gluten_cache"
+
+  /** Configure SparkConf with cache-related settings */
+  def setCacheConfig(conf: SparkConf): SparkConf = {
+    conf
+      .set(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key, "true")
+      .setCHConfig("gluten_cache.local.name", CACHE_NAME)
+      .setCHConfig("gluten_cache.local.path", LOCAL_CACHE_PATH)
+      .setCHConfig("gluten_cache.local.max_size", "10Gi")
+      // If reuse_disk_cache is set to false, the cache will be deleted in 
JNI_OnUnload
+      // but CacheManager and JobScheduler of backend are static global 
variables
+      // and is destroyed at the end of the program which causes backend 
reporting logical errors.
+      // TODO: fix reuse_disk_cache
+      .setCHConfig("reuse_disk_cache", "true")
+  }
+
+  /** Delete cache files for all tables in the data path */
+  def deleteCache(spark: SparkSession, dataPath: String): Unit = {
+    val targetFile = new Path(dataPath)
+    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.listStatus(targetFile)
+      .foreach(
+        table => {
+          if (table.isDirectory) {
+            fs.listStatus(table.getPath)
+              .foreach(
+                data => {
+                  if (data.isFile) {
+                    CHNativeCacheManager
+                      .removeFiles(data.getPath.toUri.getPath.substring(1), 
CACHE_NAME)
+                  }
+                })
+          }
+        })
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
new file mode 100644
index 0000000000..6aa872146e
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+
+import org.apache.spark.SparkConf
+
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+
+class HDFSTestHelper(TMP_PREFIX: String) {
+
+  // HDFS parameters
+  val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
+  private val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
+  private val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
+
+  def getHdfsUrl(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+  def metaPath(dirName: String): String = s"$HDFS_METADATA_PATH/$dirName"
+
+  def setHDFSStoreConfig(conf: SparkConf): SparkConf = {
+    conf.setCHConfig(
+      "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
+      "storage_configuration.disks.hdfs.endpoint" -> s"$HDFS_URL_ENDPOINT/",
+      "storage_configuration.disks.hdfs.metadata_path" -> HDFS_METADATA_PATH,
+      "storage_configuration.disks.hdfs_cache.type" -> "cache",
+      "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
+      "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
+      "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
+      "storage_configuration.policies.__hdfs_main.volumes" -> "main",
+      "storage_configuration.policies.__hdfs_main.volumes.main.disk" -> 
"hdfs_cache"
+    )
+  }
+
+  def setHDFSStoreConfigRocksDB(conf: SparkConf): SparkConf = {
+    conf.setCHConfig(
+      "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
+      "storage_configuration.disks.hdfs2.endpoint" -> s"$HDFS_URL_ENDPOINT/",
+      "storage_configuration.disks.hdfs2.metadata_path" -> HDFS_METADATA_PATH,
+      "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
+      "storage_configuration.disks.hdfs_cache2.type" -> "cache",
+      "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
+      "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
+      "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
+      "storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> "main",
+      "storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" 
-> "hdfs_cache2"
+    )
+  }
+
+  def setHdfsClientConfig(conf: SparkConf): SparkConf = {
+    conf.setCHConfig(
+      "hdfs.dfs_client_read_shortcircuit" -> "false",
+      "hdfs.dfs_default_replica" -> "1"
+    )
+  }
+
+  def resetMeta(): Unit = {
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+  }
+
+  def resetCache(): Unit = {
+    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+    FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
new file mode 100644
index 0000000000..b106734e33
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+
+import org.apache.spark.SparkConf
+
+import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs, 
MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
+import io.minio.messages.DeleteObject
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+class MinioTestHelper(TMP_PREFIX: String) {
+
+  // MINIO parameters
+  val MINIO_ENDPOINT: String = "http://127.0.0.1:9000/"
+  val S3_ACCESS_KEY = "minioadmin"
+  val S3_SECRET_KEY = "minioadmin"
+
+  // Object Store parameters
+  val S3_METADATA_PATH = s"$TMP_PREFIX/s3/metadata"
+  val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
+  val S3A_ENDPOINT = "s3a://"
+
+  private lazy val client = MinioClient
+    .builder()
+    .endpoint(MINIO_ENDPOINT)
+    .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+    .build()
+
+  def bucketExists(bucketName: String): Boolean = {
+    client.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build())
+  }
+
+  def clearBucket(bucketName: String): Unit = {
+    val results = client.listObjects(
+      ListObjectsArgs
+        .builder()
+        .bucket(bucketName)
+        .recursive(true)
+        .build())
+    val objects = new util.LinkedList[DeleteObject]()
+    results.forEach(obj => objects.add(new 
DeleteObject(obj.get().objectName())))
+    val removeResults = client
+      .removeObjects(
+        RemoveObjectsArgs
+          .builder()
+          .bucket(bucketName)
+          .objects(objects)
+          .build())
+    removeResults.forEach(result => result.get().message())
+    client.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build())
+  }
+
+  def createBucket(bucketName: String): Unit = {
+    client.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build())
+  }
+
+  def listObjects(bucketName: String, prefix: String): Iterable[String] = {
+    val args = 
ListObjectsArgs.builder().bucket(bucketName).recursive(true).prefix(prefix).build()
+    val objectNames = new util.ArrayList[String]()
+    client.listObjects(args).forEach(obj => 
objectNames.add(obj.get().objectName()))
+    objectNames.asScala
+  }
+
+  def setHadoopFileSystemConfig(conf: SparkConf): SparkConf = {
+    conf
+      .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
+      .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+      .set("spark.hadoop.fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+      .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
+      .set("spark.hadoop.fs.s3a.path.style.access", "true")
+      .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+  }
+
+  def setObjectStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
+    val wholePath: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
+    conf
+      .setCHConfig(
+        "storage_configuration.disks.s3.type" -> "s3_gluten",
+        "storage_configuration.disks.s3.endpoint" -> wholePath,
+        "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
+        "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
+        "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
+        "storage_configuration.disks.s3_cache.type" -> "cache",
+        "storage_configuration.disks.s3_cache.disk" -> "s3",
+        "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
+        "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
+        "storage_configuration.policies.__s3_main.volumes" -> "main",
+        "storage_configuration.policies.__s3_main.volumes.main.disk" -> 
"s3_cache"
+      )
+  }
+
+  def resetMeta(): Unit = {
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+  }
+}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 4ac155a9bf..dadb852442 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -82,11 +82,11 @@ extern const SettingsUInt64 max_read_buffer_size;
 }
 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
-    extern const int CANNOT_OPEN_FILE;
-    extern const int UNKNOWN_FILE_SIZE;
-    extern const int CANNOT_SEEK_THROUGH_FILE;
-    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
+extern const int BAD_ARGUMENTS;
+extern const int CANNOT_OPEN_FILE;
+extern const int UNKNOWN_FILE_SIZE;
+extern const int CANNOT_SEEK_THROUGH_FILE;
+extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
 }
 
 namespace FileCacheSetting
@@ -219,7 +219,7 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> 
read_buffer, const s
     if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || 
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
         || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
         read_buffer = 
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
-#else 
+#else
     if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
         read_buffer = 
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
 #endif
@@ -236,8 +236,7 @@ public:
 
     bool isRemote() const override { return false; }
 
-    std::unique_ptr<DB::ReadBuffer>
-    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) 
override
+    std::unique_ptr<DB::ReadBuffer> build(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
     {
         Poco::URI file_uri(file_info.uri_file());
         std::unique_ptr<DB::ReadBufferFromFileBase> read_buffer;
@@ -258,13 +257,14 @@ public:
 #if USE_HDFS
 class HDFSFileReadBufferBuilder : public ReadBufferBuilder
 {
-    using ReadBufferCreator = 
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, 
const DB::StoredObject & object)>;
+    using ReadBufferCreator
+        = std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool 
restricted_seek, const DB::StoredObject & object)>;
+
 public:
     explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : 
ReadBufferBuilder(context_), context(context_) { }
     ~HDFSFileReadBufferBuilder() override = default;
 
-    std::unique_ptr<DB::ReadBuffer>
-    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) 
override
+    std::unique_ptr<DB::ReadBuffer> build(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
     {
         DB::ReadSettings read_settings = getReadSettings();
         const auto & config = context->getConfigRef();
@@ -304,13 +304,13 @@ public:
             bool use_async_prefetch
                 = read_settings.remote_fs_prefetch && thread_pool_read && 
(file_info.has_text() || file_info.has_json());
             auto raw_read_buffer = std::make_unique<ReadBufferFromHDFS>(
-                    hdfs_uri,
-                    hdfs_file_path,
-                    config,
-                    read_settings,
-                    /* read_until_position */ 0,
-                    /* use_external_buffer */ false,
-                    file_size);
+                hdfs_uri,
+                hdfs_file_path,
+                config,
+                read_settings,
+                /* read_until_position */ 0,
+                /* use_external_buffer */ false,
+                file_size);
 
             if (use_async_prefetch)
                 read_buffer = std::make_unique<AsynchronousReadBufferFromHDFS>(
@@ -338,15 +338,15 @@ public:
 
             ReadBufferCreator read_buffer_creator
                 = [this, hdfs_uri = hdfs_uri, hdfs_file_path = hdfs_file_path, 
read_settings, &config](
-                      bool /* restricted_seek */, const DB::StoredObject & 
object) -> std::unique_ptr<DB::ReadBufferFromHDFS> {
+                      bool /* restricted_seek */, const DB::StoredObject & 
object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
+            {
                 return std::make_unique<DB::ReadBufferFromHDFS>(
                     hdfs_uri, hdfs_file_path, config, read_settings, 0, true, 
object.bytes_size);
             };
 
             auto remote_path = uri.getPath().substr(1);
             DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", 
*file_size}};
-            auto cache_creator = wrapWithCache(
-                read_buffer_creator, read_settings, remote_path, 
*modified_time, *file_size);
+            auto cache_creator = wrapWithCache(read_buffer_creator, 
read_settings, remote_path, *modified_time, *file_size);
             size_t buffer_size = 
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
             if (*file_size > 0)
                 buffer_size = std::min(*file_size, buffer_size);
@@ -391,13 +391,12 @@ public:
 
     ~S3FileReadBufferBuilder() override = default;
 
-    std::unique_ptr<DB::ReadBuffer>
-    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) 
override
+    std::unique_ptr<DB::ReadBuffer> build(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
     {
         DB::ReadSettings read_settings = getReadSettings();
         Poco::URI file_uri(file_info.uri_file());
         // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
-        const std::string& bucket = file_uri.getHost();
+        const std::string & bucket = file_uri.getHost();
         const auto client = getClient(bucket);
         std::string pathKey = file_uri.getPath().substr(1);
 
@@ -410,25 +409,25 @@ public:
         }
         else
         {
-            DB::S3::ObjectInfo object_info =  DB::S3::getObjectInfo(*client, 
bucket, pathKey, "");
+            DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client, 
bucket, pathKey, "");
             object_size = object_info.size;
             object_modified_time = object_info.last_modification_time;
         }
 
-        auto read_buffer_creator
-            = [bucket, client, read_settings, this](bool restricted_seek, 
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
+        auto read_buffer_creator = [bucket, client, read_settings, this](
+                                       bool restricted_seek, const 
DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
         {
-                return std::make_unique<DB::ReadBufferFromS3>(
-                    client,
-                    bucket,
-                    object.remote_path,
-                    "",
-                    DB::S3::S3RequestSettings(),
-                    read_settings,
-                    /* use_external_buffer */ true,
-                    /* offset */ 0,
-                    /* read_until_position */ 0,
-                    restricted_seek);
+            return std::make_unique<DB::ReadBufferFromS3>(
+                client,
+                bucket,
+                object.remote_path,
+                "",
+                DB::S3::S3RequestSettings(),
+                read_settings,
+                /* use_external_buffer */ true,
+                /* offset */ 0,
+                /* read_until_position */ 0,
+                restricted_seek);
         };
 
         auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, 
pathKey, object_modified_time, object_size);
@@ -441,8 +440,8 @@ public:
         size_t buffer_size = 
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
         if (object_size > 0)
             buffer_size = std::min(object_size, buffer_size);
-        auto async_reader
-            = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), 
pool_reader, read_settings, 
buffer_size,read_settings.remote_read_min_bytes_for_seek);
+        auto async_reader = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(
+            std::move(s3_impl), pool_reader, read_settings, buffer_size, 
read_settings.remote_read_min_bytes_for_seek);
         if (read_settings.remote_fs_prefetch)
             async_reader->prefetch(Priority{});
 
@@ -526,7 +525,8 @@ private:
 
         String config_prefix = "s3";
         auto endpoint = getSetting(settings, bucket_name, 
BackendInitializerUtil::HADOOP_S3_ENDPOINT, 
"https://s3.us-west-2.amazonaws.com");
-        if (!endpoint.starts_with("https://"))
+        bool end_point_start_with_http_or_https = 
endpoint.starts_with("https://") || endpoint.starts_with("http://");
+        if (!end_point_start_with_http_or_https)
         {
             if (endpoint.starts_with("s3"))
                 // as 
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
@@ -586,18 +586,17 @@ private:
             auto new_client = DB::S3::ClientFactory::instance().create(
                 client_configuration,
                 client_settings,
-                ak,     // access_key_id
-                sk,     // secret_access_key
-                "",     // server_side_encryption_customer_key_base64
-                {},     // sse_kms_config
-                {},     // headers
+                ak, // access_key_id
+                sk, // secret_access_key
+                "", // server_side_encryption_customer_key_base64
+                {}, // sse_kms_config
+                {}, // headers
                 DB::S3::CredentialsConfiguration{
                     .use_environment_credentials = true,
                     .use_insecure_imds_request = false,
                     .role_arn = getSetting(settings, bucket_name, 
BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
                     .session_name = getSetting(settings, bucket_name, 
BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
-                    .external_id = getSetting(settings, bucket_name, 
BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)
-                });
+                    .external_id = getSetting(settings, bucket_name, 
BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});
 
             //TODO: support online change config for cached per_bucket_clients
             std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
@@ -609,14 +608,12 @@ private:
             auto new_client = DB::S3::ClientFactory::instance().create(
                 client_configuration,
                 client_settings,
-                ak,     // access_key_id
-                sk,     // secret_access_key
-                "",     // server_side_encryption_customer_key_base64
-                {},     // sse_kms_config
-                {},     // headers
-                DB::S3::CredentialsConfiguration{
-                    .use_environment_credentials = true,
-                    .use_insecure_imds_request = false});
+                ak, // access_key_id
+                sk, // secret_access_key
+                "", // server_side_encryption_customer_key_base64
+                {}, // sse_kms_config
+                {}, // headers
+                DB::S3::CredentialsConfiguration{.use_environment_credentials 
= true, .use_insecure_imds_request = false});
 
             std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
             cacheClient(bucket_name, is_per_bucket, ret);
@@ -643,7 +640,6 @@ public:
     }
 
 private:
-
     std::shared_ptr<DB::AzureBlobStorage::ContainerClient> shared_client;
 
     std::shared_ptr<DB::AzureBlobStorage::ContainerClient> getClient()
@@ -655,8 +651,7 @@ private:
         const Poco::Util::AbstractConfiguration & config = 
context->getConfigRef();
         bool is_client_for_disk = false;
         auto new_settings = DB::AzureBlobStorage::getRequestSettings(config, 
config_prefix, context);
-        DB::AzureBlobStorage::ConnectionParams params
-        {
+        DB::AzureBlobStorage::ConnectionParams params{
             .endpoint = DB::AzureBlobStorage::processEndpoint(config, 
config_prefix),
             .auth_method = DB::AzureBlobStorage::getAuthMethod(config, 
config_prefix),
             .client_options = 
DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
@@ -840,7 +835,7 @@ ReadBufferBuilder::ReadBufferCreator 
ReadBufferBuilder::wrapWithCache(
     updateCaches(key, last_modified_time, file_size);
 
     return [read_buffer_creator, read_settings, this](
-                   bool restricted_seek, const DB::StoredObject & object) -> 
std::unique_ptr<DB::ReadBufferFromFileBase>
+               bool restricted_seek, const DB::StoredObject & object) -> 
std::unique_ptr<DB::ReadBufferFromFileBase>
     {
         auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
         auto modified_read_settings = read_settings.withNestedBuffer();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to