This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 79e1d588d [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620)
(#6150)
79e1d588d is described below
commit 79e1d588dcad5a655c2e5d363d18f8d695ef1cee
Author: Kyligence Git <[email protected]>
AuthorDate: Thu Jun 20 03:04:25 2024 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620) (#6150)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620)
* Fix the build broken by https://github.com/ClickHouse/ClickHouse/pull/61047
* Fix style
* Use assertResult instead of assert so that the actual value is reported
when a check fails.
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
---
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 111 +++++++++---------
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 127 ++++++++++-----------
cpp-ch/clickhouse.version | 4 +-
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 32 +++---
.../Storages/Mergetree/SparkMergeTreeWriter.h | 13 ++-
5 files changed, 141 insertions(+), 146 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 56b8f056b..572d0cd50 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -57,6 +57,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
+ // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path",
"/data") // for local test
}
override protected def beforeEach(): Unit = {
@@ -139,7 +140,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -151,8 +152,8 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 1)
- assert(addFiles.head.rows == 600572)
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_hdfs")
}
@@ -224,7 +225,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -232,24 +233,22 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+ assertResult("l_shipdate,l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_shipdate,l_orderkey"))
- assert(
+ .mkString(","))
+ assertResult("l_shipdate")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
- .mkString(",")
- .equals("l_shipdate"))
+ .mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 1)
- assert(addFiles.head.rows == 600572)
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
}
@@ -386,51 +385,49 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
df =>
val result = df.collect()
- assert(result.length == 4)
- assert(result(0).getString(0).equals("A"))
- assert(result(0).getString(1).equals("F"))
- assert(result(0).getDouble(2) == 7578058.0)
+ assertResult(4)(result.length)
+ assertResult("A")(result(0).getString(0))
+ assertResult("F")(result(0).getString(1))
+ assertResult(7578058.0)(result(0).getDouble(2))
- assert(result(2).getString(0).equals("N"))
- assert(result(2).getString(1).equals("O"))
- assert(result(2).getDouble(2) == 7454519.0)
+ assertResult("N")(result(2).getString(0))
+ assertResult("O")(result(2).getString(1))
+ assertResult(7454519.0)(result(2).getDouble(2))
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
- assert(mergetreeScan.metrics("numFiles").value == 6)
+ assertResult(6)(mergetreeScan.metrics("numFiles").value)
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
- assert(
+ .mkString(","))
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+ .mkString(","))
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 6)
- assert(addFiles.map(_.rows).sum == 750735)
+ assertResult(6)(addFiles.size)
+ assertResult(750735)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}
@@ -503,36 +500,35 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
if (sparkVersion.equals("3.2")) {
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
} else {
- assert(
+ assertResult("l_partkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_partkey"))
+ .mkString(","))
}
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 12)
- assert(addFiles.map(_.rows).sum == 600572)
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}
@@ -585,39 +581,38 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
+ .mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 12)
- assert(addFiles.map(_.rows).sum == 600572)
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
}
val result = spark.read
.format("clickhouse")
.load(dataPath)
.count()
- assert(result == 600572)
+ assertResult(600572)(result)
}
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index c5dc3a237..30f443265 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path",
"/data") // for local test
}
override protected def beforeEach(): Unit = {
@@ -152,7 +153,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -164,8 +165,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 1)
- assert(addFiles.head.rows == 600572)
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_s3") // clean up
}
@@ -237,7 +238,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -245,24 +246,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+ assertResult("l_shipdate,l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_shipdate,l_orderkey"))
- assert(
+ .mkString(","))
+ assertResult("l_shipdate")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
- .mkString(",")
- .equals("l_shipdate"))
+ .mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 1)
- assert(addFiles.head.rows == 600572)
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_orderbykey_s3")
}
@@ -399,51 +398,49 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
df =>
val result = df.collect()
- assert(result.length == 4)
- assert(result(0).getString(0).equals("A"))
- assert(result(0).getString(1).equals("F"))
- assert(result(0).getDouble(2) == 7578058.0)
+ assertResult(4)(result.length)
+ assertResult("A")(result(0).getString(0))
+ assertResult("F")(result(0).getString(1))
+ assertResult(7578058.0)(result(0).getDouble(2))
- assert(result(2).getString(0).equals("N"))
- assert(result(2).getString(1).equals("O"))
- assert(result(2).getDouble(2) == 7454519.0)
+ assertResult("N")(result(2).getString(0))
+ assertResult("O")(result(2).getString(1))
+ assertResult(7454519.0)(result(2).getDouble(2))
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
- assert(mergetreeScan.metrics("numFiles").value == 6)
+ assertResult(6)(mergetreeScan.metrics("numFiles").value)
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
- assert(
+ .mkString(","))
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+ .mkString(","))
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 6)
- assert(addFiles.map(_.rows).sum == 750735)
+ assertResult(6)(addFiles.size)
+ assertResult(750735)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_partition_s3")
@@ -517,36 +514,35 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
if (sparkVersion.equals("3.2")) {
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
} else {
- assert(
+ assertResult("l_partkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_partkey"))
+ .mkString(","))
}
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 12)
- assert(addFiles.map(_.rows).sum == 600572)
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_bucket_s3")
}
@@ -599,39 +595,38 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+ assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
- .mkString(",")
- .equals("l_orderkey"))
+ .mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
- assert(
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
- .partitionColumns(0)
- .equals("l_returnflag"))
+ .partitionColumns
+ .head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assert(addFiles.size == 12)
- assert(addFiles.map(_.rows).sum == 600572)
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
}
val result = spark.read
.format("clickhouse")
.load(dataPath)
.count()
- assert(result == 600572)
+ assertResult(600572)(result)
}
test("test mergetree insert with optimize basic") {
@@ -639,8 +634,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val dataPath = s"s3a://$BUCKET_NAME/$tableName"
withSQLConf(
- ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
-
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
-> "true")
+ "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
-> "true"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS $tableName;
@@ -654,7 +649,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
|""".stripMargin)
val ret = spark.sql(s"select count(*) from $tableName").collect()
- assert(ret.apply(0).get(0) == 600572)
+ assertResult(600572)(ret.apply(0).get(0))
assert(
!new
File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
}
@@ -713,22 +708,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
|""".stripMargin
withSQLConf(
-
("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index"
-> "true")) {
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index"
-> "true") {
runTPCHQueryBySQL(6, sqlStr) {
df =>
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
- assert(scanExec.size == 1)
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec(0)
+ val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
- assert(plans.size == 1)
- assert(plans(0).getSplitInfos.size == 1)
+ assertResult(1)(plans.size)
+ assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 2bbb29453..1e3ac8d88 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
+CH_BRANCH=rebase_ch/20240620
+CH_COMMIT=f9c3886a767
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 259af5698..c1f2391a2 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -87,8 +87,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
metadata_snapshot = storage->getInMemoryMetadataPtr();
header = metadata_snapshot->getSampleBlock();
const DB::Settings & settings = context->getSettingsRef();
- squashing_transform
- =
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows,
settings.min_insert_block_size_bytes);
+ squashing = std::make_unique<DB::Squashing>(header,
settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
if (!partition_dir.empty())
extractPartitionValues(partition_dir, partition_values);
@@ -105,25 +104,33 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ?
merge_limit_parts : limit_cnt_field.get<Int64>();
}
-void SparkMergeTreeWriter::write(DB::Block & block)
+void SparkMergeTreeWriter::write(const DB::Block & block)
{
auto new_block = removeColumnSuffix(block);
if (auto converter = ActionsDAG::makeConvertingActions(
new_block.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(), DB::ActionsDAG::MatchColumnsMode::Position))
ExpressionActions(converter).execute(new_block);
- if (auto add_block = squashing_transform->add(new_block))
+ bool has_part = chunkToPart(squashing->add({new_block.getColumns(),
new_block.rows()}));
+
+ if (has_part && merge_after_insert)
+ checkAndMerge();
+}
+
+bool SparkMergeTreeWriter::chunkToPart(Chunk && chunk)
+{
+ if (chunk.hasChunkInfo())
{
- bool has_part = blockToPart(add_block);
- if (has_part && merge_after_insert)
- checkAndMerge();
+ Chunk squash_chunk = DB::Squashing::squash(std::move(chunk));
+ Block result = header.cloneWithColumns(squash_chunk.getColumns());
+ return blockToPart(result);
}
+ return false;
}
bool SparkMergeTreeWriter::blockToPart(Block & block)
{
- auto blocks_with_partition
- = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10,
metadata_snapshot, context);
+ auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10,
metadata_snapshot, context);
if (blocks_with_partition.empty())
return false;
@@ -180,12 +187,7 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t
before_write_memory)
void SparkMergeTreeWriter::finalize()
{
- if (auto block = squashing_transform->add({}))
- {
- if (block.rows())
- blockToPart(block);
- }
-
+ chunkToPart(squashing->flush());
if (merge_after_insert)
finalizeMerge();
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 5c4b66403..2b07521ed 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -17,7 +17,7 @@
#pragma once
#include <Interpreters/Context.h>
-#include <Interpreters/SquashingTransform.h>
+#include <Interpreters/Squashing.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/StorageMergeTreeFactory.h>
@@ -59,13 +59,15 @@ public:
const String & partition_dir_ = "",
const String & bucket_dir_ = "");
- void write(DB::Block & block);
+ void write(const DB::Block & block);
void finalize();
std::vector<PartInfo> getAllPartInfo();
private:
- void
- writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part,
DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr &
metadata_snapshot);
+ void writeTempPart(
+ MergeTreeDataWriter::TemporaryPart & temp_part,
+ DB::BlockWithPartition & block_with_partition,
+ const DB::StorageMetadataPtr & metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot);
void checkAndMerge(bool force = false);
@@ -75,6 +77,7 @@ private:
void saveMetadata();
void commitPartToRemoteStorageIfNeeded();
void finalizeMerge();
+ bool chunkToPart(Chunk && chunk);
bool blockToPart(Block & block);
CustomStorageMergeTreePtr storage = nullptr;
@@ -87,7 +90,7 @@ private:
String bucket_dir;
DB::ContextPtr context;
- std::unique_ptr<DB::SquashingTransform> squashing_transform;
+ std::unique_ptr<DB::Squashing> squashing;
int part_num = 1;
ConcurrentDeque<DB::MergeTreeDataPartPtr> new_parts;
std::unordered_map<String, String> partition_values;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]