This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ca35e47df [GLUTEN-6378][CH] Support delta count optimizer for the 
MergeTree format (#6379)
ca35e47df is described below

commit ca35e47df09943e0a794d6a9963f90394bb490fc
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue Jul 9 23:06:22 2024 +0800

    [GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format 
(#6379)
    
    * [GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format
    
    Support delta count optimizer for the MergeTree format:
    
    In Delta, the rule PrepareDeltaScan optimizes the count command by using 
the Delta log metadata directly to answer the SQL `select count(*) from table`. 
This commit adds support for that optimizer to the MergeTree format.
    
    Close #6378.
    
    * Fix unit tests
---
 .../v2/clickhouse/metadata/AddFileTags.scala       |  10 +-
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala |  30 ++++++
 .../GlutenClickHouseMergeTreeWriteSuite.scala      | 113 +++++++++++----------
 .../GlutenClickHouseTPCHBucketSuite.scala          |   9 +-
 4 files changed, 107 insertions(+), 55 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
index 0680663eb..8acc23aec 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
@@ -144,7 +144,15 @@ object AddFileTags {
       "dirName" -> dirName,
       "marks" -> marks.toString
     )
-    AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, 
stats, tags)
+    val mapper: ObjectMapper = new ObjectMapper()
+    val rootNode = mapper.createObjectNode()
+    rootNode.put("numRecords", rows)
+    rootNode.put("minValues", "")
+    rootNode.put("maxValues", "")
+    rootNode.put("nullCount", "")
+    // Add the `stats` into delta meta log
+    val metricsStats = mapper.writeValueAsString(rootNode)
+    AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, 
metricsStats, tags)
   }
 
   def addFileToAddMergeTreeParts(addFile: AddFile): AddMergeTreeParts = {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 79d663deb..6c3d7dea0 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.functions._
@@ -1305,5 +1306,34 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
     dataFileList = dataPathFile.list(fileFilter)
     assertResult(6)(dataFileList.length)
   }
+
+  test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+    val dataPath = s"$basePath/lineitem_mergetree_count_opti"
+    clearDataPath(dataPath)
+
+    val sourceDF = spark.sql(s"""
+                                |select * from lineitem
+                                |""".stripMargin)
+
+    sourceDF.write
+      .format("clickhouse")
+      .partitionBy("l_shipdate", "l_returnflag")
+      .option("clickhouse.orderByKey", "l_orderkey")
+      .option("clickhouse.primaryKey", "l_orderkey")
+      .mode(SaveMode.Append)
+      .save(dataPath)
+
+    val df = spark.read
+      .format("clickhouse")
+      .load(dataPath)
+      .groupBy()
+      .count()
+    val result = df.collect()
+    assertResult(600572)(result(0).getLong(0))
+    // Spark 3.2 + Delta 2.0 does not support this feature
+    if (!sparkVersion.equals("3.2")) {
+      assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+    }
+  }
 }
 // scalastyle:off line.size.limit
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 27bd4372a..e88eb1fed 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 
@@ -357,34 +358,10 @@ class GlutenClickHouseMergeTreeWriteSuite
            |""".stripMargin
 
       val df = spark.sql(sql1)
-      val result = df.collect()
       assertResult(1)(
         // in test data, there are only 1 row with l_orderkey = 12647
-        result.apply(0).get(0)
+        df.collect().apply(0).get(0)
       )
-      val scanExec = collect(df.queryExecution.executedPlan) {
-        case f: FileSourceScanExecTransformer => f
-      }
-      assertResult(1)(scanExec.size)
-
-      val mergetreeScan = scanExec.head
-      assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
-      val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
-      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-      assertResult(600572)(addFiles.map(_.rows).sum)
-
-      // 4 parts belong to the first batch
-      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
-      assertResult(6)(addFiles.size)
-      val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
-      assertResult(2)(filePaths.size)
-      assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
     }
 
     val sql2 =
@@ -439,22 +416,9 @@ class GlutenClickHouseMergeTreeWriteSuite
       val df = spark.sql(s"""
                             | select count(*) from lineitem_mergetree_delete
                             |""".stripMargin)
-      val result = df.collect()
       assertResult(600571)(
-        result.apply(0).get(0)
+        df.collect().apply(0).get(0)
       )
-      val scanExec = collect(df.queryExecution.executedPlan) {
-        case f: FileSourceScanExecTransformer => f
-      }
-      val mergetreeScan = scanExec.head
-      val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-      // 4 parts belong to the first batch
-      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
-      assertResult(6)(addFiles.size)
-      val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
-      assertResult(2)(filePaths.size)
-      assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
     }
 
     {
@@ -1491,19 +1455,6 @@ class GlutenClickHouseMergeTreeWriteSuite
         val result = df.collect()
         assertResult(1)(result.length)
         assertResult(10)(result(0).getLong(0))
-
-        val scanExec = collect(df.queryExecution.executedPlan) {
-          case f: FileSourceScanExecTransformer => f
-        }
-        assertResult(1)(scanExec.size)
-
-        val mergetreeScan = scanExec.head
-        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-
-        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-        assertResult(1)(addFiles.size)
-        assertResult(10)(addFiles.head.rows)
       })
   }
 
@@ -1962,5 +1913,63 @@ class GlutenClickHouseMergeTreeWriteSuite
         }
       })
   }
+
+  test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_count_opti;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_count_opti
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate, l_returnflag)
+                 |TBLPROPERTIES (orderByKey='l_orderkey',
+                 |               primaryKey='l_orderkey')
+                 |LOCATION '$basePath/lineitem_mergetree_count_opti'
+                 |""".stripMargin)
+
+    // dynamic partitions
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_count_opti
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_count_opti
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val result = df.collect()
+        assertResult(1)(result.length)
+        assertResult("600572")(result(0).getLong(0).toString)
+
+        // Spark 3.2 + Delta 2.0 does not support this feature
+        if (!sparkVersion.equals("3.2")) {
+          
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+        }
+      })
+  }
 }
 // scalastyle:off line.size.limit
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 79a708ce5..59912e722 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -739,8 +739,13 @@ class GlutenClickHouseTPCHBucketSuite
     runSql(SQL6)(
       df => {
         checkResult(df, Array(Row(600572)))
-        // there is a shuffle between two phase hash aggregates.
-        checkHashAggregateCount(df, 2)
+        if (sparkVersion.equals("3.2")) {
+          // there is a shuffle between two phase hash aggregate.
+          checkHashAggregateCount(df, 2)
+        } else {
+          // the delta will use the delta log meta to response this sql
+          checkHashAggregateCount(df, 0)
+        }
       })
 
     // test sort aggregates


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to