This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b8da065e6 [VL] Support generating DV-enabled TPC-DS Delta table in gluten-it (#10639)
4b8da065e6 is described below

commit 4b8da065e69966d8321058222e381378218728af
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Sep 9 18:38:30 2025 +0200

    [VL] Support generating DV-enabled TPC-DS Delta table in gluten-it (#10639)
---
 .../org/apache/gluten/integration/BaseMixin.java   | 10 ++++
 .../org/apache/gluten/integration/DataGen.scala    | 27 +++++++++-
 .../integration/clickbench/ClickBenchDataGen.scala |  2 +-
 .../gluten/integration/ds/TpcdsDataGen.scala       | 29 ++++++++---
 .../integration/ds/TpcdsDataGenFeatures.scala      | 58 ++++++++++++++++++++++
 .../apache/gluten/integration/ds/TpcdsSuite.scala  |  9 ++--
 .../apache/gluten/integration/h/TpchDataGen.scala  | 17 +++++--
 .../apache/gluten/integration/h/TpchSuite.scala    |  8 ++-
 8 files changed, 142 insertions(+), 18 deletions(-)

diff --git 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index 0240bd408f..88ccf9c512 100644
--- 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -29,6 +29,7 @@ import org.apache.log4j.LogManager;
 import org.apache.spark.SparkConf;
 import picocli.CommandLine;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -91,6 +92,13 @@ public class BaseMixin {
       defaultValue = "false")
   private boolean genPartitionedData;
 
+  @CommandLine.Option(
+      names = {"--data-gen-features"},
+      description =
+          "Set a comma-separated list of data generator features to enable. 
Example: --data-gen-features=enable_dv,delete_10pc",
+      split = ",")
+  private String[] dataGenFeatures = new String[0];
+
   @CommandLine.Option(
       names = {"--enable-ui"},
       description = "Enable Spark UI",
@@ -207,6 +215,7 @@ public class BaseMixin {
                 dataDir,
                 dataScale,
                 genPartitionedData,
+                
JavaCollectionConverter.asScalaSeq(Arrays.asList(dataGenFeatures)),
                 enableUi,
                 enableHsUi,
                 hsUiPort,
@@ -233,6 +242,7 @@ public class BaseMixin {
                 dataDir,
                 dataScale,
                 genPartitionedData,
+                
JavaCollectionConverter.asScalaSeq(Arrays.asList(dataGenFeatures)),
                 enableUi,
                 enableHsUi,
                 hsUiPort,
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
index e853fdc73b..e722b2767a 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
@@ -16,8 +16,11 @@
  */
 package org.apache.gluten.integration
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
+import scala.collection.mutable
+
 trait DataGen {
   def gen(): Unit
 }
@@ -32,7 +35,7 @@ class NoopModifier(t: DataType) extends TypeModifier(_ => 
true, t) {
 }
 
 object DataGen {
-  def getRowModifier(schema: StructType, typeModifiers: List[TypeModifier]): 
Int => TypeModifier = {
+  def getRowModifier(schema: StructType, typeModifiers: Seq[TypeModifier]): 
Int => TypeModifier = {
     val modifiers = schema.fields.map {
       f =>
         val matchedModifiers = typeModifiers.flatMap {
@@ -65,4 +68,26 @@ object DataGen {
     })
     modifiedSchema
   }
+
+  /** A named, optional step that a data generator runs after it has written its tables. */
+  trait Feature extends Serializable {
+
+    /** Name under which the feature is registered and looked up. */
+    def name(): String
+
+    /**
+     * Executes the feature.
+     *
+     * @param spark
+     *   session used to run the feature
+     * @param source
+     *   name of the data source passed to the data generator
+     */
+    // Explicit result type: procedure syntax is deprecated since Scala 2.13.
+    def run(spark: SparkSession, source: String): Unit
+  }
+
+  /**
+   * Name-indexed collection of [[Feature]]s. Backed by a linked map, so the "available
+   * features" list in error messages follows registration order.
+   */
+  class FeatureRegistry extends Serializable {
+    private val lookup: mutable.LinkedHashMap[String, Feature] = mutable.LinkedHashMap()
+
+    /**
+     * Adds `feature` to the registry under its own name.
+     *
+     * @throws IllegalArgumentException
+     *   if the name contains anything but word characters, or is already registered
+     */
+    def register(feature: Feature): Unit = {
+      val featureName = feature.name()
+      require(
+        featureName.matches("^\\w+$"),
+        s"Illegal feature name: $featureName, only letters, digits and underscores are allowed")
+      require(!lookup.contains(featureName), s"Feature already registered: $featureName")
+      lookup(featureName) = feature
+    }
+
+    /**
+     * Returns the feature registered under `name`.
+     *
+     * @throws IllegalArgumentException
+     *   if no such feature exists; the message lists every registered name
+     */
+    def getFeature(name: String): Feature = {
+      require(
+        lookup.contains(name),
+        s"No feature found by name: $name, available features: ${lookup.keys.mkString(", ")}")
+      lookup(name)
+    }
+  }
 }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
index d6c56e631c..7e75b153ee 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
@@ -27,7 +27,7 @@ import java.io.File
 import scala.language.postfixOps
 import scala.sys.process._
 
-class ClickBenchDataGen(val spark: SparkSession, dir: String) extends DataGen {
+class ClickBenchDataGen(spark: SparkSession, dir: String) extends DataGen {
   import ClickBenchDataGen._
   override def gen(): Unit = {
     println(s"Start to download ClickBench Parquet dataset from URL: 
$DATA_URL... ")
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
index caa5c1adcf..50404228f5 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
@@ -28,16 +28,24 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 class TpcdsDataGen(
-    val spark: SparkSession,
+    spark: SparkSession,
     scale: Double,
     partitions: Int,
     source: String,
     dir: String,
-    typeModifiers: List[TypeModifier] = List(),
-    val genPartitionedData: Boolean)
-  extends Serializable
+    genPartitionedData: Boolean,
+    featureNames: Seq[String],
+    typeModifiers: Seq[TypeModifier]
+) extends Serializable
   with DataGen {
 
+  private val featureRegistry = new DataGen.FeatureRegistry
+
+  featureRegistry.register(TpcdsDataGenFeatures.EnableDeltaDeletionVector)
+  featureRegistry.register(TpcdsDataGenFeatures.DeleteTenPercentData)
+
+  private val features = featureNames.map(featureRegistry.getFeature)
+
   def writeParquetTable(t: Table): Unit = {
     val name = t.getName
     if (name.equals("dbgen_version")) {
@@ -73,6 +81,7 @@ class TpcdsDataGen(
       List[String]()
     } else {
       name match {
+        case "store_sales" => List("ss_sold_date_sk")
         case "catalog_sales" => List("cs_sold_date_sk")
         case "web_sales" => List("ws_sold_date_sk")
         case _ => List[String]()
@@ -94,9 +103,6 @@ class TpcdsDataGen(
     val stringSchema = StructType(modifiedSchema.fields.map(f => 
StructField(f.name, StringType)))
 
     val columns = modifiedSchema.fields.map(f => new 
Column(f.name).cast(f.dataType).as(f.name))
-    // dwrf support was temporarily dropped since it impacts data gen skipping 
strategy.
-    // Better design is required to re-enable it
-    val tablePath = dir + File.separator + tableName
     spark
       .range(0, partitions, 1L, partitions)
       .mapPartitions {
@@ -124,11 +130,18 @@ class TpcdsDataGen(
       .format(source)
       .partitionBy(partitionBy.toArray: _*)
       .mode(SaveMode.Overwrite)
-      .save(dir + File.separator + tableName)
+      .option("path", dir + File.separator + tableName) // storage location
+      .saveAsTable(tableName)
   }
 
   override def gen(): Unit = {
     Table.getBaseTables.forEach(t => writeParquetTable(t))
+
+    features.foreach {
+      feature =>
+        println(s"Execute feature: ${feature.name()}")
+        feature.run(spark, source)
+    }
   }
 }
 
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala
new file mode 100644
index 0000000000..efb595d3d1
--- /dev/null
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.integration.ds
+
+import org.apache.gluten.integration.DataGen
+
+import org.apache.spark.sql.SparkSession
+
+/** Optional post-generation features for the TPC-DS tables; both require the Delta source. */
+object TpcdsDataGenFeatures {
+
+  // Sales and returns tables touched by both features, each paired with the column whose
+  // value modulo 10 selects the rows removed by `delete_10pc`. Order is the execution order.
+  private val targetTables: Seq[(String, String)] = Seq(
+    "store_sales" -> "ss_ticket_number",
+    "store_returns" -> "sr_ticket_number",
+    "catalog_sales" -> "cs_order_number",
+    "catalog_returns" -> "cr_order_number",
+    "web_sales" -> "ws_order_number",
+    "web_returns" -> "wr_order_number"
+  )
+
+  // Throws IllegalArgumentException unless `source` is exactly "delta".
+  private def requireDelta(feature: DataGen.Feature, source: String): Unit = {
+    require(source == "delta", s"${feature.getClass} only supports Delta data source")
+  }
+
+  /** `enable_dv`: sets `delta.enableDeletionVectors` to `true` on every target table. */
+  object EnableDeltaDeletionVector extends DataGen.Feature {
+    override def name(): String = "enable_dv"
+
+    override def run(spark: SparkSession, source: String): Unit = {
+      requireDelta(this, source)
+      targetTables.foreach {
+        case (table, _) =>
+          spark.sql(
+            s"ALTER TABLE $table SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
+      }
+    }
+  }
+
+  /** `delete_10pc`: deletes the rows whose key column modulo 10 equals 7 from every table. */
+  object DeleteTenPercentData extends DataGen.Feature {
+    override def name(): String = "delete_10pc"
+
+    override def run(spark: SparkSession, source: String): Unit = {
+      requireDelta(this, source)
+      targetTables.foreach {
+        case (table, keyColumn) =>
+          // show() prints the result of each DELETE statement.
+          spark.sql(s"DELETE FROM $table WHERE $keyColumn % 10 = 7").show()
+      }
+    }
+  }
+}
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 3372856ba0..e6cff8f8cf 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -38,6 +38,7 @@ class TpcdsSuite(
     val dataDir: String,
     val dataScale: Double,
     val genPartitionedData: Boolean,
+    val dataGenFeatures: Seq[String],
     val enableUi: Boolean,
     val enableHsUi: Boolean,
     val hsUiPort: Int,
@@ -79,8 +80,9 @@ class TpcdsSuite(
     } else {
       "non_partitioned"
     }
+    val featureFlags = dataGenFeatures.map(feature => 
s"-$feature").mkString("")
     new File(dataDir).toPath
-      
.resolve(s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag")
+      
.resolve(s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag$featureFlags")
       .toFile
       .getAbsolutePath
   }
@@ -93,8 +95,9 @@ class TpcdsSuite(
       shufflePartitions,
       dataSource,
       dataWritePath(),
-      typeModifiers(),
-      genPartitionedData)
+      genPartitionedData,
+      dataGenFeatures,
+      typeModifiers())
   }
 
   override private[integration] def allQueries(): QuerySet = {
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
index 02a8e629d3..eaf4b4ded9 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
@@ -29,15 +29,19 @@ import java.sql.Date
 import scala.collection.JavaConverters._
 
 class TpchDataGen(
-    val spark: SparkSession,
+    spark: SparkSession,
     scale: Double,
     partitions: Int,
     source: String,
     dir: String,
-    typeModifiers: List[TypeModifier] = List())
+    featureNames: Seq[String],
+    typeModifiers: Seq[TypeModifier])
   extends Serializable
   with DataGen {
 
+  private val featureRegistry = new DataGen.FeatureRegistry
+  private val features = featureNames.map(featureRegistry.getFeature)
+
   override def gen(): Unit = {
     generate(dir, "lineitem", lineItemSchema, partitions, lineItemGenerator, 
lineItemParser)
     generate(dir, "customer", customerSchema, partitions, customerGenerator, 
customerParser)
@@ -53,6 +57,12 @@ class TpchDataGen(
     generate(dir, "nation", nationSchema, nationGenerator, nationParser)
     generate(dir, "part", partSchema, partitions, partGenerator, partParser)
     generate(dir, "region", regionSchema, regionGenerator, regionParser)
+
+    features.foreach {
+      feature =>
+        println(s"Execute feature: ${feature.name()}")
+        feature.run(spark, source)
+    }
   }
 
   // lineitem
@@ -339,6 +349,7 @@ class TpchDataGen(
       .write
       .format(source)
       .mode(SaveMode.Overwrite)
-      .save(dir + File.separator + tableName)
+      .option("path", dir + File.separator + tableName) // storage location
+      .saveAsTable(tableName)
   }
 }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 2894d359ac..a0361e9c9f 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -38,6 +38,7 @@ class TpchSuite(
     val dataDir: String,
     val dataScale: Double,
     val genPartitionedData: Boolean,
+    val dataGenFeatures: Seq[String],
     val enableUi: Boolean,
     val enableHsUi: Boolean,
     val hsUiPort: Int,
@@ -73,11 +74,13 @@ class TpchSuite(
 
   override protected def historyWritePath(): String = HISTORY_WRITE_PATH
 
-  override private[integration] def dataWritePath(): String =
+  override private[integration] def dataWritePath(): String = {
+    val featureFlags = dataGenFeatures.map(feature => 
s"-$feature").mkString("")
     new File(dataDir).toPath
-      .resolve(s"$TPCH_WRITE_RELATIVE_PATH-$dataScale-$dataSource")
+      
.resolve(s"$TPCH_WRITE_RELATIVE_PATH-$dataScale-$dataSource$featureFlags")
       .toFile
       .getAbsolutePath
+  }
 
   override private[integration] def createDataGen(): DataGen = {
     checkDataGenArgs(dataSource, dataScale, genPartitionedData)
@@ -87,6 +90,7 @@ class TpchSuite(
       shufflePartitions,
       dataSource,
       dataWritePath(),
+      dataGenFeatures,
       typeModifiers())
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to