This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f24703d0b8 [GLUTEN-9697][CH] Add 'reorg' command ut for the mergetree 
+ delta dv (#9699)
f24703d0b8 is described below

commit f24703d0b82b0ac0460d72f375122ae2d404659f
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed May 21 09:10:20 2025 +0800

    [GLUTEN-9697][CH] Add 'reorg' command ut for the mergetree + delta dv 
(#9699)
    
    [CH] Add 'reorg' command ut for the mergetree + delta dv
    
    Close #9697.
---
 .../GlutenDeltaMergeTreeDeletionVectorSuite.scala  | 108 ++++++++++++++++++++-
 1 file changed, 107 insertions(+), 1 deletion(-)

diff --git 
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
index ba0d8824f1..e970839d42 100644
--- 
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
+++ 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.gluten.delta
 
-import org.apache.gluten.execution.CreateMergeTreeSuite
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{CreateMergeTreeSuite, 
FileSourceScanExecTransformer}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
@@ -35,6 +38,8 @@ class GlutenDeltaMergeTreeDeletionVectorSuite extends 
CreateMergeTreeSuite {
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.sql.storeAssignmentPolicy", "legacy")
       .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+      .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+      .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
   }
 
   test("Gluten-9334: column `_tmp_metadata_row_index` and `file_path` not 
found") {
@@ -160,5 +165,106 @@ class GlutenDeltaMergeTreeDeletionVectorSuite extends 
CreateMergeTreeSuite {
     )
     checkFallbackOperators(df, 0)
   }
+
+  test("Gluten-9697: Add 'reorg' command ut for the mergetree + delta dv") {
+    val tableName = "mergetree_delta_dv_reorg"
+    withTable(tableName) {
+      withTempDir {
+        dirName =>
+          val s = createTableBuilder(tableName, "clickhouse", 
s"$dirName/$tableName")
+            .withProps(Map("delta.enableDeletionVectors" -> "'true'"))
+            .withTableKey("lineitem")
+            .build()
+          spark.sql(s)
+
+          spark.sql(s"""
+                       |insert into table $tableName
+                       |select /*+ REPARTITION(6) */ * from lineitem
+                       |""".stripMargin)
+
+          spark.sql(s"""
+                       |delete from $tableName
+                       |where mod(l_orderkey, 3) = 2
+                       |""".stripMargin)
+
+          var df = spark.sql(s"""
+                                | select sum(l_linenumber) from $tableName
+                                |""".stripMargin)
+          var result = df.collect()
+          assert(
+            result(0).get(0) === 1200671
+          )
+          checkFallbackOperators(df, 0)
+
+          spark.sql(s"""
+                       | REORG TABLE $tableName APPLY (PURGE)
+                       |""".stripMargin)
+          df = spark.sql(s"""
+                            | select sum(l_linenumber) from $tableName
+                            |""".stripMargin)
+          result = df.collect()
+          assert(
+            result(0).get(0) === 1200671
+          )
+          val scanExec = collect(df.queryExecution.executedPlan) {
+            case f: FileSourceScanExecTransformer => f
+          }
+          val parquetScan = scanExec.head
+          val fileIndex = 
parquetScan.relation.location.asInstanceOf[PreparedDeltaFileIndex]
+          val addFiles = fileIndex.preparedScan.files
+          assert(addFiles.size === 1)
+          assert(addFiles(0).deletionVector === null)
+      }
+    }
+  }
+
+  test("Gluten-9697: Add 'reorg' command ut for the mergetree + delta dv + 
partition") {
+    val tableName = "mergetree_delta_dv_reorg_partition"
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS $tableName;
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS $tableName
+                 |(${table2columns.get("lineitem").get(true)})
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |TBLPROPERTIES (delta.enableDeletionVectors='true')
+                 |LOCATION '$dataHome/$tableName'
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |insert into table $tableName
+                 | select /*+ REPARTITION(6) */ * from lineitem
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |delete from $tableName
+                 | where mod(l_orderkey, 3) = 1
+                 |""".stripMargin)
+    var df = spark.sql(s"""
+                          |select sum(l_linenumber) from $tableName
+                          |""".stripMargin)
+    var result = df.collect()
+    assert(
+      result(0).get(0) === 1201486
+    )
+    checkFallbackOperators(df, 0)
+    spark.sql(s"""
+                 |REORG TABLE $tableName APPLY (PURGE)
+                 |""".stripMargin)
+    df = spark.sql(s"""
+                      |select sum(l_linenumber) from $tableName
+                      |""".stripMargin)
+    result = df.collect()
+    assert(
+      result(0).get(0) === 1201486
+    )
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    val parquetScan = scanExec.head
+    val fileIndex = 
parquetScan.relation.location.asInstanceOf[PreparedDeltaFileIndex]
+    val addFiles = fileIndex.preparedScan.files
+    assert(addFiles.size === 3)
+    assert(addFiles.forall(_.deletionVector === null))
+  }
 }
 // scalastyle:off line.size.limit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to