This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f24703d0b8 [GLUTEN-9697][CH] Add 'reorg' command ut for the mergetree
+ delta dv (#9699)
f24703d0b8 is described below
commit f24703d0b82b0ac0460d72f375122ae2d404659f
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed May 21 09:10:20 2025 +0800
[GLUTEN-9697][CH] Add 'reorg' command ut for the mergetree + delta dv
(#9699)
[CH] Add 'reorg' command ut for the mergetree + delta dv
Close #9697.
---
.../GlutenDeltaMergeTreeDeletionVectorSuite.scala | 108 ++++++++++++++++++++-
1 file changed, 107 insertions(+), 1 deletion(-)
diff --git a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
index ba0d8824f1..e970839d42 100644
--- a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
+++ b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergeTreeDeletionVectorSuite.scala
@@ -16,9 +16,12 @@
*/
package org.apache.spark.gluten.delta
-import org.apache.gluten.execution.CreateMergeTreeSuite
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{CreateMergeTreeSuite, FileSourceScanExecTransformer}
import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -35,6 +38,8 @@ class GlutenDeltaMergeTreeDeletionVectorSuite extends CreateMergeTreeSuite {
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+ .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+ .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
}
test("Gluten-9334: column `_tmp_metadata_row_index` and `file_path` not found") {
@@ -160,5 +165,106 @@ class GlutenDeltaMergeTreeDeletionVectorSuite extends CreateMergeTreeSuite {
)
checkFallbackOperators(df, 0)
}
+
+ test("Gluten-9697: Add 'reorg' command ut for the mergetree + delta dv") {
+ val tableName = "mergetree_delta_dv_reorg"
+ withTable(tableName) {
+ withTempDir {
+ dirName =>
+ val s = createTableBuilder(tableName, "clickhouse", s"$dirName/$tableName")
+ .withProps(Map("delta.enableDeletionVectors" -> "'true'"))
+ .withTableKey("lineitem")
+ .build()
+ spark.sql(s)
+
+ spark.sql(s"""
+ |insert into table $tableName
+ |select /*+ REPARTITION(6) */ * from lineitem
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |delete from $tableName
+ |where mod(l_orderkey, 3) = 2
+ |""".stripMargin)
+
+ var df = spark.sql(s"""
+ | select sum(l_linenumber) from $tableName
+ |""".stripMargin)
+ var result = df.collect()
+ assert(
+ result(0).get(0) === 1200671
+ )
+ checkFallbackOperators(df, 0)
+
+ spark.sql(s"""
+ | REORG TABLE $tableName APPLY (PURGE)
+ |""".stripMargin)
+ df = spark.sql(s"""
+ | select sum(l_linenumber) from $tableName
+ |""".stripMargin)
+ result = df.collect()
+ assert(
+ result(0).get(0) === 1200671
+ )
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val parquetScan = scanExec.head
+ val fileIndex = parquetScan.relation.location.asInstanceOf[PreparedDeltaFileIndex]
+ val addFiles = fileIndex.preparedScan.files
+ assert(addFiles.size === 1)
+ assert(addFiles(0).deletionVector === null)
+ }
+ }
+ }
+
+ test("Gluten-9697: Add 'reorg' command ut for the mergetree + delta dv + partition") {
+ val tableName = "mergetree_delta_dv_reorg_partition"
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS $tableName;
+ |""".stripMargin)
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $tableName
+ |(${table2columns.get("lineitem").get(true)})
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |TBLPROPERTIES (delta.enableDeletionVectors='true')
+ |LOCATION '$dataHome/$tableName'
+ |""".stripMargin)
+ spark.sql(s"""
+ |insert into table $tableName
+ | select /*+ REPARTITION(6) */ * from lineitem
+ |""".stripMargin)
+ spark.sql(s"""
+ |delete from $tableName
+ | where mod(l_orderkey, 3) = 1
+ |""".stripMargin)
+ var df = spark.sql(s"""
+ |select sum(l_linenumber) from $tableName
+ |""".stripMargin)
+ var result = df.collect()
+ assert(
+ result(0).get(0) === 1201486
+ )
+ checkFallbackOperators(df, 0)
+ spark.sql(s"""
+ |REORG TABLE $tableName APPLY (PURGE)
+ |""".stripMargin)
+ df = spark.sql(s"""
+ |select sum(l_linenumber) from $tableName
+ |""".stripMargin)
+ result = df.collect()
+ assert(
+ result(0).get(0) === 1201486
+ )
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val parquetScan = scanExec.head
+ val fileIndex = parquetScan.relation.location.asInstanceOf[PreparedDeltaFileIndex]
+ val addFiles = fileIndex.preparedScan.files
+ assert(addFiles.size === 3)
+ assert(addFiles.forall(_.deletionVector === null))
+ }
}
// scalastyle:off line.size.limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]