This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ca35e47df [GLUTEN-6378][CH] Support delta count optimizer for the
MergeTree format (#6379)
ca35e47df is described below
commit ca35e47df09943e0a794d6a9963f90394bb490fc
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue Jul 9 23:06:22 2024 +0800
[GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format
(#6379)
* [GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format
Support the delta count optimizer for the MergeTree format:
In Delta, the rule PrepareDeltaScan is used to optimize the count
command, directly using the Delta metadata to respond to the SQL query
`select count(*) from table`; this optimizer is now supported for the MergeTree format.
Close #6378.
* fix ut
---
.../v2/clickhouse/metadata/AddFileTags.scala | 10 +-
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 30 ++++++
.../GlutenClickHouseMergeTreeWriteSuite.scala | 113 +++++++++++----------
.../GlutenClickHouseTPCHBucketSuite.scala | 9 +-
4 files changed, 107 insertions(+), 55 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
index 0680663eb..8acc23aec 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
@@ -144,7 +144,15 @@ object AddFileTags {
"dirName" -> dirName,
"marks" -> marks.toString
)
- AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange,
stats, tags)
+ val mapper: ObjectMapper = new ObjectMapper()
+ val rootNode = mapper.createObjectNode()
+ rootNode.put("numRecords", rows)
+ rootNode.put("minValues", "")
+ rootNode.put("maxValues", "")
+ rootNode.put("nullCount", "")
+ // Add the `stats` into delta meta log
+ val metricsStats = mapper.writeValueAsString(rootNode)
+ AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange,
metricsStats, tags)
}
def addFileToAddMergeTreeParts(addFile: AddFile): AddMergeTreeParts = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 79d663deb..6c3d7dea0 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.functions._
@@ -1305,5 +1306,34 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
dataFileList = dataPathFile.list(fileFilter)
assertResult(6)(dataFileList.length)
}
+
+ test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+ val dataPath = s"$basePath/lineitem_mergetree_count_opti"
+ clearDataPath(dataPath)
+
+ val sourceDF = spark.sql(s"""
+ |select * from lineitem
+ |""".stripMargin)
+
+ sourceDF.write
+ .format("clickhouse")
+ .partitionBy("l_shipdate", "l_returnflag")
+ .option("clickhouse.orderByKey", "l_orderkey")
+ .option("clickhouse.primaryKey", "l_orderkey")
+ .mode(SaveMode.Append)
+ .save(dataPath)
+
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ .groupBy()
+ .count()
+ val result = df.collect()
+ assertResult(600572)(result(0).getLong(0))
+ // Spark 3.2 + Delta 2.0 does not support this feature
+ if (!sparkVersion.equals("3.2")) {
+ assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+ }
+ }
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 27bd4372a..e88eb1fed 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
@@ -357,34 +358,10 @@ class GlutenClickHouseMergeTreeWriteSuite
|""".stripMargin
val df = spark.sql(sql1)
- val result = df.collect()
assertResult(1)(
// in test data, there are only 1 row with l_orderkey = 12647
- result.apply(0).get(0)
+ df.collect().apply(0).get(0)
)
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
- }
- assertResult(1)(scanExec.size)
-
- val mergetreeScan = scanExec.head
- assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assertResult(600572)(addFiles.map(_.rows).sum)
-
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
- assertResult(6)(addFiles.size)
- val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
- assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
}
val sql2 =
@@ -439,22 +416,9 @@ class GlutenClickHouseMergeTreeWriteSuite
val df = spark.sql(s"""
| select count(*) from lineitem_mergetree_delete
|""".stripMargin)
- val result = df.collect()
assertResult(600571)(
- result.apply(0).get(0)
+ df.collect().apply(0).get(0)
)
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
- }
- val mergetreeScan = scanExec.head
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
- assertResult(6)(addFiles.size)
- val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
- assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
}
{
@@ -1491,19 +1455,6 @@ class GlutenClickHouseMergeTreeWriteSuite
val result = df.collect()
assertResult(1)(result.length)
assertResult(10)(result(0).getLong(0))
-
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
- }
- assertResult(1)(scanExec.size)
-
- val mergetreeScan = scanExec.head
- assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assertResult(1)(addFiles.size)
- assertResult(10)(addFiles.head.rows)
})
}
@@ -1962,5 +1913,63 @@ class GlutenClickHouseMergeTreeWriteSuite
}
})
}
+
+ test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_count_opti;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_count_opti
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_shipdate, l_returnflag)
+ |TBLPROPERTIES (orderByKey='l_orderkey',
+ | primaryKey='l_orderkey')
+ |LOCATION '$basePath/lineitem_mergetree_count_opti'
+ |""".stripMargin)
+
+ // dynamic partitions
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_count_opti
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_count_opti
+ |""".stripMargin
+ runSql(sqlStr)(
+ df => {
+ val result = df.collect()
+ assertResult(1)(result.length)
+ assertResult("600572")(result(0).getLong(0).toString)
+
+ // Spark 3.2 + Delta 2.0 does not support this feature
+ if (!sparkVersion.equals("3.2")) {
+
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+ }
+ })
+ }
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 79a708ce5..59912e722 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -739,8 +739,13 @@ class GlutenClickHouseTPCHBucketSuite
runSql(SQL6)(
df => {
checkResult(df, Array(Row(600572)))
- // there is a shuffle between two phase hash aggregates.
- checkHashAggregateCount(df, 2)
+ if (sparkVersion.equals("3.2")) {
+ // there is a shuffle between two phase hash aggregate.
+ checkHashAggregateCount(df, 2)
+ } else {
+ // the delta will use the delta log meta to response this sql
+ checkHashAggregateCount(df, 0)
+ }
})
// test sort aggregates
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]