This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 73882e5421 [KYUUBI #7318] Allow specifying the executionMode when
executing the tpcds benchmark
73882e5421 is described below
commit 73882e542100c50560577a51346ed5f572e449be
Author: yuqi <[email protected]>
AuthorDate: Tue Feb 3 10:28:22 2026 +0800
[KYUUBI #7318] Allow specifying the executionMode when executing the tpcds
benchmark
### Why are the changes needed?
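The underlying benchmark framework already supports an `executionMode` for each
TPC-DS query (`collect`, `foreach`, `saveToParquet` or `hash`), but this option was
not exposed to users through any parameter: every query was hard-coded to run in
`collect` mode. This patch adds an `execution-mode` option to `RunBenchmark`
(default: `collect`). It also moves the appending of the `timestamp=<millis>` suffix
to the results directory from `Benchmark` into `RunBenchmark`. As a result, the
`saveToParquet` mode can derive its query-output location from the same
timestamped results path.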
### How was this patch tested?
Manually tested.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7318 from ychris78/tpcde_execution_mode.
Closes #7318
f61162a4a [yuqi] reformat code
76b9578fc [Cheng Pan] Apply suggestion from @pan3793
bb4fb17af [yuqi] reformat code
55f72bc5d [yuqi] refine
f3056949e [yuqi] Allow specifying the executionMode when executing the
tpcds benchmark
Lead-authored-by: yuqi <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
dev/kyuubi-tpcds/README.md | 19 ++++++------
.../apache/kyuubi/tpcds/benchmark/Benchmark.scala | 7 ++---
.../kyuubi/tpcds/benchmark/RunBenchmark.scala | 10 +++++--
.../org/apache/kyuubi/tpcds/benchmark/TPCDS.scala | 34 +++++++++++++++++++++-
.../kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala | 18 +-----------
5 files changed, 55 insertions(+), 33 deletions(-)
diff --git a/dev/kyuubi-tpcds/README.md b/dev/kyuubi-tpcds/README.md
index 717c1b0edb..1d2b054989 100644
--- a/dev/kyuubi-tpcds/README.md
+++ b/dev/kyuubi-tpcds/README.md
@@ -48,15 +48,16 @@ $SPARK_HOME/bin/spark-submit \
Support options:
-| key | default | description |
-|-------------|------------------------|-------------------------------------------------------------------------------|
-| db | none(required) | the TPC-DS database |
-| benchmark | tpcds-v2.4-benchmark | the name of application |
-| iterations | 3 | the number of iterations to run |
-| breakdown | false | whether to record breakdown results of an execution |
-| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref |
-| include | none(optional) | name of the queries to run, use comma to split multiple names, e.g. q1,q2 |
-| exclude | none(optional) | name of the queries to exclude, use comma to split multiple names, e.g. q2,q4 |
+| key | default | description |
+|----------------|-------------------------|----------------------------------------------------------------------------------------------------------------------------|
+| db | none(required) | the TPC-DS database |
+| benchmark | tpcds-v2.4-benchmark | the name of application |
+| iterations | 3 | the number of iterations to run |
+| breakdown | false | whether to record breakdown results of an execution |
+| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref |
+| include | none(optional) | name of the queries to run, use comma to split multiple names, e.g. q1,q2 |
+| exclude | none(optional) | name of the queries to exclude, use comma to split multiple names, e.g. q2,q4 |
+| execution-mode | collect | how a given Spark benchmark should be run, only the following four modes are supported: collect,foreach,saveToParquet,hash |
Example: the following command to benchmark TPC-DS sf10 with exists database
`tpcds_sf10`.
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
index 8071bca1b8..ca5d79953b 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
@@ -79,7 +79,7 @@ abstract class Benchmark(
variations: Seq[Variation[_]] = Seq(Variation("StandardRun",
Seq("true")) { _ => {} }),
tags: Map[String, String] = Map.empty,
timeout: Long = 0L,
- resultsDir: String = resultsLocation,
+ resultPath: String = resultsLocation,
forkThread: Boolean = true): ExperimentStatus = {
new ExperimentStatus(
@@ -89,7 +89,7 @@ abstract class Benchmark(
variations,
tags,
timeout,
- resultsDir,
+ resultPath,
sparkSession,
currentConfiguration,
forkThread = forkThread)
@@ -140,7 +140,7 @@ object Benchmark {
variations: Seq[Variation[_]],
tags: Map[String, String],
timeout: Long,
- resultsDir: String,
+ val resultPath: String,
sparkSession: SparkSession,
currentConfiguration: BenchmarkConfiguration,
forkThread: Boolean = true) {
@@ -169,7 +169,6 @@ object Benchmark {
}
val timestamp: Long = System.currentTimeMillis()
- val resultPath = s"$resultsDir/timestamp=$timestamp"
val combinations: Seq[List[Int]] =
cartesianProduct(variations.map(l => l.options.indices.toList).toList)
val resultsFuture: Future[Unit] = Future {
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
index 80f7422941..3692b9453b 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
@@ -30,7 +30,8 @@ case class RunConfig(
breakdown: Boolean = false,
resultsDir: String = "/spark/sql/performance",
include: Set[String] = Set.empty,
- exclude: Set[String] = Set.empty)
+ exclude: Set[String] = Set.empty,
+ executionMode: String = "collect")
// scalastyle:off
/**
@@ -73,6 +74,9 @@ object RunBenchmark {
c.copy(exclude = x.split(",").map(_.trim).filter(_.nonEmpty).toSet)
}
.text("name of the queries to exclude, use comma to split multiple
names, e.g. q2,q4")
+ opt[String]("execution-mode")
+ .action((x, c) => c.copy(executionMode = x))
+ .text("how a given Spark benchmark should be run, only the following
four modes are supported: collect,foreach,saveToParquet,hash")
help("help")
.text("prints this usage text")
}
@@ -90,7 +94,9 @@ object RunBenchmark {
val sparkSession =
SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
import sparkSession.implicits._
- sparkSession.conf.set("spark.sql.perf.results", config.resultsDir)
+ val timestamp: Long = System.currentTimeMillis()
+ sparkSession.conf.set("spark.sql.perf.results",
s"${config.resultsDir}/timestamp=$timestamp")
+ sparkSession.conf.set("spark.sql.benchmark.executionMode",
config.executionMode)
val benchmark = new TPCDS(sparkSession)
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
index ba25ec0f8b..a6bc983472 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
@@ -17,12 +17,44 @@
package org.apache.kyuubi.tpcds.benchmark
+import scala.io.{Codec, Source}
+
import org.apache.spark.sql.SparkSession
+import org.apache.kyuubi.tpcds.benchmark.ExecutionMode._
+
/**
* TPC-DS benchmark's dataset.
*/
class TPCDS(@transient sparkSession: SparkSession)
extends Benchmark(sparkSession)
with TPCDS_2_4_Queries
- with Serializable {}
+ with Serializable {
+
+ override val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
+ val in =
getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
+ val queryContent =
+ try {
+ Source.fromInputStream(in)(Codec.UTF8).mkString
+ } finally {
+ in.close()
+ }
+
+ val modeName: String =
sparkSession.conf.get("spark.sql.benchmark.executionMode")
+ val resultsLocation: String =
sparkSession.conf.get("spark.sql.perf.results")
+
+ val executionMode: ExecutionMode = modeName match {
+ case "collect" => CollectResults
+ case "foreach" => ForeachResults
+ case "hash" => HashResults
+ case "saveToParquet" => WriteParquet(s"${resultsLocation}_query_results")
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported mode: $modeName")
+ }
+ Query(
+ queryName + "-v2.4",
+ queryContent,
+ description = "TPC-DS 2.4 Query",
+ executionMode = executionMode)
+ }
+}
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
index 220caafc22..9c3c0c9623 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
@@ -17,15 +17,11 @@
package org.apache.kyuubi.tpcds.benchmark
-import scala.io.{Codec, Source}
-
/**
* This implements the official TPCDS v2.4 queries with only cosmetic
modifications.
*/
trait TPCDS_2_4_Queries extends Benchmark {
- import ExecutionMode._
-
val queryNames = Seq(
"q1",
"q2",
@@ -132,18 +128,6 @@ trait TPCDS_2_4_Queries extends Benchmark {
"q99",
"ss_max")
- val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
- val in =
getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
- val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
- in.close()
-
- Query(
- queryName + "-v2.4",
- queryContent,
- description = "TPC-DS 2.4 Query",
- executionMode = CollectResults)
- }
+ val tpcds2_4Queries: Seq[Query]
- val tpcds2_4QueriesMap: Map[String, Query] =
- tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap
}