This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4b8da065e6 [VL] Support generating DV-enabled TPC-DS Delta table in
gluten-it (#10639)
4b8da065e6 is described below
commit 4b8da065e69966d8321058222e381378218728af
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Sep 9 18:38:30 2025 +0200
[VL] Support generating DV-enabled TPC-DS Delta table in gluten-it (#10639)
---
.../org/apache/gluten/integration/BaseMixin.java | 10 ++++
.../org/apache/gluten/integration/DataGen.scala | 27 +++++++++-
.../integration/clickbench/ClickBenchDataGen.scala | 2 +-
.../gluten/integration/ds/TpcdsDataGen.scala | 29 ++++++++---
.../integration/ds/TpcdsDataGenFeatures.scala | 58 ++++++++++++++++++++++
.../apache/gluten/integration/ds/TpcdsSuite.scala | 9 ++--
.../apache/gluten/integration/h/TpchDataGen.scala | 17 +++++--
.../apache/gluten/integration/h/TpchSuite.scala | 8 ++-
8 files changed, 142 insertions(+), 18 deletions(-)
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index 0240bd408f..88ccf9c512 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -29,6 +29,7 @@ import org.apache.log4j.LogManager;
import org.apache.spark.SparkConf;
import picocli.CommandLine;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -91,6 +92,13 @@ public class BaseMixin {
defaultValue = "false")
private boolean genPartitionedData;
+ @CommandLine.Option(
+ names = {"--data-gen-features"},
+ description =
+ "Set a comma-separated list of data generator features to enable.
Example: --data-gen-features=enable_dv,delete_10pc",
+ split = ",")
+ private String[] dataGenFeatures = new String[0];
+
@CommandLine.Option(
names = {"--enable-ui"},
description = "Enable Spark UI",
@@ -207,6 +215,7 @@ public class BaseMixin {
dataDir,
dataScale,
genPartitionedData,
+
JavaCollectionConverter.asScalaSeq(Arrays.asList(dataGenFeatures)),
enableUi,
enableHsUi,
hsUiPort,
@@ -233,6 +242,7 @@ public class BaseMixin {
dataDir,
dataScale,
genPartitionedData,
+
JavaCollectionConverter.asScalaSeq(Arrays.asList(dataGenFeatures)),
enableUi,
enableHsUi,
hsUiPort,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
index e853fdc73b..e722b2767a 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
@@ -16,8 +16,11 @@
*/
package org.apache.gluten.integration
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import scala.collection.mutable
+
trait DataGen {
def gen(): Unit
}
@@ -32,7 +35,7 @@ class NoopModifier(t: DataType) extends TypeModifier(_ =>
true, t) {
}
object DataGen {
- def getRowModifier(schema: StructType, typeModifiers: List[TypeModifier]):
Int => TypeModifier = {
+ def getRowModifier(schema: StructType, typeModifiers: Seq[TypeModifier]):
Int => TypeModifier = {
val modifiers = schema.fields.map {
f =>
val matchedModifiers = typeModifiers.flatMap {
@@ -65,4 +68,26 @@ object DataGen {
})
modifiedSchema
}
+
+  /**
+   * A named, optional step that a data generator can be asked to run, selected by name through
+   * `--data-gen-features` and resolved via [[FeatureRegistry]].
+   */
+  trait Feature extends Serializable {
+
+    /** Identifier used to select this feature; [[FeatureRegistry.register]] requires word chars. */
+    def name(): String
+
+    /**
+     * Applies the feature.
+     *
+     * @param spark
+     *   session used to execute the feature's statements
+     * @param source
+     *   data source format the generator wrote its tables with (e.g. "delta")
+     */
+    // Explicit `: Unit` result type: procedure syntax is deprecated in 2.13 and gone in Scala 3.
+    def run(spark: SparkSession, source: String): Unit
+  }
+
+  /**
+   * Name-indexed registry of the [[Feature]]s a data generator supports. Backed by a
+   * `LinkedHashMap`, so the names listed in lookup errors appear in registration order.
+   */
+  class FeatureRegistry extends Serializable {
+    private val lookup: mutable.LinkedHashMap[String, Feature] = mutable.LinkedHashMap()
+
+    /**
+     * Registers `feature` under its name.
+     *
+     * @throws IllegalArgumentException
+     *   if the name is not a non-empty run of word characters, or is already registered
+     */
+    def register(feature: Feature): Unit = {
+      val featureName = feature.name()
+      // `String.matches` must match the whole string, so no ^/$ anchors are needed.
+      require(
+        featureName.matches("\\w+"),
+        s"Invalid feature name: '$featureName', expected word characters only [a-zA-Z0-9_]")
+      require(!lookup.contains(featureName), s"Feature already registered: $featureName")
+      lookup(featureName) = feature
+    }
+
+    /**
+     * Returns the feature registered under `name`.
+     *
+     * @throws IllegalArgumentException
+     *   if no such feature exists; the message lists all registered names
+     */
+    def getFeature(name: String): Feature = {
+      require(
+        lookup.contains(name),
+        s"No feature found by name: $name, available features: ${lookup.keys.mkString(", ")}")
+      lookup(name)
+    }
+  }
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
index d6c56e631c..7e75b153ee 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
@@ -27,7 +27,7 @@ import java.io.File
import scala.language.postfixOps
import scala.sys.process._
-class ClickBenchDataGen(val spark: SparkSession, dir: String) extends DataGen {
+class ClickBenchDataGen(spark: SparkSession, dir: String) extends DataGen {
import ClickBenchDataGen._
override def gen(): Unit = {
println(s"Start to download ClickBench Parquet dataset from URL:
$DATA_URL... ")
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
index caa5c1adcf..50404228f5 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
@@ -28,16 +28,24 @@ import java.io.File
import scala.collection.JavaConverters._
class TpcdsDataGen(
- val spark: SparkSession,
+ spark: SparkSession,
scale: Double,
partitions: Int,
source: String,
dir: String,
- typeModifiers: List[TypeModifier] = List(),
- val genPartitionedData: Boolean)
- extends Serializable
+ genPartitionedData: Boolean,
+ featureNames: Seq[String],
+ typeModifiers: Seq[TypeModifier]
+) extends Serializable
with DataGen {
+ private val featureRegistry = new DataGen.FeatureRegistry
+
+ featureRegistry.register(TpcdsDataGenFeatures.EnableDeltaDeletionVector)
+ featureRegistry.register(TpcdsDataGenFeatures.DeleteTenPercentData)
+
+ private val features = featureNames.map(featureRegistry.getFeature)
+
def writeParquetTable(t: Table): Unit = {
val name = t.getName
if (name.equals("dbgen_version")) {
@@ -73,6 +81,7 @@ class TpcdsDataGen(
List[String]()
} else {
name match {
+ case "store_sales" => List("ss_sold_date_sk")
case "catalog_sales" => List("cs_sold_date_sk")
case "web_sales" => List("ws_sold_date_sk")
case _ => List[String]()
@@ -94,9 +103,6 @@ class TpcdsDataGen(
val stringSchema = StructType(modifiedSchema.fields.map(f =>
StructField(f.name, StringType)))
val columns = modifiedSchema.fields.map(f => new
Column(f.name).cast(f.dataType).as(f.name))
- // dwrf support was temporarily dropped since it impacts data gen skipping
strategy.
- // Better design is required to re-enable it
- val tablePath = dir + File.separator + tableName
spark
.range(0, partitions, 1L, partitions)
.mapPartitions {
@@ -124,11 +130,18 @@ class TpcdsDataGen(
.format(source)
.partitionBy(partitionBy.toArray: _*)
.mode(SaveMode.Overwrite)
- .save(dir + File.separator + tableName)
+ .option("path", dir + File.separator + tableName) // storage location
+ .saveAsTable(tableName)
}
override def gen(): Unit = {
Table.getBaseTables.forEach(t => writeParquetTable(t))
+
+ features.foreach {
+ feature =>
+ println(s"Execute feature: ${feature.name()}")
+ feature.run(spark, source)
+ }
}
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala
new file mode 100644
index 0000000000..efb595d3d1
--- /dev/null
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGenFeatures.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.integration.ds
+
+import org.apache.gluten.integration.DataGen
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Optional post-processing features for TPC-DS data generation, registered by `TpcdsDataGen` and
+ * selectable via `--data-gen-features`. They address tables by unqualified name, relying on
+ * `TpcdsDataGen` having created them with `saveAsTable` in the session's current database.
+ */
+object TpcdsDataGenFeatures {
+
+  /**
+   * Fact tables touched by the features below (in execution order), each paired with the
+   * ticket/order number column used to pick rows for deletion.
+   */
+  private val FactTables: Seq[(String, String)] = Seq(
+    "store_sales" -> "ss_ticket_number",
+    "store_returns" -> "sr_ticket_number",
+    "catalog_sales" -> "cs_order_number",
+    "catalog_returns" -> "cr_order_number",
+    "web_sales" -> "ws_order_number",
+    "web_returns" -> "wr_order_number"
+  )
+
+  /**
+   * Throws `IllegalArgumentException` unless `source` is Delta. The comparison is
+   * case-insensitive, because Spark also resolves format short names case-insensitively.
+   * It is null-safe.
+   */
+  private def requireDelta(featureName: String, source: String): Unit =
+    require(
+      "delta".equalsIgnoreCase(source),
+      s"Feature $featureName only supports Delta data source, but got: $source")
+
+  /**
+   * Sets `delta.enableDeletionVectors = 'true'` on every fact table.
+   *
+   * Features run in the order given on the command line. NOTE(review): Delta presumably applies
+   * this property only to writes made after the ALTER, so list `enable_dv` before `delete_10pc`
+   * if the deletes should be recorded as deletion vectors. Confirm against the Delta docs.
+   */
+  object EnableDeltaDeletionVector extends DataGen.Feature {
+    override def name(): String = "enable_dv"
+    override def run(spark: SparkSession, source: String): Unit = {
+      requireDelta(name(), source)
+      FactTables.foreach {
+        case (table, _) =>
+          spark.sql(
+            s"ALTER TABLE $table SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
+      }
+    }
+  }
+
+  /**
+   * Deletes from every fact table the rows whose ticket/order number satisfies `key % 10 = 7`.
+   * That is one of ten key residues, hence "10pc". Each DELETE's result is printed with `show()`.
+   */
+  object DeleteTenPercentData extends DataGen.Feature {
+    override def name(): String = "delete_10pc"
+    override def run(spark: SparkSession, source: String): Unit = {
+      requireDelta(name(), source)
+      FactTables.foreach {
+        case (table, keyColumn) =>
+          spark.sql(s"DELETE FROM $table WHERE $keyColumn % 10 = 7").show()
+      }
+    }
+  }
+}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 3372856ba0..e6cff8f8cf 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -38,6 +38,7 @@ class TpcdsSuite(
val dataDir: String,
val dataScale: Double,
val genPartitionedData: Boolean,
+ val dataGenFeatures: Seq[String],
val enableUi: Boolean,
val enableHsUi: Boolean,
val hsUiPort: Int,
@@ -79,8 +80,9 @@ class TpcdsSuite(
} else {
"non_partitioned"
}
+ val featureFlags = dataGenFeatures.map(feature =>
s"-$feature").mkString("")
new File(dataDir).toPath
-
.resolve(s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag")
+
.resolve(s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag$featureFlags")
.toFile
.getAbsolutePath
}
@@ -93,8 +95,9 @@ class TpcdsSuite(
shufflePartitions,
dataSource,
dataWritePath(),
- typeModifiers(),
- genPartitionedData)
+ genPartitionedData,
+ dataGenFeatures,
+ typeModifiers())
}
override private[integration] def allQueries(): QuerySet = {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
index 02a8e629d3..eaf4b4ded9 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
@@ -29,15 +29,19 @@ import java.sql.Date
import scala.collection.JavaConverters._
class TpchDataGen(
- val spark: SparkSession,
+ spark: SparkSession,
scale: Double,
partitions: Int,
source: String,
dir: String,
- typeModifiers: List[TypeModifier] = List())
+ featureNames: Seq[String],
+ typeModifiers: Seq[TypeModifier])
extends Serializable
with DataGen {
+ private val featureRegistry = new DataGen.FeatureRegistry
+ private val features = featureNames.map(featureRegistry.getFeature)
+
override def gen(): Unit = {
generate(dir, "lineitem", lineItemSchema, partitions, lineItemGenerator,
lineItemParser)
generate(dir, "customer", customerSchema, partitions, customerGenerator,
customerParser)
@@ -53,6 +57,12 @@ class TpchDataGen(
generate(dir, "nation", nationSchema, nationGenerator, nationParser)
generate(dir, "part", partSchema, partitions, partGenerator, partParser)
generate(dir, "region", regionSchema, regionGenerator, regionParser)
+
+ features.foreach {
+ feature =>
+ println(s"Execute feature: ${feature.name()}")
+ feature.run(spark, source)
+ }
}
// lineitem
@@ -339,6 +349,7 @@ class TpchDataGen(
.write
.format(source)
.mode(SaveMode.Overwrite)
- .save(dir + File.separator + tableName)
+ .option("path", dir + File.separator + tableName) // storage location
+ .saveAsTable(tableName)
}
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 2894d359ac..a0361e9c9f 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -38,6 +38,7 @@ class TpchSuite(
val dataDir: String,
val dataScale: Double,
val genPartitionedData: Boolean,
+ val dataGenFeatures: Seq[String],
val enableUi: Boolean,
val enableHsUi: Boolean,
val hsUiPort: Int,
@@ -73,11 +74,13 @@ class TpchSuite(
override protected def historyWritePath(): String = HISTORY_WRITE_PATH
- override private[integration] def dataWritePath(): String =
+ override private[integration] def dataWritePath(): String = {
+ val featureFlags = dataGenFeatures.map(feature =>
s"-$feature").mkString("")
new File(dataDir).toPath
- .resolve(s"$TPCH_WRITE_RELATIVE_PATH-$dataScale-$dataSource")
+
.resolve(s"$TPCH_WRITE_RELATIVE_PATH-$dataScale-$dataSource$featureFlags")
.toFile
.getAbsolutePath
+ }
override private[integration] def createDataGen(): DataGen = {
checkDataGenArgs(dataSource, dataScale, genPartitionedData)
@@ -87,6 +90,7 @@ class TpchSuite(
shufflePartitions,
dataSource,
dataWritePath(),
+ dataGenFeatures,
typeModifiers())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]