This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c275b1ee1f [GLUTEN-6867][CH] Fix Bug that can't read file on minio
(#9332)
c275b1ee1f is described below
commit c275b1ee1fb1220fa77a8bb120372d398865ef1c
Author: Chang chen <[email protected]>
AuthorDate: Wed Apr 16 21:41:30 2025 +0800
[GLUTEN-6867][CH] Fix Bug that can't read file on minio (#9332)
[CH] Fix Bug that can't read file on minio
* Introduce GlutenClickHouseCacheBaseTestSuite for testing Minio
* create bucket before creating the table
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 2 -
.../GlutenClickHouseExcelFormatSuite.scala | 5 +-
...lutenClickHouseWholeStageTransformerSuite.scala | 74 ++-----------
.../GlutenClickHouseCacheBaseTestSuite.scala} | 88 ++++------------
.../cache/GlutenClickHouseHDFSSuite.scala | 45 ++++++++
.../cache/GlutenClickHouseMINIOSuite.scala | 52 +++++++++
.../GlutenClickHouseMergeTreeCacheDataSuite.scala | 44 ++++----
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 10 +-
...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala | 10 +-
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 82 ++++-----------
.../GlutenClickHouseMergetreeWriteStatsSuite.scala | 8 +-
.../org/apache/gluten/utils/CacheTestHelper.scala | 66 ++++++++++++
.../org/apache/gluten/utils/HDFSTestHelper.scala | 82 +++++++++++++++
.../org/apache/gluten/utils/MinioTestHelper.scala | 117 +++++++++++++++++++++
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 115 ++++++++++----------
15 files changed, 501 insertions(+), 299 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7de4e1e832..edf354b0c3 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -201,8 +201,6 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
fileSizes.add(JLong.valueOf(size))
modificationTimes.add(JLong.valueOf(time))
case _ =>
- fileSizes.add(0)
- modificationTimes.add(0)
}
val otherConstantMetadataColumnValues =
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
index eb9e13e451..1c5b359dc4 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
@@ -1471,7 +1471,8 @@ class GlutenClickHouseExcelFormatSuite
* close the hdfs file, and hence the file is not flushed.HDFS file is
closed when LocalExecutor
* is destroyed, but before that, the file moved by spark committer.
*/
- val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"
+
+ val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/write_into_hdfs/")
val format = "parquet"
val sql =
s"""
@@ -1487,7 +1488,7 @@ class GlutenClickHouseExcelFormatSuite
// TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
testWithMinSparkVersion("write succeed even if set wrong snappy compression
codec level", "3.5") {
// TODO: remove duplicated test codes
- val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/"
+ val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/failed_test/")
val format = "parquet"
val sql =
s"""
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 0e557bd26b..cc8b758c5b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
-import org.apache.gluten.utils.UTSystemParameters
+import org.apache.gluten.utils.{HDFSTestHelper, MinioTestHelper,
UTSystemParameters}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
@@ -41,27 +41,15 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
}
val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
- private val TMP_PREFIX = s"/tmp/gluten/$SPARK_DIR_NAME"
+ protected val TMP_PREFIX = s"/tmp/gluten/$SPARK_DIR_NAME"
- val S3_METADATA_PATH = s"$TMP_PREFIX/s3/metadata"
- val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
- val S3_ENDPOINT = "s3://127.0.0.1:9000/"
- val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
val BUCKET_NAME: String = SPARK_DIR_NAME
- val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
-
- val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
- val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
- val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
- val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
-
- val S3_ACCESS_KEY = "minioadmin"
- val S3_SECRET_KEY = "minioadmin"
+ val minioHelper = new MinioTestHelper(TMP_PREFIX)
+ val hdfsHelper = new HDFSTestHelper(TMP_PREFIX)
+ val HDFS_URL: String = hdfsHelper.getHdfsUrl(SPARK_DIR_NAME)
val CH_DEFAULT_STORAGE_DIR = "/data"
- val LOCAL_CACHE_PATH = s"$TMP_PREFIX/local/cache"
-
protected def spark32: Boolean = sparkVersion.equals("3.2")
protected def spark33: Boolean = sparkVersion.equals("3.3")
protected def spark35: Boolean = sparkVersion.equals("3.5")
@@ -88,53 +76,11 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
.set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath)
.set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME")
if (UTSystemParameters.testMergeTreeOnObjectStorage) {
- conf
- .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
- .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
- .set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
- .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
- .set("spark.hadoop.fs.s3a.path.style.access", "true")
- .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
- .setCHConfig(
- "storage_configuration.disks.s3.type" -> "s3_gluten",
- "storage_configuration.disks.s3.endpoint" -> WHOLE_PATH,
- "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
- "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
- "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
- "storage_configuration.disks.s3_cache.type" -> "cache",
- "storage_configuration.disks.s3_cache.disk" -> "s3",
- "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
- "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
- "storage_configuration.policies.__s3_main.volumes" -> "main",
- "storage_configuration.policies.__s3_main.volumes.main.disk" ->
"s3_cache"
- )
- .setCHConfig(
- "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
- "storage_configuration.disks.hdfs.endpoint" ->
s"$HDFS_URL_ENDPOINT/",
- "storage_configuration.disks.hdfs.metadata_path" ->
HDFS_METADATA_PATH,
- "storage_configuration.disks.hdfs_cache.type" -> "cache",
- "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
- "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
- "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
- "storage_configuration.policies.__hdfs_main.volumes" -> "main",
- "storage_configuration.policies.__hdfs_main.volumes.main.disk" ->
"hdfs_cache"
- )
- .setCHConfig(
- "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
- "storage_configuration.disks.hdfs2.endpoint" ->
s"$HDFS_URL_ENDPOINT/",
- "storage_configuration.disks.hdfs2.metadata_path" ->
HDFS_METADATA_PATH,
- "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
- "storage_configuration.disks.hdfs_cache2.type" -> "cache",
- "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
- "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
- "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
- "storage_configuration.policies.__hdfs_main_rocksdb.volumes" ->
"main",
-
"storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" ->
"hdfs_cache2"
- )
- .setCHConfig(
- "hdfs.dfs_client_read_shortcircuit" -> "false",
- "hdfs.dfs_default_replica" -> "1"
- )
+ minioHelper.setHadoopFileSystemConfig(conf)
+ minioHelper.setObjectStoreConfig(conf, BUCKET_NAME)
+ hdfsHelper.setHDFSStoreConfig(conf)
+ hdfsHelper.setHDFSStoreConfigRocksDB(conf)
+ hdfsHelper.setHdfsClientConfig(conf)
} else {
conf
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
similarity index 67%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
index 31dbe38f10..bb72e34e3e 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
@@ -14,34 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution.tpch
+package org.apache.gluten.execution.cache
-import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
-import org.apache.gluten.execution.{CHNativeCacheManager,
FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+import org.apache.gluten.execution.{FileSourceScanExecTransformer,
GlutenClickHouseTPCHAbstractSuite}
+import org.apache.gluten.utils.CacheTestHelper
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.Path
import java.nio.charset.Charset
-class GlutenClickHouseHDFSSuite
+abstract class GlutenClickHouseCacheBaseTestSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
-
- override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
+ // Common paths
override protected val tpchQueries: String =
rootPath +
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
override protected val queriesResults: String = rootPath + "queries-output"
- private val cache_name = "gluten_cache"
+ // Abstract methods to be implemented by subclasses
+ protected def cleanupCache(): Unit = cacheHelper.deleteCache(spark,
tablesPath)
+ protected def copyDataIfNeeded(): Unit
+
+ // Initialize the cache helper - accessible to subclasses
+ protected val cacheHelper = new CacheTestHelper(TMP_PREFIX)
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
- super.sparkConf
+ val conf = super.sparkConf
.set("spark.shuffle.manager", "sort")
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
@@ -49,19 +53,10 @@ class GlutenClickHouseHDFSSuite
.set("spark.sql.adaptive.enabled", "true")
.setCHConfig("use_local_format", true)
.set(prefixOf("shuffle.hash.algorithm"), "sparkMurmurHash3_32")
- .set(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key, "true")
- .setCHConfig("gluten_cache.local.name", cache_name)
- .setCHConfig("gluten_cache.local.path", LOCAL_CACHE_PATH)
- .setCHConfig("gluten_cache.local.max_size", "10Gi")
- // If reuse_disk_cache is set to false,the cache will be deleted in
JNI_OnUnload
- // but CacheManager and JobScheduler of backend are static global
variables
- // and is destroyed at the end of the program which causes backend
reporting logical errors.
- // TODO: fix reuse_disk_cache
- .setCHConfig("reuse_disk_cache", "true")
.set("spark.sql.adaptive.enabled", "false")
- // TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm =>
- // CHConf.prefixOf("shuffle.hash.algorithm")
+ // Apply cache configuration using the helper
+ cacheHelper.setCacheConfig(conf)
}
override protected def createTPCHNotNullTables(): Unit = {
@@ -70,55 +65,20 @@ class GlutenClickHouseHDFSSuite
override def beforeAll(): Unit = {
super.beforeAll()
- val targetFile = new Path(s"$tablesPath/lineitem")
- val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
- val existed = fs.exists(targetFile)
- // If the 'lineitem' directory doesn't exist in HDFS,
- // upload the 'lineitem' data from the local system.
- if (!existed) {
- val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
- val localFs =
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
- FileUtil.copy(
- localFs,
- localDataDir,
- fs,
- targetFile,
- false,
- true,
- spark.sessionState.newHadoopConf())
- }
+ copyDataIfNeeded()
}
override protected def beforeEach(): Unit = {
super.beforeEach()
- deleteCache()
+ cleanupCache()
}
override protected def afterAll(): Unit = {
- deleteCache()
- super.afterEach()
- }
-
- private def deleteCache(): Unit = {
- val targetFile = new Path(tablesPath)
- val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
- fs.listStatus(targetFile)
- .foreach(
- table => {
- if (table.isDirectory) {
- fs.listStatus(table.getPath)
- .foreach(
- data => {
- if (data.isFile) {
- CHNativeCacheManager
- .removeFiles(data.getPath.toUri.getPath.substring(1),
cache_name)
- }
- })
- }
- })
+ cleanupCache()
+ super.afterAll()
}
- val runWithoutCache: () => Unit = () => {
+ def runWithoutCache(): Unit = {
runTPCHQuery(6) {
df =>
val plans = df.queryExecution.executedPlan.collect {
@@ -129,7 +89,7 @@ class GlutenClickHouseHDFSSuite
}
}
- val runWithCache: () => Unit = () => {
+ def runWithCache(): Unit = {
runTPCHQuery(6) {
df =>
val plans = df.queryExecution.executedPlan.collect {
@@ -141,15 +101,13 @@ class GlutenClickHouseHDFSSuite
}
}
- test("test hdfs cache") {
+ test("test cache") {
runWithoutCache()
runWithCache()
}
test("test cache file command") {
- runSql(
- s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
- noFallBack = false) { _ => }
+ runSql(s"CACHE FILES select * from '$tablesPath/lineitem'", noFallBack =
false) { _ => }
runWithCache()
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
new file mode 100644
index 0000000000..f07fd61678
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.cache
+
+import org.apache.hadoop.fs.FileUtil
+import org.apache.hadoop.fs.Path
+
+class GlutenClickHouseHDFSSuite extends GlutenClickHouseCacheBaseTestSuite {
+
+ override protected val tablesPath: String =
hdfsHelper.getHdfsUrl("tpch-data")
+
+ override protected def copyDataIfNeeded(): Unit = {
+ val targetFile = new Path(s"$tablesPath/lineitem")
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ val existed = fs.exists(targetFile)
+ // If the 'lineitem' directory doesn't exist in HDFS,
+ // upload the 'lineitem' data from the local system.
+ if (!existed) {
+ val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+ val localFs =
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+ FileUtil.copy(
+ localFs,
+ localDataDir,
+ fs,
+ targetFile,
+ false,
+ true,
+ spark.sessionState.newHadoopConf())
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
new file mode 100644
index 0000000000..cf4ecf6fd0
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseMINIOSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.cache
+
+import org.apache.hadoop.fs.{FileUtil, Path}
+
+class GlutenClickHouseMINIOSuite extends GlutenClickHouseCacheBaseTestSuite {
+
+ private val BUCKET = "tpch-data"
+ override protected val tablesPath: String = s"s3a://$BUCKET"
+
+ override protected def copyDataIfNeeded(): Unit = {
+ val targetFile = new Path(s"$tablesPath/lineitem")
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ val existed = fs.exists(targetFile)
+ // If the 'lineitem' directory doesn't exist in Minio,
+ // upload the 'lineitem' data from the local system.
+ if (!existed) {
+ val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+ val localFs =
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+ FileUtil.copy(
+ localFs,
+ localDataDir,
+ fs,
+ targetFile,
+ false,
+ true,
+ spark.sessionState.newHadoopConf())
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ if (!minioHelper.bucketExists(BUCKET)) {
+ minioHelper.createBucket(BUCKET)
+ }
+ super.beforeAll()
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index 7db265de80..16f565637b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
@@ -69,10 +68,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
conf.set("fs.defaultFS", HDFS_URL)
val fs = FileSystem.get(conf)
fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
- FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+ hdfsHelper.resetMeta()
+ hdfsHelper.resetCache()
}
def countFiles(directory: File): Int = {
@@ -123,9 +120,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- val dataPath = new File(HDFS_CACHE_PATH)
+ hdfsHelper.resetMeta()
+ val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
val res = spark
@@ -137,7 +133,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+ val metaPath = new
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -227,9 +223,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- val dataPath = new File(HDFS_CACHE_PATH)
+ hdfsHelper.resetMeta()
+ val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
val res = spark
@@ -241,7 +236,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+ val metaPath = new
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
eventually(timeout(60.seconds), interval(2.seconds)) {
assertResult(22)(metaPath.list().length)
@@ -335,9 +330,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- val dataPath = new File(HDFS_CACHE_PATH)
+ hdfsHelper.resetMeta()
+ val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
val res = spark
@@ -349,7 +343,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+ val metaPath = new
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -437,12 +431,11 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- val dataPath = new File(HDFS_CACHE_PATH)
+ hdfsHelper.resetMeta()
+ val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
- val metaPath = new
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+ val metaPath = new
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
val res1 = spark.sql(s"cache data select * from
lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(1)(metaPath.list().length)
@@ -528,9 +521,8 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
- val dataPath = new File(HDFS_CACHE_PATH)
+ hdfsHelper.resetMeta()
+ val dataPath = new File(hdfsHelper.HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
val res = spark
@@ -542,7 +534,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new
File(s"$HDFS_METADATA_PATH/$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
+ val metaPath = new
File(hdfsHelper.metaPath(s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs"))
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -597,7 +589,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
test("test disable cache files return") {
withSQLConf(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key -> "false") {
runSql(
- s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
+ s"CACHE FILES select * from
'${hdfsHelper.getHdfsUrl("tpch-data/lineitem")}'",
noFallBack = false) {
df =>
val res = df.collect()
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index faffe19136..7da18478ba 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -29,12 +29,9 @@ import
org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types._
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import java.io.File
-
import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeWriteOnHDFSSuite
@@ -72,13 +69,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
conf.set("fs.defaultFS", HDFS_URL)
val fs = FileSystem.get(conf)
fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
override protected def afterEach(): Unit = {
super.afterEach()
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
test("test mergetree table write") {
@@ -115,7 +111,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
| insert into table lineitem_mergetree_hdfs
| select * from lineitem
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
runTPCHQueryBySQL(1, q1("lineitem_mergetree_hdfs")) {
df =>
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index bc93cdb0f4..cc20ddc5f8 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -28,12 +28,9 @@ import
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import java.io.File
-
import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
@@ -71,13 +68,12 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
conf.set("fs.defaultFS", HDFS_URL)
val fs = FileSystem.get(conf)
fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
override protected def afterEach(): Unit = {
super.afterEach()
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
test("test mergetree table write") {
@@ -114,7 +110,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
| insert into table lineitem_mergetree_hdfs
| select * from lineitem
|""".stripMargin)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
runTPCHQueryBySQL(1, q1("lineitem_mergetree_hdfs")) {
df =>
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index b840f03eee..50a32e3e44 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -28,12 +28,7 @@ import
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs,
MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
-import io.minio.messages.DeleteObject
-import org.apache.commons.io.FileUtils
-
import java.io.File
-import java.util
import scala.concurrent.duration.DurationInt
@@ -47,12 +42,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
- private val client = MinioClient
- .builder()
- .endpoint(MINIO_ENDPOINT)
- .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
- .build()
-
override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}
@@ -71,27 +60,16 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected def beforeEach(): Unit = {
super.beforeEach()
- if
(client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
- val results =
-
client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
- val objects = new util.LinkedList[DeleteObject]()
- results.forEach(
- obj => {
- objects.add(new DeleteObject(obj.get().objectName()))
- })
- val removeResults = client.removeObjects(
-
RemoveObjectsArgs.builder().bucket(BUCKET_NAME).objects(objects).build())
- removeResults.forEach(result => result.get().message())
-
client.removeBucket(RemoveBucketArgs.builder().bucket(BUCKET_NAME).build())
+ if (minioHelper.bucketExists(BUCKET_NAME)) {
+ minioHelper.clearBucket(BUCKET_NAME)
}
- client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build())
- FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
- FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+ minioHelper.createBucket(BUCKET_NAME)
+ minioHelper.resetMeta()
}
override protected def afterEach(): Unit = {
super.afterEach()
- FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ minioHelper.resetMeta()
}
test("test mergetree table write") {
@@ -128,7 +106,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
| insert into table lineitem_mergetree_s3
| select * from lineitem
|""".stripMargin)
- FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ minioHelper.resetMeta()
runTPCHQueryBySQL(1, q1("lineitem_mergetree_s3")) {
df =>
@@ -160,41 +138,25 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
}
private def verifyS3CompactFileExist(table: String): Unit = {
- val args = ListObjectsArgs
- .builder()
- .bucket(BUCKET_NAME)
- .recursive(true)
- .prefix(table)
- .build()
- var objectCount: Int = 0
- var metadataGlutenExist: Boolean = false
- var metadataBinExist: Boolean = false
- var dataBinExist: Boolean = false
+ val objectNames = minioHelper.listObjects(BUCKET_NAME, table)
+ var metadataGlutenExist = false
+ var metadataBinExist = false
+ var dataBinExist = false
var hasCommits = false
- client
- .listObjects(args)
- .forEach(
- obj => {
- objectCount += 1
- val objectName = obj.get().objectName()
- if (objectName.contains("metadata.gluten")) {
- metadataGlutenExist = true
- } else if (objectName.contains("part_meta.gluten")) {
- metadataBinExist = true
- } else if (objectName.contains("part_data.gluten")) {
- dataBinExist = true
- } else if (objectName.contains("_commits")) {
- // Spark 35 has _commits directory
- // table/_delta_log/_commits/
- hasCommits = true
- }
- })
+
+ objectNames.foreach {
+ objectName =>
+ if (objectName.contains("metadata.gluten")) metadataGlutenExist = true
+ else if (objectName.contains("part_meta.gluten")) metadataBinExist =
true
+ else if (objectName.contains("part_data.gluten")) dataBinExist = true
+ else if (objectName.contains("_commits")) hasCommits = true
+ }
if (isSparkVersionGE("3.5")) {
- assertResult(6)(objectCount)
+ assertResult(6)(objectNames.size)
assert(hasCommits)
} else {
- assertResult(5)(objectCount)
+ assertResult(5)(objectNames.size)
}
assert(metadataGlutenExist)
@@ -615,7 +577,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
| select * from lineitem
|""".stripMargin)
- FileUtils.forceDelete(new File(S3_METADATA_PATH))
+ minioHelper.resetMeta()
withSQLConf(CHConfig.runtimeSettings("enabled_driver_filter_mergetree_index")
-> "true") {
runTPCHQueryBySQL(6, q6(tableName)) {
@@ -675,7 +637,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
| select /*+ REPARTITION(3) */ * from lineitem
|""".stripMargin)
- FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ minioHelper.resetMeta()
spark.sql("optimize lineitem_mergetree_bucket_s3")
spark.sql("drop table lineitem_mergetree_bucket_s3")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
index cd8e07dd01..5a918ae694 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -30,12 +30,9 @@ import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.input_file_name
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
-import java.io.File
-
class GlutenClickHouseMergetreeWriteStatsSuite
extends GlutenClickHouseTPCDSAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -65,13 +62,12 @@ class GlutenClickHouseMergetreeWriteStatsSuite
conf.set("fs.defaultFS", HDFS_URL)
val fs = FileSystem.get(conf)
fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
- FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
override protected def afterEach(): Unit = {
super.afterEach()
- FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ hdfsHelper.resetMeta()
}
def tpcdsMergetreeTables: Map[String, String] = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
new file mode 100644
index 0000000000..6cdc16e2a1
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+import org.apache.gluten.execution.CHNativeCacheManager
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+import org.apache.hadoop.fs.Path
+
+class CacheTestHelper(val TMP_PREFIX: String) {
+
+ val LOCAL_CACHE_PATH = s"$TMP_PREFIX/local/cache"
+ val CACHE_NAME = "gluten_cache"
+
+ /** Configure SparkConf with cache-related settings */
+ def setCacheConfig(conf: SparkConf): SparkConf = {
+ conf
+ .set(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key, "true")
+ .setCHConfig("gluten_cache.local.name", CACHE_NAME)
+ .setCHConfig("gluten_cache.local.path", LOCAL_CACHE_PATH)
+ .setCHConfig("gluten_cache.local.max_size", "10Gi")
+    // If reuse_disk_cache is set to false, the cache will be deleted in
JNI_OnUnload
+ // but CacheManager and JobScheduler of backend are static global
variables
+ // and is destroyed at the end of the program which causes backend
reporting logical errors.
+ // TODO: fix reuse_disk_cache
+ .setCHConfig("reuse_disk_cache", "true")
+ }
+
+ /** Delete cache files for all tables in the data path */
+ def deleteCache(spark: SparkSession, dataPath: String): Unit = {
+ val targetFile = new Path(dataPath)
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.listStatus(targetFile)
+ .foreach(
+ table => {
+ if (table.isDirectory) {
+ fs.listStatus(table.getPath)
+ .foreach(
+ data => {
+ if (data.isFile) {
+ CHNativeCacheManager
+ .removeFiles(data.getPath.toUri.getPath.substring(1),
CACHE_NAME)
+ }
+ })
+ }
+ })
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
new file mode 100644
index 0000000000..6aa872146e
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+
+import org.apache.spark.SparkConf
+
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+
+class HDFSTestHelper(TMP_PREFIX: String) {
+
+ // HDFS parameters
+ val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
+ private val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
+ private val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
+
+ def getHdfsUrl(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+ def metaPath(dirName: String): String = s"$HDFS_METADATA_PATH/$dirName"
+
+ def setHDFSStoreConfig(conf: SparkConf): SparkConf = {
+ conf.setCHConfig(
+ "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
+ "storage_configuration.disks.hdfs.endpoint" -> s"$HDFS_URL_ENDPOINT/",
+ "storage_configuration.disks.hdfs.metadata_path" -> HDFS_METADATA_PATH,
+ "storage_configuration.disks.hdfs_cache.type" -> "cache",
+ "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
+ "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
+ "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
+ "storage_configuration.policies.__hdfs_main.volumes" -> "main",
+ "storage_configuration.policies.__hdfs_main.volumes.main.disk" ->
"hdfs_cache"
+ )
+ }
+
+ def setHDFSStoreConfigRocksDB(conf: SparkConf): SparkConf = {
+ conf.setCHConfig(
+ "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
+ "storage_configuration.disks.hdfs2.endpoint" -> s"$HDFS_URL_ENDPOINT/",
+ "storage_configuration.disks.hdfs2.metadata_path" -> HDFS_METADATA_PATH,
+ "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
+ "storage_configuration.disks.hdfs_cache2.type" -> "cache",
+ "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
+ "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
+ "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
+ "storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> "main",
+ "storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk"
-> "hdfs_cache2"
+ )
+ }
+
+ def setHdfsClientConfig(conf: SparkConf): SparkConf = {
+ conf.setCHConfig(
+ "hdfs.dfs_client_read_shortcircuit" -> "false",
+ "hdfs.dfs_default_replica" -> "1"
+ )
+ }
+
+ def resetMeta(): Unit = {
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ }
+
+ def resetCache(): Unit = {
+ FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+ FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
new file mode 100644
index 0000000000..b106734e33
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+
+import org.apache.spark.SparkConf
+
+import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs,
MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
+import io.minio.messages.DeleteObject
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+class MinioTestHelper(TMP_PREFIX: String) {
+
+ // MINIO parameters
+ val MINIO_ENDPOINT: String = "http://127.0.0.1:9000/"
+ val S3_ACCESS_KEY = "minioadmin"
+ val S3_SECRET_KEY = "minioadmin"
+
+ // Object Store parameters
+ val S3_METADATA_PATH = s"$TMP_PREFIX/s3/metadata"
+ val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
+ val S3A_ENDPOINT = "s3a://"
+
+ private lazy val client = MinioClient
+ .builder()
+ .endpoint(MINIO_ENDPOINT)
+ .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+ .build()
+
+ def bucketExists(bucketName: String): Boolean = {
+ client.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build())
+ }
+
+ def clearBucket(bucketName: String): Unit = {
+ val results = client.listObjects(
+ ListObjectsArgs
+ .builder()
+ .bucket(bucketName)
+ .recursive(true)
+ .build())
+ val objects = new util.LinkedList[DeleteObject]()
+ results.forEach(obj => objects.add(new
DeleteObject(obj.get().objectName())))
+ val removeResults = client
+ .removeObjects(
+ RemoveObjectsArgs
+ .builder()
+ .bucket(bucketName)
+ .objects(objects)
+ .build())
+ removeResults.forEach(result => result.get().message())
+ client.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build())
+ }
+
+ def createBucket(bucketName: String): Unit = {
+ client.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build())
+ }
+
+ def listObjects(bucketName: String, prefix: String): Iterable[String] = {
+ val args =
ListObjectsArgs.builder().bucket(bucketName).recursive(true).prefix(prefix).build()
+ val objectNames = new util.ArrayList[String]()
+ client.listObjects(args).forEach(obj =>
objectNames.add(obj.get().objectName()))
+ objectNames.asScala
+ }
+
+ def setHadoopFileSystemConfig(conf: SparkConf): SparkConf = {
+ conf
+ .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
+ .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+ .set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+ .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
+ .set("spark.hadoop.fs.s3a.path.style.access", "true")
+ .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+ }
+
+ def setObjectStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
+ val wholePath: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
+ conf
+ .setCHConfig(
+ "storage_configuration.disks.s3.type" -> "s3_gluten",
+ "storage_configuration.disks.s3.endpoint" -> wholePath,
+ "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
+ "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
+ "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
+ "storage_configuration.disks.s3_cache.type" -> "cache",
+ "storage_configuration.disks.s3_cache.disk" -> "s3",
+ "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
+ "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
+ "storage_configuration.policies.__s3_main.volumes" -> "main",
+ "storage_configuration.policies.__s3_main.volumes.main.disk" ->
"s3_cache"
+ )
+ }
+
+ def resetMeta(): Unit = {
+ FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+ }
+}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 4ac155a9bf..dadb852442 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -82,11 +82,11 @@ extern const SettingsUInt64 max_read_buffer_size;
}
namespace ErrorCodes
{
- extern const int BAD_ARGUMENTS;
- extern const int CANNOT_OPEN_FILE;
- extern const int UNKNOWN_FILE_SIZE;
- extern const int CANNOT_SEEK_THROUGH_FILE;
- extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
+extern const int BAD_ARGUMENTS;
+extern const int CANNOT_OPEN_FILE;
+extern const int UNKNOWN_FILE_SIZE;
+extern const int CANNOT_SEEK_THROUGH_FILE;
+extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
namespace FileCacheSetting
@@ -219,7 +219,7 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer>
read_buffer, const s
if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) ||
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
|| dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
read_buffer =
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
-#else
+#else
if (dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
read_buffer =
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
#endif
@@ -236,8 +236,7 @@ public:
bool isRemote() const override { return false; }
- std::unique_ptr<DB::ReadBuffer>
- build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
override
+ std::unique_ptr<DB::ReadBuffer> build(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
Poco::URI file_uri(file_info.uri_file());
std::unique_ptr<DB::ReadBufferFromFileBase> read_buffer;
@@ -258,13 +257,14 @@ public:
#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
- using ReadBufferCreator =
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek,
const DB::StoredObject & object)>;
+ using ReadBufferCreator
+ = std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool
restricted_seek, const DB::StoredObject & object)>;
+
public:
explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) :
ReadBufferBuilder(context_), context(context_) { }
~HDFSFileReadBufferBuilder() override = default;
- std::unique_ptr<DB::ReadBuffer>
- build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
override
+ std::unique_ptr<DB::ReadBuffer> build(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
DB::ReadSettings read_settings = getReadSettings();
const auto & config = context->getConfigRef();
@@ -304,13 +304,13 @@ public:
bool use_async_prefetch
= read_settings.remote_fs_prefetch && thread_pool_read &&
(file_info.has_text() || file_info.has_json());
auto raw_read_buffer = std::make_unique<ReadBufferFromHDFS>(
- hdfs_uri,
- hdfs_file_path,
- config,
- read_settings,
- /* read_until_position */ 0,
- /* use_external_buffer */ false,
- file_size);
+ hdfs_uri,
+ hdfs_file_path,
+ config,
+ read_settings,
+ /* read_until_position */ 0,
+ /* use_external_buffer */ false,
+ file_size);
if (use_async_prefetch)
read_buffer = std::make_unique<AsynchronousReadBufferFromHDFS>(
@@ -338,15 +338,15 @@ public:
ReadBufferCreator read_buffer_creator
= [this, hdfs_uri = hdfs_uri, hdfs_file_path = hdfs_file_path,
read_settings, &config](
- bool /* restricted_seek */, const DB::StoredObject &
object) -> std::unique_ptr<DB::ReadBufferFromHDFS> {
+ bool /* restricted_seek */, const DB::StoredObject &
object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
+ {
return std::make_unique<DB::ReadBufferFromHDFS>(
hdfs_uri, hdfs_file_path, config, read_settings, 0, true,
object.bytes_size);
};
auto remote_path = uri.getPath().substr(1);
DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "",
*file_size}};
- auto cache_creator = wrapWithCache(
- read_buffer_creator, read_settings, remote_path,
*modified_time, *file_size);
+ auto cache_creator = wrapWithCache(read_buffer_creator,
read_settings, remote_path, *modified_time, *file_size);
size_t buffer_size =
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (*file_size > 0)
buffer_size = std::min(*file_size, buffer_size);
@@ -391,13 +391,12 @@ public:
~S3FileReadBufferBuilder() override = default;
- std::unique_ptr<DB::ReadBuffer>
- build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
override
+ std::unique_ptr<DB::ReadBuffer> build(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
DB::ReadSettings read_settings = getReadSettings();
Poco::URI file_uri(file_info.uri_file());
// file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
- const std::string& bucket = file_uri.getHost();
+ const std::string & bucket = file_uri.getHost();
const auto client = getClient(bucket);
std::string pathKey = file_uri.getPath().substr(1);
@@ -410,25 +409,25 @@ public:
}
else
{
- DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client,
bucket, pathKey, "");
+ DB::S3::ObjectInfo object_info = DB::S3::getObjectInfo(*client,
bucket, pathKey, "");
object_size = object_info.size;
object_modified_time = object_info.last_modification_time;
}
- auto read_buffer_creator
- = [bucket, client, read_settings, this](bool restricted_seek,
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
+ auto read_buffer_creator = [bucket, client, read_settings, this](
+ bool restricted_seek, const
DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
{
- return std::make_unique<DB::ReadBufferFromS3>(
- client,
- bucket,
- object.remote_path,
- "",
- DB::S3::S3RequestSettings(),
- read_settings,
- /* use_external_buffer */ true,
- /* offset */ 0,
- /* read_until_position */ 0,
- restricted_seek);
+ return std::make_unique<DB::ReadBufferFromS3>(
+ client,
+ bucket,
+ object.remote_path,
+ "",
+ DB::S3::S3RequestSettings(),
+ read_settings,
+ /* use_external_buffer */ true,
+ /* offset */ 0,
+ /* read_until_position */ 0,
+ restricted_seek);
};
auto cache_creator = wrapWithCache(read_buffer_creator, read_settings,
pathKey, object_modified_time, object_size);
@@ -441,8 +440,8 @@ public:
size_t buffer_size =
std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (object_size > 0)
buffer_size = std::min(object_size, buffer_size);
- auto async_reader
- =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl),
pool_reader, read_settings,
buffer_size,read_settings.remote_read_min_bytes_for_seek);
+ auto async_reader =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(
+ std::move(s3_impl), pool_reader, read_settings, buffer_size,
read_settings.remote_read_min_bytes_for_seek);
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
@@ -526,7 +525,8 @@ private:
String config_prefix = "s3";
auto endpoint = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ENDPOINT,
"https://s3.us-west-2.amazonaws.com");
- if (!endpoint.starts_with("https://"))
+ bool end_point_start_with_http_or_https =
endpoint.starts_with("https://") || endpoint.starts_with("http://");
+ if (!end_point_start_with_http_or_https)
{
if (endpoint.starts_with("s3"))
// as
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/bk_cloud-data-access/content/s3-config-parameters.html
@@ -586,18 +586,17 @@ private:
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
- ak, // access_key_id
- sk, // secret_access_key
- "", // server_side_encryption_customer_key_base64
- {}, // sse_kms_config
- {}, // headers
+ ak, // access_key_id
+ sk, // secret_access_key
+ "", // server_side_encryption_customer_key_base64
+ {}, // sse_kms_config
+ {}, // headers
DB::S3::CredentialsConfiguration{
.use_environment_credentials = true,
.use_insecure_imds_request = false,
.role_arn = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
.session_name = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
- .external_id = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)
- });
+ .external_id = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});
//TODO: support online change config for cached per_bucket_clients
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
@@ -609,14 +608,12 @@ private:
auto new_client = DB::S3::ClientFactory::instance().create(
client_configuration,
client_settings,
- ak, // access_key_id
- sk, // secret_access_key
- "", // server_side_encryption_customer_key_base64
- {}, // sse_kms_config
- {}, // headers
- DB::S3::CredentialsConfiguration{
- .use_environment_credentials = true,
- .use_insecure_imds_request = false});
+ ak, // access_key_id
+ sk, // secret_access_key
+ "", // server_side_encryption_customer_key_base64
+ {}, // sse_kms_config
+ {}, // headers
+ DB::S3::CredentialsConfiguration{.use_environment_credentials
= true, .use_insecure_imds_request = false});
std::shared_ptr<DB::S3::Client> ret = std::move(new_client);
cacheClient(bucket_name, is_per_bucket, ret);
@@ -643,7 +640,6 @@ public:
}
private:
-
std::shared_ptr<DB::AzureBlobStorage::ContainerClient> shared_client;
std::shared_ptr<DB::AzureBlobStorage::ContainerClient> getClient()
@@ -655,8 +651,7 @@ private:
const Poco::Util::AbstractConfiguration & config =
context->getConfigRef();
bool is_client_for_disk = false;
auto new_settings = DB::AzureBlobStorage::getRequestSettings(config,
config_prefix, context);
- DB::AzureBlobStorage::ConnectionParams params
- {
+ DB::AzureBlobStorage::ConnectionParams params{
.endpoint = DB::AzureBlobStorage::processEndpoint(config,
config_prefix),
.auth_method = DB::AzureBlobStorage::getAuthMethod(config,
config_prefix),
.client_options =
DB::AzureBlobStorage::getClientOptions(*new_settings, is_client_for_disk),
@@ -840,7 +835,7 @@ ReadBufferBuilder::ReadBufferCreator
ReadBufferBuilder::wrapWithCache(
updateCaches(key, last_modified_time, file_size);
return [read_buffer_creator, read_settings, this](
- bool restricted_seek, const DB::StoredObject & object) ->
std::unique_ptr<DB::ReadBufferFromFileBase>
+ bool restricted_seek, const DB::StoredObject & object) ->
std::unique_ptr<DB::ReadBufferFromFileBase>
{
auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
auto modified_read_settings = read_settings.withNestedBuffer();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]