This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 33f123e60a [GLUTEN-9517][CH] Allow merge tree format without configuring
disk cache (#9520)
33f123e60a is described below
commit 33f123e60ac2f15b4c8b9f5f89fc22209c03f60f
Author: Chang chen <[email protected]>
AuthorDate: Tue May 6 20:41:58 2025 +0800
[GLUTEN-9517][CH] Allow merge tree format without configuring disk cache (#9520)
* [Refactor] Add GlutenObjectStorageConfig to improve consistency and
maintainability
This commit introduces a Scala object in the ClickHouse backend module and
refactors the C++ implementation to use defined constants instead of hardcoded
string literals for object storage disk types. The changes include:
1. Create a centralized Scala object, `GlutenObjectStorageConfig`, with S3 and
HDFS disk type constants
2. Align the C++ implementation with the Scala `GlutenObjectStorageConfig`
3. Replace all hardcoded string literals with the corresponding constants from
`GlutenObjectStorageConfig`
4. Improve code formatting and organization in affected files
These changes make the code more maintainable by centralizing important
constants, reducing duplication, and ensuring consistency between Scala and C++
implementations.
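The constants pattern in points 1-3 can be sketched as follows. The object name and
constant values come from the CHConfig.scala diff later in this email; the
`isObjectStorageDisk` helper is hypothetical, added only to illustrate branching on a
constant instead of a string literal:

```scala
// Centralized disk-type constants, as described above.
// Values match the CHConfig.scala diff in this commit.
object GlutenObjectStorageConfig {
  val S3_DISK_TYPE: String = "s3_gluten"
  val HDFS_DISK_TYPE: String = "hdfs_gluten"
}

// Hypothetical helper (illustration only): compare against the shared
// constants rather than repeating hardcoded literals at each call site.
def isObjectStorageDisk(diskType: String): Boolean =
  diskType == GlutenObjectStorageConfig.S3_DISK_TYPE ||
    diskType == GlutenObjectStorageConfig.HDFS_DISK_TYPE
```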
* [Refactor] Add MergeTree configuration options for Delta optimized writer
Add two important configuration options for the ClickHouse backend that
optimize writing to Delta tables:
- `RuntimeSettings.MERGE_AFTER_INSERT`: Controls whether to merge data
after insert in each task. Should be set to false when using
DeltaOptimizedWriterTransformer.
- `RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE`: Controls whether to
bypass local temporary storage when inserting into remote storage. Should be
set to true when using DeltaOptimizedWriterTransformer.
Also removed unnecessary `RuntimeConfig.LOGGER_LEVEL` settings from various
test files.
Added a TODO comment in MetricsUtil to improve error handling when
`RuntimeSettings.COLLECT_METRICS` is set to false.
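A minimal, self-contained sketch of the two options above. The case class is
illustrative, not Gluten's API; only the defaults (merge after insert on, insert
without local storage off) and the DeltaOptimizedWriterTransformer guidance come
from this commit:

```scala
// Sketch of the two MergeTree write options. Defaults match the
// RuntimeSettings diff in this commit.
final case class MergeTreeWriteSettings(
    mergeAfterInsert: Boolean = true,
    insertWithoutLocalStorage: Boolean = false
)

// Recommended overrides when DeltaOptimizedWriterTransformer is used:
// skip the per-task merge and write directly to remote storage.
val deltaOptimizedWriterSettings: MergeTreeWriteSettings =
  MergeTreeWriteSettings(mergeAfterInsert = false, insertWithoutLocalStorage = true)
```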
* [Refactor] Introduces `StoreConfigBuilder`
Introduces a class to streamline and standardize the configuration of
object storage disks for HDFS and S3 in test environments.
- Adds a new class, `StoreConfigBuilder`, implementing a fluent builder
pattern
- Refactors HDFS and MinIO test helpers to use this common configuration
approach
- Simplifies storage configuration code and reduces duplication
- Makes storage setup more flexible with better parameter control
The builder pattern provides a cleaner interface for configuring various
storage options like disk caching, RocksDB metadata, and access credentials.
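A hypothetical sketch of a fluent builder in the style described above. The real
`StoreConfigBuilder` (126 lines, per the diffstat) has a richer API; every name
below is illustrative, not the actual interface:

```scala
// Each with* call returns a new, further-configured immutable value,
// giving the chained "fluent" style described above.
final case class StoreConfigSketch(
    diskType: String,
    useCache: Boolean = false,
    useRocksDBMeta: Boolean = false
) {
  def withCache(enabled: Boolean): StoreConfigSketch = copy(useCache = enabled)
  def withRocksDBMeta(enabled: Boolean): StoreConfigSketch = copy(useRocksDBMeta = enabled)
}

// Fluent usage: configure an HDFS-backed store with cache and RocksDB metadata.
val hdfsStore = StoreConfigSketch("hdfs_gluten").withCache(true).withRocksDBMeta(true)
```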
* [Refactor] Improve HDFS URL handling in test infrastructure
This commit improves HDFS URL management in test classes by:
- Removing the global `HDFS_URL` constant and centralizing URL generation
in the `HDFSTestHelper` class
- Adding new helper methods: `independentHdfsURL()`, `hdfsURL()`,
`deleteDir()`, and `countDir()`
- Making directory paths consistent across tests by using a shared base
directory parameter
- Simplifying HDFS file operations with helper methods that handle path
formatting
- Removing redundant code for file system configuration and file operations
- Adding constants for commonly used storage policies
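One way the split between suite-shared and per-test URLs might look. The class,
host, port, and path layout here are placeholders; the actual `HDFSTestHelper`
API may differ:

```scala
// Sketch: hdfsURL builds paths under the shared base directory, while
// independentHdfsURL builds paths outside it so a test can wipe its own
// directory without touching suite-shared data.
class HdfsUrlSketch(hdfsRoot: String, baseDirName: String) {
  def hdfsURL(sub: String): String = s"$hdfsRoot/$baseDirName/$sub"
  def independentHdfsURL(sub: String = ""): String =
    if (sub.isEmpty) s"$hdfsRoot/$baseDirName-independent"
    else s"$hdfsRoot/$baseDirName-independent/$sub"
}
```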
* [Refactor] Clean up object storage APIs and add TPCH test suites
- Refactor storage API by unifying helper methods for MinIO and HDFS
- Replace `setHadoopFileSystemConfig` with `setFileSystem` in
MinioTestHelper
- Replace `setObjectStoreConfig` with `setStoreConfig` in MinioTestHelper
- Consolidate multiple HDFS configuration methods into a single
`setStoreConfig`
- Simplify filesystem configuration with `setFileSystem` in HDFSTestHelper
- Add new test suites for TPCH benchmarks with object storage
- Create `MinioTCPHTestSuite` for testing with MinIO
- Create `HDFSTCPHTestSuite` for testing with HDFS
- Add common `WriteMergeTreeConf` trait for MergeTree configuration
- Simplify code in `GlutenClickHouseWholeStageTransformerSuite` by using the
refactored methods
* Include backend commit
https://github.com/ClickHouse/ClickHouse/commit/83de175e411120003fed985c6e8a07e3053ea046
Fix read position setting for HDFS buffer and simplify remote FS gather
read logic. This commit implements two main improvements:
1. Added implementations for `setReadUntilPosition` and `setReadUntilEnd`
methods in the `ReadBufferFromHDFS` class, allowing HDFS read buffers to
properly limit reading ranges, consistent with other storage interfaces
2. Simplified the read position setting logic in
`ReadBufferFromRemoteFSGather`, removing redundant condition checks and making
the code clearer:
- Setting read position is needed regardless of whether the object has
an offset
- Different strategies are used based on whether the read position is
within the range
These improvements help ensure proper control of data reading ranges when
working with HDFS storage, particularly when the MergeTree format does not
configure a disk cache.
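The "read until position" behavior from point 1 can be sketched generically as a
range that clamps how many bytes a buffer may consume. This mirrors the
described semantics only; it is not the actual C++ implementation in
`ReadBufferFromHDFS`:

```scala
// A read limited by an optional "until" position, regardless of where the
// object's offset starts.
final case class ReadRange(offset: Long, untilPosition: Option[Long]) {
  // Bytes permitted for a read of `requested` bytes starting at `offset`;
  // with no limit set, the full request is allowed.
  def clamp(requested: Long): Long = untilPosition match {
    case Some(limit) => math.max(0L, math.min(requested, limit - offset))
    case None        => requested
  }
}
```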
---------
Co-authored-by: Chang chen <[email protected]>
---
.../gluten/backendsapi/clickhouse/CHConfig.scala | 5 +
.../backendsapi/clickhouse/RuntimeSettings.scala | 16 +++
.../org/apache/gluten/metrics/MetricsUtil.scala | 2 +
.../GlutenClickHouseDeltaParquetWriteSuite.scala | 5 +-
.../GlutenClickHouseExcelFormatSuite.scala | 6 +-
...lutenClickHouseWholeStageTransformerSuite.scala | 12 +-
.../cache/GlutenClickHouseHDFSSuite.scala | 2 +-
.../GlutenClickHouseNativeWriteTableSuite.scala | 2 -
.../hive/GlutenClickHouseTableAfterRestart.scala | 5 +-
.../GlutenClickHouseMergeTreeCacheDataSuite.scala | 31 ++---
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 7 +-
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 2 +-
.../GlutenClickHouseMergeTreeTPCHSuite.scala | 117 +++++++++++++++++++
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 43 +++----
...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala | 41 +++----
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 7 +-
.../GlutenClickHouseMergeTreeWriteStatsSuite.scala | 29 ++---
.../GlutenClickHouseMergeTreeWriteSuite.scala | 2 +-
.../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 2 -
.../org/apache/gluten/utils/HDFSTestHelper.scala | 87 +++++++++-----
.../org/apache/gluten/utils/MinioTestHelper.scala | 48 ++++----
.../apache/gluten/utils/StoreConfigBuilder.scala | 126 +++++++++++++++++++++
cpp-ch/clickhouse.version | 2 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 14 ++-
cpp-ch/local-engine/Common/GlutenConfig.h | 7 +-
.../registerGlutenDiskObjectStorage.cpp | 66 ++++-------
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 13 ++-
27 files changed, 469 insertions(+), 230 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
index 4720db3ad7..59f2ae3d8a 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
@@ -131,3 +131,8 @@ class CHConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enableGlutenLocalFileCache: Boolean =
getConf(ENABLE_GLUTEN_LOCAL_FILE_CACHE)
}
+
+object GlutenObjectStorageConfig {
+ val S3_DISK_TYPE: String = "s3_gluten"
+ val HDFS_DISK_TYPE: String = "hdfs_gluten"
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
index bcd245eafe..1c0e83b1d3 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
@@ -102,4 +102,20 @@ object RuntimeSettings {
.doc("Enable memory spill scheduler")
.booleanConf
.createWithDefault(true)
+
+ val MERGE_AFTER_INSERT =
+ buildConf(runtimeSettings("mergetree.merge_after_insert"))
+ .doc(s"""Merge after insert in each task.
+ |Set to false If DeltaOptimizedWriterTransformer is used
+ |""".stripMargin)
+ .booleanConf
+ .createWithDefault(true)
+
+ val INSERT_WITHOUT_LOCAL_STORAGE =
+ buildConf(runtimeSettings("mergetree.insert_without_local_storage"))
+ .doc(s"""When insert into remote storage, don't write to local temporary
storage first.
+ |Set to true If DeltaOptimizedWriterTransformer is used
+ |""".stripMargin)
+ .booleanConf
+ .createWithDefault(false)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 3630205fd0..4219bdc509 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -106,6 +106,8 @@ object MetricsUtil extends Logging {
val numNativeMetrics = metrics.metricsDataList.size()
val relSize = relMap.values().asScala.flatMap(l => l.asScala).size
if (numNativeMetrics == 0 || numNativeMetrics != relSize) {
+ // TODO: if `RuntimeSettings.COLLECT_METRICS` set to false, we
should not log the warning
+ // otherwise, we should raise an exception
logWarning(
s"Updating native metrics failed due to the wrong size of metrics
data: " +
s"$numNativeMetrics")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index a26f3607e4..33a657820e 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import java.io.File
class GlutenClickHouseDeltaParquetWriteSuite extends ParquetTPCHSuite {
- import org.apache.gluten.backendsapi.clickhouse.CHConfig._
-
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -45,7 +44,7 @@ class GlutenClickHouseDeltaParquetWriteSuite extends
ParquetTPCHSuite {
.set("spark.sql.files.maxPartitionBytes", "20000000")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, spark35.toString)
.set("spark.sql.storeAssignmentPolicy", "legacy")
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
index dcc5f39c54..761d48b0ce 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
@@ -1034,7 +1034,7 @@ class GlutenClickHouseExcelFormatSuite extends
GlutenClickHouseWholeStageTransfo
| from $orcFileFormat.`$filePath`
| where long_field > 30
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {},
noFallBack = true)
+ compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {})
}
// TODO: Fix: if the field names has upper case form, it will return null
value
@@ -1458,7 +1458,7 @@ class GlutenClickHouseExcelFormatSuite extends
GlutenClickHouseWholeStageTransfo
* is destroyed, but before that, the file moved by spark committer.
*/
- val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/write_into_hdfs/")
+ val tablePath = hdfsHelper.independentHdfsURL("write_into_hdfs")
val format = "parquet"
val sql =
s"""
@@ -1474,7 +1474,7 @@ class GlutenClickHouseExcelFormatSuite extends
GlutenClickHouseWholeStageTransfo
// TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
testWithMinSparkVersion("write succeed even if set wrong snappy compression
codec level", "3.5") {
// TODO: remove duplicated test codes
- val tablePath = hdfsHelper.getHdfsUrl(s"$SPARK_DIR_NAME/failed_test/")
+ val tablePath = hdfsHelper.independentHdfsURL("failed_test")
val format = "parquet"
val sql =
s"""
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 5dd8b0c2c4..af33880e27 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -60,8 +60,7 @@ class GlutenClickHouseWholeStageTransformerSuite
val BUCKET_NAME: String = SPARK_DIR_NAME
val minioHelper = new MinioTestHelper(TMP_PREFIX)
- val hdfsHelper = new HDFSTestHelper(TMP_PREFIX)
- val HDFS_URL: String = hdfsHelper.getHdfsUrl(SPARK_DIR_NAME)
+ val hdfsHelper = new HDFSTestHelper(TMP_PREFIX, SPARK_DIR_NAME)
val CH_DEFAULT_STORAGE_DIR = "/data"
@@ -91,11 +90,10 @@ class GlutenClickHouseWholeStageTransformerSuite
.set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath)
.set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME")
if (UTSystemParameters.testMergeTreeOnObjectStorage) {
- minioHelper.setHadoopFileSystemConfig(conf)
- minioHelper.setObjectStoreConfig(conf, BUCKET_NAME)
- hdfsHelper.setHDFSStoreConfig(conf)
- hdfsHelper.setHDFSStoreConfigRocksDB(conf)
- hdfsHelper.setHdfsClientConfig(conf)
+ minioHelper.setFileSystem(conf)
+ minioHelper.setStoreConfig(conf, BUCKET_NAME)
+ hdfsHelper.setFileSystem(conf)
+ hdfsHelper.setStoreConfig(conf)
} else {
conf
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
index e7064d1edc..0cc1e0955a 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseHDFSSuite.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
class GlutenClickHouseHDFSSuite extends GlutenClickHouseCacheBaseTestSuite {
- override protected val remotePath: String =
hdfsHelper.getHdfsUrl("tpch-data")
+ override protected val remotePath: String = hdfsHelper.hdfsURL("tpch-data")
override protected def copyDataIfNeeded(): Unit = {
val targetFile = new Path(s"$remotePath/lineitem")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index c1dc110779..81bf35e2eb 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution.hive
-import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
@@ -65,7 +64,6 @@ class GlutenClickHouseNativeWriteTableSuite
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.sql.warehouse.dir", getWarehouseDir)
.set("spark.sql.session.timeZone", sessionTimeZone)
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.setMaster("local[1]")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
index bc84f060b2..60f06ce959 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.hive
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig,
RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.CreateMergeTreeSuite
@@ -46,14 +46,13 @@ class GlutenClickHouseTableAfterRestart extends
CreateMergeTreeSuite with ReCrea
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.setCHConfig("user_defined_path", "/tmp/user_defined")
.set("spark.sql.files.maxPartitionBytes", "20000000")
.set("spark.ui.enabled", "true")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
.set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
.setCHSettings("input_format_parquet_max_block_size", 8192)
.setMaster("local[2]")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index 90985f412f..712ef5efb4 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{CreateMergeTreeSuite,
FileSourceScanExecTransformer}
@@ -24,9 +24,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.files.TahoeFileIndex
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-
import java.io.File
import scala.concurrent.duration.DurationInt
@@ -34,7 +31,6 @@ import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeCacheDataSuite extends CreateMergeTreeSuite {
override protected def sparkConf: SparkConf = {
- import org.apache.gluten.backendsapi.clickhouse.CHConfig._
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -42,19 +38,16 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set("spark.gluten.soft-affinity.enabled", "true")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
}
+ private val remotePath: String = hdfsHelper.independentHdfsURL("test")
override protected def beforeEach(): Unit = {
super.beforeEach()
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
- fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ hdfsHelper.deleteDir(remotePath)
hdfsHelper.resetMeta()
hdfsHelper.resetCache()
}
@@ -97,7 +90,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_linenumber,l_orderkey')
|""".stripMargin)
@@ -200,7 +193,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_linenumber,l_orderkey')
|""".stripMargin)
@@ -307,7 +300,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_linenumber,l_orderkey')
|""".stripMargin)
@@ -324,7 +317,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
val res = spark
.sql(s"""
|cache data
- | select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ | select * from '$remotePath/lineitem_mergetree_hdfs'
| after l_shipdate AS OF '1995-01-10'
| CACHEPROPERTIES(storage_policy='__hdfs_main',
| aaa='ccc')""".stripMargin)
@@ -409,7 +402,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
| L_COMMENT string
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin)
@@ -498,7 +491,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
|)
|USING clickhouse
|PARTITIONED BY (L_SHIPDATE)
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='L_LINENUMBER,L_ORDERKEY')
|""".stripMargin)
@@ -515,7 +508,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
val res = spark
.sql(s"""
|cache data
- | select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ | select * from '$remotePath/lineitem_mergetree_hdfs'
| after L_SHIPDATE AS OF '1995-01-10'
| CACHEPROPERTIES(storage_policy='__hdfs_main',
| aaa='ccc')""".stripMargin)
@@ -576,7 +569,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite extends
CreateMergeTreeSuite {
test("test disable cache files return") {
withSQLConf(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key -> "false") {
runSql(
- s"CACHE FILES select * from
'${hdfsHelper.getHdfsUrl("tpch-data/lineitem")}'",
+ s"CACHE FILES select * from
'${hdfsHelper.hdfsURL("tpch-data/lineitem")}'",
noFallBack = false) {
df =>
val res = df.collect()
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 9d8dc08bb9..a3908274ec 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig,
RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{CreateMergeTreeSuite,
FileSourceScanExecTransformer}
@@ -46,7 +46,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends
CreateMergeTreeSuite {
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
.set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "10000")
@@ -54,7 +53,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends
CreateMergeTreeSuite {
"spark.databricks.delta.retentionDurationCheck.enabled",
"false"
) // otherwise, RETAIN 0 HOURS will fail
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
.setCHSettings("input_format_parquet_max_block_size", 8192)
}
@@ -494,7 +493,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends
CreateMergeTreeSuite {
test("test mergetree insert with optimize basic") {
withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "200000000",
- CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true"
+ RuntimeSettings.MERGE_AFTER_INSERT.key -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS
lineitem_mergetree_insert_optimize_basic;
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index dc5ffd936d..ff237e9600 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -54,7 +54,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite extends
CreateMergeTreeSuite
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
.set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
.setCHSettings("input_format_parquet_max_block_size", 8192)
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
new file mode 100644
index 0000000000..67fe2c1837
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeTPCHSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.mergetree
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+trait TPCHObjectStore extends TPCHDatabase {
+ // parquet data source
+ override protected def parquetSourceDB: String = "parquet_source"
+ val policy: String
+ val remotePath: String
+
+ override protected def createTestTables(): Unit = {
+ createTPCHTables(
+ s"$remotePath/default",
+ format = "clickhouse",
+ isNull = false,
+ props = Map("storage_policy" -> s"'$policy'") // use the policy to
create tables
+ )
+ insertIntoTPCHTables(parquetSourceDB)
+ }
+}
+
+/**
+ * An experimental configuration trait to find a way to reduce suite
configuration for MergeTree
+ * data format in Spark tests.
+ *
+ * This trait centralizes common configuration settings needed across multiple
test suites working
+ * with MergeTree format, eliminating the need to duplicate these settings in
each test class. By
+ * extending this trait, test suites gain a standardized configuration
foundation while only needing
+ * to implement the `useOnePipeline` variable.
+ *
+ * The trait handles several aspects of configuration:
+ * - Sets up the columnar shuffle management system
+ * - Configures Delta Lake session extensions
+ * - Establishes ClickHouse catalog implementation
+ * - Enables native writer optimizations with appropriate pipeline settings
+ *
+ * Test suites can customize behavior by defining the `useOnePipeline` value
based on their specific
+ * testing requirements or Spark version compatibility.
+ */
+
+trait WriteMergeTreeConf extends SharedSparkSession {
+ val useOnePipeline: Boolean
+ override protected def sparkConf: SparkConf = {
+ import org.apache.spark.shuffle.sort.ColumnarShuffleManager
+ import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
+ import io.delta.sql.DeltaSparkSessionExtension
+
+ super.sparkConf
+ .set("spark.shuffle.manager", classOf[ColumnarShuffleManager].getName)
+ .set(SPARK_SESSION_EXTENSIONS.key,
classOf[DeltaSparkSessionExtension].getName)
+ .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
classOf[ClickHouseSparkCatalog].getName)
+ .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+ .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key,
useOnePipeline.toString)
+ }
+}
+
+abstract class GlutenClickHouseMergeTreeTPCHSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with TPCHMergeTreeResult
+ with TPCHObjectStore
+ with WriteMergeTreeConf {
+
+ override val useOnePipeline: Boolean = spark35
+ final override val testCases: Seq[Int] = Seq(
+ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22
+ )
+ setupTestCase()
+}
+
+class MinioTCPHTestSuite extends GlutenClickHouseMergeTreeTPCHSuite {
+ override val policy: String = minioHelper.STORE_POLICY_NOCACHE
+ override val remotePath: String = s"s3a://$BUCKET_NAME"
+
+ override def beforeAll(): Unit = {
+ if (minioHelper.bucketExists(BUCKET_NAME)) {
+ minioHelper.clearBucket(BUCKET_NAME)
+ }
+ minioHelper.createBucket(BUCKET_NAME)
+ minioHelper.resetMeta()
+ super.beforeAll()
+ }
+}
+
+class HDFSTCPHTestSuite extends GlutenClickHouseMergeTreeTPCHSuite {
+
+ override val policy: String = hdfsHelper.STORE_POLICY_ROCKSDB
+ override val remotePath: String = hdfsHelper.independentHdfsURL()
+
+ override def beforeAll(): Unit = {
+ hdfsHelper.deleteDir(remotePath)
+ hdfsHelper.resetMeta()
+ super.beforeAll()
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 043b19b8f0..32876f1c10 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{CreateMergeTreeSuite, FileSourceScanExecTransformer}
@@ -28,15 +28,11 @@ import
org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types._
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends CreateMergeTreeSuite {
override protected def sparkConf: SparkConf = {
- import org.apache.gluten.backendsapi.clickhouse.CHConfig._
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -44,18 +40,15 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
}
+ private val remotePath: String = hdfsHelper.independentHdfsURL("test")
override protected def beforeEach(): Unit = {
super.beforeEach()
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
- fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ hdfsHelper.deleteDir(remotePath)
hdfsHelper.resetMeta()
}
@@ -90,7 +83,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
| l_comment string
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin)
@@ -155,7 +148,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_shipdate,l_orderkey',
| primaryKey='l_shipdate')
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_orderbykey_hdfs'
|""".stripMargin)
spark.sql(s"""
@@ -222,7 +215,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_orderkey',
| primaryKey='l_orderkey')
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_partition_hdfs'
|""".stripMargin)
// dynamic partitions
@@ -384,7 +377,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='c1',
| primaryKey='c1')
- |LOCATION '$HDFS_URL/test/partition_escape'
+ |LOCATION '$remotePath/partition_escape'
|""".stripMargin)
spark.sql("insert into partition_escape select * from origin_table")
@@ -420,7 +413,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
|PARTITIONED BY (l_returnflag)
|CLUSTERED BY (l_orderkey)
|${if (spark32) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_bucket_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin)
@@ -469,7 +462,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
}
testSparkVersionLE33("test mergetree write with the path based bucket table") {
- val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
+ val dataPath = s"$remotePath/lineitem_mergetree_bucket_hdfs"
val sourceDF = spark.sql(s"""
|select * from lineitem
@@ -525,12 +518,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
test("test mergetree insert with optimize basic") {
val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
- val dataPath = s"$HDFS_URL/test/$tableName"
+ val dataPath = s"$remotePath/$tableName"
withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
- CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true",
- CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+ RuntimeSettings.MERGE_AFTER_INSERT.key -> "true",
+ RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000"
) {
spark.sql(s"""
@@ -547,17 +540,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite extends
CreateMergeTreeSuite {
val ret = spark.sql(s"select count(*) from $tableName").collect()
assertResult(600572)(ret.apply(0).get(0))
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
eventually(timeout(60.seconds), interval(2.seconds)) {
- val it = fs.listFiles(new Path(dataPath), true)
- var files = 0
- while (it.hasNext) {
- it.next()
- files += 1
- }
+ val files = hdfsHelper.countDir(dataPath)
assertResult(4)(files)
}
}
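The two suites above replace the hand-rolled Hadoop `FileSystem.listFiles` loop with `hdfsHelper.countDir`. A minimal self-contained sketch of the same recursive file count, run against the local filesystem instead of HDFS so it stays runnable here (the `DirCount` name is illustrative, not from the patch):

```scala
import java.nio.file.{Files, Path}

object DirCount {
  // Recursively count regular files under `dir`, mirroring what the
  // countDir helper does against HDFS via FileSystem.listFiles(path, true):
  // directories themselves are not counted, only files at any depth.
  def countDir(dir: Path): Int = {
    val stream = Files.walk(dir)
    try stream.filter(p => Files.isRegularFile(p)).count().toInt
    finally stream.close()
  }
}
```

The test's `eventually(timeout(...), interval(...))` block then just polls this count until the expected number of merged parts appears.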
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index 18da968b30..3840d0ec32 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig, RuntimeSettings}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{CreateMergeTreeSuite, FileSourceScanExecTransformer}
@@ -27,15 +27,11 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMergeTreeSuite {
override protected def sparkConf: SparkConf = {
- import org.apache.gluten.backendsapi.clickhouse.CHConfig._
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
@@ -43,18 +39,15 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
}
+ private val remotePath: String = hdfsHelper.independentHdfsURL("test")
override protected def beforeEach(): Unit = {
super.beforeEach()
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
- fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ hdfsHelper.deleteDir(remotePath)
hdfsHelper.resetMeta()
}
@@ -89,7 +82,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
| l_comment string
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
|""".stripMargin)
@@ -154,7 +147,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
|TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
| orderByKey='l_shipdate,l_orderkey',
| primaryKey='l_shipdate')
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_orderbykey_hdfs'
|""".stripMargin)
spark.sql(s"""
@@ -221,7 +214,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
|TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
| orderByKey='l_orderkey',
| primaryKey='l_orderkey')
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_partition_hdfs'
|""".stripMargin)
// dynamic partitions
@@ -370,7 +363,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
|PARTITIONED BY (l_returnflag)
|CLUSTERED BY (l_orderkey)
|${if (spark32) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
- |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+ |LOCATION '$remotePath/lineitem_mergetree_bucket_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
|""".stripMargin)
@@ -419,7 +412,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
}
testSparkVersionLE33("test mergetree write with the path based bucket table") {
- val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
+ val dataPath = s"$remotePath/lineitem_mergetree_bucket_hdfs"
val sourceDF = spark.sql(s"""
|select * from lineitem
@@ -475,12 +468,12 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
test("test mergetree insert with optimize basic") {
val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
- val dataPath = s"$HDFS_URL/test/$tableName"
+ val dataPath = s"$remotePath/$tableName"
withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
- CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true",
- CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+ RuntimeSettings.MERGE_AFTER_INSERT.key -> "true",
+ RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000"
) {
spark.sql(s"""
@@ -497,17 +490,9 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite extends CreateMer
val ret = spark.sql(s"select count(*) from $tableName").collect()
assertResult(600572)(ret.apply(0).get(0))
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
eventually(timeout(60.seconds), interval(2.seconds)) {
- val it = fs.listFiles(new Path(dataPath), true)
- var files = 0
- while (it.hasNext) {
- it.next()
- files += 1
- }
+ val files = hdfsHelper.countDir(dataPath)
assertResult(4)(files)
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 79b759c1ae..c68c51e50d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BasicScanExecTransformer, CreateMergeTreeSuite, FileSourceScanExecTransformer}
@@ -40,7 +40,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite extends
CreateMergeTreeSuite {
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
}
@@ -506,8 +505,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite extends
CreateMergeTreeSuite {
withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
- CHConfig.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
- CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true"
+ RuntimeSettings.INSERT_WITHOUT_LOCAL_STORAGE.key -> "true",
+ RuntimeSettings.MERGE_AFTER_INSERT.key -> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS $tableName;
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
index 43e8b25134..df4dae03c9 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteStatsSuite.scala
@@ -16,8 +16,7 @@
*/
package org.apache.gluten.execution.mergetree
-import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
-import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
GlutenClickHouseTPCDSAbstractSuite}
@@ -29,9 +28,6 @@ import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions.input_file_name
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-
class GlutenClickHouseMergeTreeWriteStatsSuite extends GlutenClickHouseTPCDSAbstractSuite {
override protected def sparkConf: SparkConf = {
@@ -47,18 +43,15 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
.set("spark.databricks.delta.stats.enabled", "true")
.set("spark.databricks.delta.optimizeWrite.enabled", "true")
.set("spark.sql.storeAssignmentPolicy", "LEGACY")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
- .setCHSettings("mergetree.merge_after_insert", false)
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
}
+ private val remotePath: String = hdfsHelper.independentHdfsURL("stats")
override protected def beforeEach(): Unit = {
super.beforeEach()
- val conf = new Configuration
- conf.set("fs.defaultFS", HDFS_URL)
- val fs = FileSystem.get(conf)
- fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ hdfsHelper.deleteDir(remotePath)
hdfsHelper.resetMeta()
}
@@ -98,7 +91,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| ss_sold_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/store_sales'
+ |LOCATION '$remotePath/store_sales'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"store_returns" ->
@@ -127,7 +120,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| sr_returned_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/store_returns'
+ |LOCATION '$remotePath/store_returns'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"catalog_sales" ->
@@ -170,7 +163,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| cs_sold_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/catalog_sales'
+ |LOCATION '$remotePath/catalog_sales'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"catalog_returns" ->
@@ -206,7 +199,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| cr_returned_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/catalog_returns'
+ |LOCATION '$remotePath/catalog_returns'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"web_sales" ->
@@ -249,7 +242,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| ws_sold_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/web_sales'
+ |LOCATION '$remotePath/web_sales'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"web_returns" ->
@@ -282,7 +275,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| wr_returned_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/web_returns'
+ |LOCATION '$remotePath/web_returns'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin,
"inventory" ->
@@ -295,7 +288,7 @@ class GlutenClickHouseMergeTreeWriteStatsSuite extends
GlutenClickHouseTPCDSAbst
| inv_date_sk INT
|)
|USING clickhouse
- |LOCATION '$HDFS_URL/stats/inventory'
+ |LOCATION '$remotePath/inventory'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin
)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index ed516162cf..336c98333d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -51,7 +51,7 @@ class GlutenClickHouseMergeTreeWriteSuite extends
CreateMergeTreeSuite {
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
.set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000")
- .setCHSettings("mergetree.merge_after_insert", false)
+ .set(RuntimeSettings.MERGE_AFTER_INSERT.key, "false")
.setCHSettings("input_format_parquet_max_block_size", 8192)
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index e08bd280c3..4a5a923162 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution.metrics
-import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.GlutenPlan
@@ -44,7 +43,6 @@ class GlutenClickHouseTPCHMetricsSuite extends
ParquetTPCHSuite {
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.setCHSettings("input_format_parquet_max_block_size",
parquetMaxBlockSize)
.setCHConfig("enable_pre_projection_for_join_conditions", "false")
.setCHConfig("enable_streaming_aggregating", true)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
index 6aa872146e..366d53e4d2 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/HDFSTestHelper.scala
@@ -21,49 +21,80 @@ import
org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
import org.apache.spark.SparkConf
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.File
-class HDFSTestHelper(TMP_PREFIX: String) {
+class HDFSTestHelper(TMP_PREFIX: String, independentDir: String) {
// HDFS parameters
val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
private val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
private val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
- def getHdfsUrl(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+ def hdfsURL(dirName: String): String = s"$HDFS_URL_ENDPOINT/$dirName"
+ def independentHdfsURL(dirName: String = ""): String =
+ if (dirName.isEmpty) {
+ s"$HDFS_URL_ENDPOINT/$independentDir"
+ } else {
+ s"$HDFS_URL_ENDPOINT/$independentDir/$dirName"
+ }
def metaPath(dirName: String): String = s"$HDFS_METADATA_PATH/$dirName"
- def setHDFSStoreConfig(conf: SparkConf): SparkConf = {
- conf.setCHConfig(
- "storage_configuration.disks.hdfs.type" -> "hdfs_gluten",
- "storage_configuration.disks.hdfs.endpoint" -> s"$HDFS_URL_ENDPOINT/",
- "storage_configuration.disks.hdfs.metadata_path" -> HDFS_METADATA_PATH,
- "storage_configuration.disks.hdfs_cache.type" -> "cache",
- "storage_configuration.disks.hdfs_cache.disk" -> "hdfs",
- "storage_configuration.disks.hdfs_cache.path" -> HDFS_CACHE_PATH,
- "storage_configuration.disks.hdfs_cache.max_size" -> "10Gi",
- "storage_configuration.policies.__hdfs_main.volumes" -> "main",
- "storage_configuration.policies.__hdfs_main.volumes.main.disk" -> "hdfs_cache"
- )
+ def builder(policyName: String): StoreConfigBuilder = new StoreConfigBuilder(policyName)
+
+ val STORE_POLICY = "__hdfs_main"
+ val STORE_POLICY_ROCKSDB = "__hdfs_main_rocksdb"
+
+ def deleteDir(dirName: String): Unit = {
+ val hdfs_dir = hdfsDir(dirName)
+ val conf = new Configuration
+ conf.set("fs.defaultFS", hdfs_dir)
+ val fs = FileSystem.get(conf)
+ fs.delete(new Path(hdfs_dir), true)
}
- def setHDFSStoreConfigRocksDB(conf: SparkConf): SparkConf = {
- conf.setCHConfig(
- "storage_configuration.disks.hdfs2.type" -> "hdfs_gluten",
- "storage_configuration.disks.hdfs2.endpoint" -> s"$HDFS_URL_ENDPOINT/",
- "storage_configuration.disks.hdfs2.metadata_path" -> HDFS_METADATA_PATH,
- "storage_configuration.disks.hdfs2.metadata_type" -> "rocksdb",
- "storage_configuration.disks.hdfs_cache2.type" -> "cache",
- "storage_configuration.disks.hdfs_cache2.disk" -> "hdfs2",
- "storage_configuration.disks.hdfs_cache2.path" -> HDFS_CACHE_PATH,
- "storage_configuration.disks.hdfs_cache2.max_size" -> "10Gi",
- "storage_configuration.policies.__hdfs_main_rocksdb.volumes" -> "main",
- "storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk" -> "hdfs_cache2"
- )
+ def countDir(dirName: String): Int = {
+ val hdfs_dir = hdfsDir(dirName)
+
+ val conf = new Configuration
+ conf.set("fs.defaultFS", hdfs_dir)
+ val fs = FileSystem.get(conf)
+ val it = fs.listFiles(new Path(hdfs_dir), true)
+ var count = 0
+ while (it.hasNext) {
+ it.next()
+ count += 1
+ }
+ count
+ }
+
+ private def hdfsDir(dirName: String) = {
+ dirName match {
+ case d if d.startsWith("/") => s"$HDFS_URL_ENDPOINT$d"
+ case d if d.startsWith(HDFS_URL_ENDPOINT) =>
+ d // Keep paths that already start with the endpoint
+ case d => s"$HDFS_URL_ENDPOINT/$d"
+ }
+ }
+
+ def setStoreConfig(conf: SparkConf): SparkConf = {
+ builder(STORE_POLICY)
+ .withEndpoint(s"$HDFS_URL_ENDPOINT/")
+ .withMetadataPath(HDFS_METADATA_PATH)
+ .withCachePath(HDFS_CACHE_PATH)
+ .build(conf)
+
+ builder(STORE_POLICY_ROCKSDB)
+ .withEndpoint(s"$HDFS_URL_ENDPOINT/")
+ .withMetadataPath(HDFS_METADATA_PATH)
+ .withDiskcache(false)
+ .withRocksDB(true)
+ .build(conf)
}
- def setHdfsClientConfig(conf: SparkConf): SparkConf = {
+ def setFileSystem(conf: SparkConf): SparkConf = {
conf.setCHConfig(
"hdfs.dfs_client_read_shortcircuit" -> "false",
"hdfs.dfs_default_replica" -> "1"
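The new `hdfsDir` helper above normalizes three input shapes: an absolute path, a full URL that already starts with the endpoint, and a bare directory name. A self-contained sketch of the same three-way match, with the endpoint hard-coded to the helper's test value (`HdfsPathNorm` is an illustrative name, not from the patch):

```scala
object HdfsPathNorm {
  private val Endpoint = "hdfs://127.0.0.1:8020"

  // Mirror of the hdfsDir match: absolute paths are appended directly to
  // the endpoint, full URLs pass through unchanged, and bare directory
  // names get a "/" separator inserted.
  def hdfsDir(dirName: String): String = dirName match {
    case d if d.startsWith("/")      => s"$Endpoint$d"
    case d if d.startsWith(Endpoint) => d
    case d                           => s"$Endpoint/$d"
  }
}
```

This lets `deleteDir` and `countDir` accept either the plain directory names used in older tests or the full `remotePath`-based URLs the refactored suites pass in.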
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
index 0667c2c843..f82c09adb6 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/MinioTestHelper.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.utils
-import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.GlutenObjectStorageConfig
import org.apache.spark.SparkConf
@@ -27,7 +27,7 @@ import org.apache.commons.io.FileUtils
import java.io.File
import java.util
-import scala.collection.JavaConverters._
+import scala.collection.mutable
class MinioTestHelper(TMP_PREFIX: String) {
@@ -41,6 +41,9 @@ class MinioTestHelper(TMP_PREFIX: String) {
val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
val S3A_ENDPOINT = "s3a://"
+ val STORE_POLICY = "__s3_main"
+ val STORE_POLICY_NOCACHE = "__s3_main_2"
+
private lazy val client = MinioClient
.builder()
.endpoint(MINIO_ENDPOINT)
@@ -77,12 +80,12 @@ class MinioTestHelper(TMP_PREFIX: String) {
def listObjects(bucketName: String, prefix: String): Iterable[String] = {
val args =
ListObjectsArgs.builder().bucket(bucketName).recursive(true).prefix(prefix).build()
- val objectNames = new util.ArrayList[String]()
- client.listObjects(args).forEach(obj =>
objectNames.add(obj.get().objectName()))
- objectNames.asScala
+ val objectNames = mutable.ArrayBuffer[String]()
+ client.listObjects(args).forEach(obj =>
objectNames.append(obj.get().objectName()))
+ objectNames
}
- def setHadoopFileSystemConfig(conf: SparkConf): SparkConf = {
+ def setFileSystem(conf: SparkConf): SparkConf = {
conf
.set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
@@ -92,22 +95,23 @@ class MinioTestHelper(TMP_PREFIX: String) {
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
}
- def setObjectStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
- val wholePath: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
- conf
- .setCHConfig(
- "storage_configuration.disks.s3.type" -> "s3_gluten",
- "storage_configuration.disks.s3.endpoint" -> wholePath,
- "storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
- "storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY,
- "storage_configuration.disks.s3.metadata_path" -> S3_METADATA_PATH,
- "storage_configuration.disks.s3_cache.type" -> "cache",
- "storage_configuration.disks.s3_cache.disk" -> "s3",
- "storage_configuration.disks.s3_cache.path" -> S3_CACHE_PATH,
- "storage_configuration.disks.s3_cache.max_size" -> "10Gi",
- "storage_configuration.policies.__s3_main.volumes" -> "main",
- "storage_configuration.policies.__s3_main.volumes.main.disk" ->
"s3_cache"
- )
+ def builder(policyName: String): StoreConfigBuilder =
+ new StoreConfigBuilder(policyName, GlutenObjectStorageConfig.S3_DISK_TYPE)
+
+ def setStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
+ builder(STORE_POLICY)
+ .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
+ .withMetadataPath(S3_METADATA_PATH)
+ .withCachePath(S3_CACHE_PATH)
+ .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
+ .build(conf)
+
+ builder(STORE_POLICY_NOCACHE)
+ .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
+ .withMetadataPath(S3_METADATA_PATH)
+ .withDiskcache(false)
+ .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
+ .build(conf)
}
def resetMeta(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala
new file mode 100644
index 0000000000..cb9678f88b
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/StoreConfigBuilder.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.backendsapi.clickhouse.GlutenObjectStorageConfig
+
+import org.apache.spark.SparkConf
+
+import scala.collection.mutable
+
+class StoreConfigBuilder(
+ val policyName: String,
+ val diskType: String = GlutenObjectStorageConfig.HDFS_DISK_TYPE) {
+ private var useDiskcache: Boolean = true
+ private var useRocksDB: Boolean = false
+
+ private var CACHE_PATH: String = _
+ private var METADATA_PATH: String = _
+ private var ENDPOINT: String = _
+
+ private var ak: String = _
+ private var sk: String = _
+
+ def withEndpoint(endpoint: String): StoreConfigBuilder = {
+ this.ENDPOINT = endpoint
+ this
+ }
+
+ def withMetadataPath(metadataPath: String): StoreConfigBuilder = {
+ this.METADATA_PATH = metadataPath
+ this
+ }
+
+ def withCachePath(cachePath: String): StoreConfigBuilder = {
+ this.CACHE_PATH = cachePath
+ this
+ }
+
+ def withDiskcache(useDiskcache: Boolean): StoreConfigBuilder = {
+ this.useDiskcache = useDiskcache
+ this
+ }
+
+ def withAKSK(ak: String, sk: String): StoreConfigBuilder = {
+ require(ak != null && sk != null, "ak and sk must not be null")
+ this.ak = ak
+ this.sk = sk
+ this
+ }
+
+ def withRocksDB(useRocksDB: Boolean): StoreConfigBuilder = {
+ this.useRocksDB = useRocksDB
+ this
+ }
+
+ private def extractStorageType(typeString: String): String = {
+ if (typeString.contains("_")) {
+ typeString.split("_").head
+ } else {
+ typeString
+ }
+ }
+
+ def build(conf: SparkConf): SparkConf = {
+ val prefix = extractStorageType(diskType)
+ val disk = if (useRocksDB) s"${prefix}2" else prefix
+    val disk_cache = if (useRocksDB) s"${prefix}_cache2" else s"${prefix}_cache"
+ val main_disk = if (useDiskcache) disk_cache else disk
+
+ require(ENDPOINT != null, "ENDPOINT is null")
+ require(METADATA_PATH != null, "METADATA_PATH is null")
+
+ val settings = mutable.ArrayBuffer[(String, String)]()
+ settings.appendAll(
+ Seq(
+ s"storage_configuration.disks.$disk.type" -> diskType,
+ s"storage_configuration.disks.$disk.endpoint" -> s"$ENDPOINT",
+ s"storage_configuration.disks.$disk.metadata_path" -> METADATA_PATH
+ ))
+
+ if (ak != null && sk != null) {
+ settings.appendAll(
+ Seq(
+ s"storage_configuration.disks.$disk.access_key_id" -> ak,
+ s"storage_configuration.disks.$disk.secret_access_key" -> sk
+ ))
+ }
+
+ if (useDiskcache) {
+ require(CACHE_PATH != null, "CACHE_PATH is null")
+ settings.appendAll(
+ Seq(
+ s"storage_configuration.disks.$disk_cache.type" -> "cache",
+ s"storage_configuration.disks.$disk_cache.disk" -> disk,
+ s"storage_configuration.disks.$disk_cache.path" -> CACHE_PATH,
+ s"storage_configuration.disks.$disk_cache.max_size" -> "10Gi"
+ ))
+ }
+ settings.appendAll(
+ Seq(
+ s"storage_configuration.policies.$policyName.volumes" -> "main",
+        s"storage_configuration.policies.$policyName.volumes.main.disk" -> main_disk
+ ))
+
+ if (useRocksDB) {
+      settings.append(s"storage_configuration.disks.$disk.metadata_type" -> "rocksdb")
+ }
+ settings.foreach(setting => conf.setCHConfig(setting))
+ conf
+ }
+}
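The disk/cache/RocksDB naming convention that `build` encodes (plain disk name for the default case, a `2` suffix for RocksDB metadata, `_cache`/`_cache2` for the cache disk, and the policy's main disk depending on whether the disk cache is enabled) can be summarized in a self-contained sketch. `DiskLayout` is an illustrative name; the logic follows `build` above:

```scala
// Trimmed-down sketch of StoreConfigBuilder: it computes only the disk
// names and the policy's main disk, which is the part that encodes the
// cache/RocksDB naming convention.
final case class DiskLayout(disk: String, mainDisk: String, metadataType: Option[String])

object DiskLayout {
  def resolve(diskType: String, useDiskCache: Boolean, useRocksDB: Boolean): DiskLayout = {
    val prefix = diskType.split("_").head // e.g. "hdfs_gluten" -> "hdfs"
    val disk = if (useRocksDB) s"${prefix}2" else prefix
    val cache = if (useRocksDB) s"${prefix}_cache2" else s"${prefix}_cache"
    DiskLayout(
      disk,
      if (useDiskCache) cache else disk, // policy volume points at the cache disk only when caching is on
      if (useRocksDB) Some("rocksdb") else None)
  }
}
```

This is exactly why the new `__s3_main_2` and RocksDB policies can skip the disk cache: with `useDiskCache = false` the policy's main disk is the object-storage disk itself and no `storage_configuration.disks.*_cache.*` entries are emitted.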
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index be6fb587e6..f16df23d38 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20250502
-CH_COMMIT=376bf9d0d91
+CH_COMMIT=83de175e411
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 84eb55f379..33f9a62f64 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -35,8 +35,8 @@
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
-#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
+#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
@@ -468,7 +468,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
};
Poco::Util::AbstractConfiguration::Keys disks;
- std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
+ std::unordered_set<String> disk_types = {GlutenObjectStorageConfig::S3_DISK_TYPE, GlutenObjectStorageConfig::HDFS_DISK_TYPE, "cache"};
config.keys("storage_configuration.disks", disks);
std::ranges::for_each(
@@ -481,7 +481,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
return;
if (disk_type == "cache")
change_func(disk_prefix + ".path");
- else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
+ else if (disk_type == GlutenObjectStorageConfig::S3_DISK_TYPE || disk_type == GlutenObjectStorageConfig::HDFS_DISK_TYPE)
change_func(disk_prefix + ".metadata_path");
});
@@ -545,7 +545,8 @@ DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(const SparkConf
 // FIXMEX: workaround for https://github.com/ClickHouse/ClickHouse/pull/75452#pullrequestreview-2625467710
// entry in DiskSelector::initialize
// Bug in FileCacheSettings::loadFromConfig
- auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config) {
+ auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config)
+ {
const std::string config_prefix = "storage_configuration.disks";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@@ -671,7 +672,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
/// We currently do not support lazy materialization.
/// "test 'order by' two keys" will failed if we enable it.
settings[Setting::query_plan_optimize_lazy_materialization] = false;
-
+
for (const auto & [key, value] : spark_conf_map)
{
// Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
@@ -762,7 +763,8 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
{
auto mem_gb = task_memory / static_cast<double>(1_GiB);
// 2.8x+5, Heuristics calculate the block size of external sort, [8,16]
- settings[Setting::prefer_external_sort_block_bytes] = std::max(std::min(static_cast<size_t>(2.8 * mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
+ settings[Setting::prefer_external_sort_block_bytes]
+ = std::max(std::min(static_cast<size_t>(2.8 * mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
}
}
}
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h
index 62e0d228e0..7b2a33a9b6 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -31,7 +31,6 @@ struct ReadSettings;
}
namespace local_engine
{
-
struct SparkConfigs
{
using ConfigMap = google::protobuf::Map<std::string, std::string>;
@@ -196,4 +195,10 @@ struct GlutenCacheConfig
inline static const String ENABLED = "enable.gluten_cache.local";
};
+struct GlutenObjectStorageConfig
+{
+ inline static const String S3_DISK_TYPE = "s3_gluten";
+ inline static const String HDFS_DISK_TYPE = "hdfs_gluten";
+};
+
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
index b4073d0f39..60a5795bc3 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -15,10 +15,14 @@
* limitations under the License.
*/
#include "config.h"
+
#include <Core/Settings.h>
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#include <Interpreters/Context.h>
+#include <Common/GlutenConfig.h>
+#include <Common/Macros.h>
+
#if USE_AWS_S3
-#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#endif
@@ -27,10 +31,6 @@
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
#endif
-#include <Interpreters/Context.h>
-#include <Common/Macros.h>
-
-
namespace DB
{
namespace Setting
@@ -50,10 +50,7 @@ namespace local_engine
using namespace DB;
#if USE_AWS_S3
-static S3::URI getS3URI(
- const Poco::Util::AbstractConfiguration & config,
- const std::string & config_prefix,
- const ContextPtr & context)
+static S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
@@ -65,41 +62,31 @@ static S3::URI getS3URI(
return uri;
}
-static std::string getEndpoint(
- const Poco::Util::AbstractConfiguration & config,
- const std::string & config_prefix,
- const ContextPtr & context)
+static std::string
+getEndpoint(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
return context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
}
void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
{
- static constexpr auto disk_type = "s3_gluten";
-
factory.registerObjectStorageType(
- disk_type,
- [](
- const std::string & name,
- const Poco::Util::AbstractConfiguration & config,
- const std::string & config_prefix,
- const ContextPtr & context,
- bool /*skip_access_check*/) -> ObjectStoragePtr
+ GlutenObjectStorageConfig::S3_DISK_TYPE,
+ [](const std::string & name,
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & config_prefix,
+ const ContextPtr & context,
+ bool /*skip_access_check*/) -> ObjectStoragePtr
{
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto endpoint = getEndpoint(config, config_prefix, context);
- auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
- auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
+ auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */ true);
+ auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */ true);
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
- auto object_storage = std::make_shared<S3ObjectStorage>(
- std::move(client),
- std::move(settings),
- uri,
- s3_capabilities,
- key_generator,
- name);
+ auto object_storage
+ = std::make_shared<S3ObjectStorage>(std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
return object_storage;
});
}
@@ -110,13 +97,12 @@ void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
{
factory.registerObjectStorageType(
- "hdfs_gluten",
- [](
- const std::string & /* name */,
- const Poco::Util::AbstractConfiguration & config,
- const std::string & config_prefix,
- const ContextPtr & context,
- bool /* skip_access_check */) -> ObjectStoragePtr
+ GlutenObjectStorageConfig::HDFS_DISK_TYPE,
+ [](const std::string & /* name */,
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & config_prefix,
+ const ContextPtr & context,
+ bool /* skip_access_check */) -> ObjectStoragePtr
{
auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
checkHDFSURL(uri);
@@ -124,9 +110,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
 throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
- config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
- context->getSettingsRef()[Setting::hdfs_replication]
- );
+ config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), context->getSettingsRef()[Setting::hdfs_replication]);
return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
});
}
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index ce78afa169..20029a4121 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "registerGlutenDisks.h"
#include "config.h"
+
#include <Disks/DiskFactory.h>
-#include <Interpreters/Context.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#include <Common/GlutenConfig.h>
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
@@ -33,8 +35,6 @@
#include <Disks/ObjectStorages/MetadataStorageFromRocksDB.h>
#endif
-#include "registerGlutenDisks.h"
-
namespace local_engine
{
#if USE_AWS_S3
@@ -93,7 +93,8 @@ void registerGlutenDisks(bool global_skip_access_check)
registerGlutenS3ObjectStorage(object_factory);
- factory.registerDiskType("s3_gluten", creator); /// For compatibility
+
+ factory.registerDiskType(GlutenObjectStorageConfig::S3_DISK_TYPE, creator); /// For compatibility
#endif
#if USE_HDFS
@@ -112,7 +113,7 @@ void registerGlutenDisks(bool global_skip_access_check)
{ return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
auto object_storage = object_storage_creator(config, context);
DB::MetadataStoragePtr metadata_storage;
- auto metadata_type = DB::MetadataStorageFactory::getMetadataType(config, config_prefix, "local");
+ auto metadata_type = DB::MetadataStorageFactory::getMetadataType(config, config_prefix, "local");
if (metadata_type == "rocksdb")
{
#if USE_ROCKSDB
@@ -138,7 +139,7 @@ void registerGlutenDisks(bool global_skip_access_check)
};
registerGlutenHDFSObjectStorage(object_factory);
- factory.registerDiskType("hdfs_gluten", hdfs_creator); /// For compatibility
+ factory.registerDiskType(GlutenObjectStorageConfig::HDFS_DISK_TYPE, hdfs_creator); /// For compatibility
#endif
}
}