This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f70cd45e51 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250424)
(#9406)
f70cd45e51 is described below
commit f70cd45e51fffe018c738b4de8b489c776133612
Author: Kyligence Git <[email protected]>
AuthorDate: Thu Apr 24 08:38:00 2025 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250424) (#9406)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250424)
* Fix ut due to https://github.com/ClickHouse/ClickHouse/pull/77940
It introduces columns_substreams.txt for MergeTree's compact mode, causing
test failures as both increased file sizes and additional file count alter
compaction patterns compared to prior implementations.
Changes:
- Updated file counting logic to exclude the new "columns_substreams.txt"
- Updated comments with correct file sizes and improved clarity
- Updated hardcoded config strings to use DeltaSQLConf constants
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang chen <[email protected]>
---
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 65 +++++++++++++---------
cpp-ch/clickhouse.version | 4 +-
2 files changed, 41 insertions(+), 28 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 3746501bb8..cc7e7febff 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -23,12 +23,16 @@ import
org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHo
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.MergeTreeConf
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import io.delta.tables.ClickhouseTable
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter._
import java.io.File
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.concurrent.duration.DurationInt
class GlutenClickHouseMergeTreeOptimizeSuite
@@ -85,7 +89,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
}
test("test mergetree optimize basic") {
- withSQLConf("spark.databricks.delta.optimize.maxFileSize" -> "2000000") {
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "2000000") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_optimize;
|""".stripMargin)
@@ -108,15 +112,24 @@ class GlutenClickHouseMergeTreeOptimizeSuite
}
def countFiles(directory: File): Int = {
- if (directory.exists && directory.isDirectory &&
!directory.getName.equals("_commits")) {
- val files = directory.listFiles
- val count = files
- .filter(!_.getName.endsWith(".crc"))
- .count(_.isFile) + files.filter(_.isDirectory).map(countFiles).sum
- count + 1
- } else {
- 0
- }
+ val NO_COMMIT_DIR = new AndFileFilter(
+ DirectoryFileFilter.DIRECTORY,
+ new NotFileFilter(new NameFileFilter("_commits")))
+
+ val CRC_FILES = new SuffixFileFilter(".crc")
+ // https://github.com/ClickHouse/ClickHouse/pull/77940 introduce
"columns_substreams.txt"
+ val COLUMNS_SUBSTREAMS = new NameFileFilter("columns_substreams.txt")
+
+ val EXClUDE_FILES = new NotFileFilter(
+ new OrFileFilter(
+ CRC_FILES,
+ COLUMNS_SUBSTREAMS
+ )
+ )
+ FileUtils
+ .listFilesAndDirs(directory, EXClUDE_FILES, NO_COMMIT_DIR)
+ .asScala
+ .count(_ => true)
}
test("test mergetree optimize partitioned, each partition too small to
trigger optimize") {
@@ -279,8 +292,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
}
test("test mergetree optimize with optimize.minFileSize and
optimize.maxFileSize") {
- withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
- // 3 from 37 parts are larger than this, so after optimize there should
be 4 parts:
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "838018") {
+ // 3 of 37 parts are >= 838,018, so after optimizing there should be 4
parts:
// 3 original parts and 1 merged part
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_optimize_p5;
@@ -310,12 +323,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
}
withSQLConf(
- "spark.databricks.delta.optimize.maxFileSize" -> "10000000",
- "spark.databricks.delta.optimize.minFileSize" -> "838250") {
- // of the remaing 3 original parts, 2 are less than 838250, 1 is larger
(size 838255)
- // the merged part is ~27MB, so after optimize there should be 3 parts:
+ DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "10000000",
+ DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "838291") {
+ // of the remaining 3 original parts, 2 are < 838,291, 1 is larger (size
838,306)
+ // the merged part is ~27MB, so after optimizing there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original
parts
- // and 1 original part (size 838255)
+ // and 1 original part (size 838,306)
with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p5"))
@@ -381,7 +394,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
test("test skip index after optimize") {
withSQLConf(
- "spark.databricks.delta.optimize.maxFileSize" -> "2000000",
+ DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "2000000",
"spark.sql.adaptive.enabled" -> "false") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_index;
@@ -424,8 +437,8 @@ class GlutenClickHouseMergeTreeOptimizeSuite
test("test mergetree optimize with the path based table") {
val dataPath = s"$basePath/lineitem_mergetree_optimize_path_based"
clearDataPath(dataPath)
- withSQLConf("spark.databricks.delta.optimize.minFileSize" -> "838000") {
- // 3 from 37 parts are larger than this, so after optimize there should
be 4 parts:
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "838018") {
+ // 3 of 37 parts are >= 838,018, so after optimizing there should be 4
parts:
// 3 original parts and 1 merged part
val sourceDF = spark.sql(s"""
@@ -453,12 +466,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
}
withSQLConf(
- "spark.databricks.delta.optimize.maxFileSize" -> "10000000",
- "spark.databricks.delta.optimize.minFileSize" -> "838250") {
- // of the remaing 3 original parts, 2 are less than 838250, 1 is larger
(size 838255)
- // the merged part is ~27MB, so after optimize there should be 3 parts:
+ DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "10000000",
+ DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "838291") {
+ // of the remaining 3 original parts, 2 are < 838,291, 1 is larger (size
838,306)
+ // the merged part is ~27MB, so after optimizing there should be 3 parts:
// 1 merged part from 2 original parts, 1 merged part from 34 original
parts
- // and 1 original part (size 838255)
+ // and 1 original part (size 838,306)
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
with_ut_conf(clickhouseTable.optimize().executeCompaction())
@@ -493,7 +506,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
test("test mergetree insert with optimize basic") {
withSQLConf(
- "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+ DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "200000000",
CHConfig.runtimeSettings("mergetree.merge_after_insert") -> "true"
) {
spark.sql(s"""
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 88ff8be073..4c46a577ef 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250421
-CH_COMMIT=646533f3001
+CH_BRANCH=rebase_ch/20250424
+CH_COMMIT=e4cf15f59c5
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]