This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73882e5421 [KYUUBI #7318] Allow specifying the executionMode when 
executing the tpcds benchmark
73882e5421 is described below

commit 73882e542100c50560577a51346ed5f572e449be
Author: yuqi <[email protected]>
AuthorDate: Tue Feb 3 10:28:22 2026 +0800

    [KYUUBI #7318] Allow specifying the executionMode when executing the tpcds 
benchmark
    
    ### Why are the changes needed?
    
    The underlying system supports specifying the executionMode when running 
the TPCDS benchmark, but this option is not exposed to users through parameters.
    
    ### How was this patch tested?
    
    Manual test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #7318 from ychris78/tpcde_execution_mode.
    
    Closes #7318
    
    f61162a4a [yuqi] reformat code
    76b9578fc [Cheng Pan] Apply suggestion from @pan3793
    bb4fb17af [yuqi] reformat code
    55f72bc5d [yuqi] refine
    f3056949e [yuqi] Allow specifying the executionMode when executing the 
tpcds benchmark
    
    Lead-authored-by: yuqi <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 dev/kyuubi-tpcds/README.md                         | 19 ++++++------
 .../apache/kyuubi/tpcds/benchmark/Benchmark.scala  |  7 ++---
 .../kyuubi/tpcds/benchmark/RunBenchmark.scala      | 10 +++++--
 .../org/apache/kyuubi/tpcds/benchmark/TPCDS.scala  | 34 +++++++++++++++++++++-
 .../kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala | 18 +-----------
 5 files changed, 55 insertions(+), 33 deletions(-)

diff --git a/dev/kyuubi-tpcds/README.md b/dev/kyuubi-tpcds/README.md
index 717c1b0edb..1d2b054989 100644
--- a/dev/kyuubi-tpcds/README.md
+++ b/dev/kyuubi-tpcds/README.md
@@ -48,15 +48,16 @@ $SPARK_HOME/bin/spark-submit \
 
 Support options:
 
-| key         | default                | description                           
                                        |
-|-------------|------------------------|-------------------------------------------------------------------------------|
-| db          | none(required)         | the TPC-DS database                   
                                        |
-| benchmark   | tpcds-v2.4-benchmark   | the name of application               
                                        |
-| iterations  | 3                      | the number of iterations to run       
                                        |
-| breakdown   | false                  | whether to record breakdown results 
of an execution                           |
-| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. 
hdfs://hdfs-nn:9870/pref                 |
-| include     | none(optional)         | name of the queries to run, use comma 
to split multiple names, e.g. q1,q2     |
-| exclude     | none(optional)         | name of the queries to exclude, use 
comma to split multiple names, e.g. q2,q4 |
+| key            | default                 | description                       
                                                                                
         |
+|----------------|-------------------------|----------------------------------------------------------------------------------------------------------------------------|
+| db             | none(required)          | the TPC-DS database               
                                                                                
         |
+| benchmark      | tpcds-v2.4-benchmark    | the name of application           
                                                                                
         |
+| iterations     | 3                       | the number of iterations to run   
                                                                                
         |
+| breakdown      | false                   | whether to record breakdown 
results of an execution                                                         
               |
+| results-dir    | /spark/sql/performance  | dir to store benchmark results, 
e.g. hdfs://hdfs-nn:9870/pref                                                   
           |
+| include        | none(optional)          | name of the queries to run, use 
comma to split multiple names, e.g. q1,q2                                       
           |
+| exclude        | none(optional)          | name of the queries to exclude, 
use comma to split multiple names, e.g. q2,q4                                   
           |
+| execution-mode | collect                 | how a given Spark benchmark 
should be run, only the following four modes are supported: 
collect,foreach,saveToParquet,hash |
 
 Example: the following command to benchmark TPC-DS sf10 with exists database 
`tpcds_sf10`.
 
diff --git 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
index 8071bca1b8..ca5d79953b 100644
--- 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
+++ 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala
@@ -79,7 +79,7 @@ abstract class Benchmark(
       variations: Seq[Variation[_]] = Seq(Variation("StandardRun", 
Seq("true")) { _ => {} }),
       tags: Map[String, String] = Map.empty,
       timeout: Long = 0L,
-      resultsDir: String = resultsLocation,
+      resultPath: String = resultsLocation,
       forkThread: Boolean = true): ExperimentStatus = {
 
     new ExperimentStatus(
@@ -89,7 +89,7 @@ abstract class Benchmark(
       variations,
       tags,
       timeout,
-      resultsDir,
+      resultPath,
       sparkSession,
       currentConfiguration,
       forkThread = forkThread)
@@ -140,7 +140,7 @@ object Benchmark {
       variations: Seq[Variation[_]],
       tags: Map[String, String],
       timeout: Long,
-      resultsDir: String,
+      val resultPath: String,
       sparkSession: SparkSession,
       currentConfiguration: BenchmarkConfiguration,
       forkThread: Boolean = true) {
@@ -169,7 +169,6 @@ object Benchmark {
     }
 
     val timestamp: Long = System.currentTimeMillis()
-    val resultPath = s"$resultsDir/timestamp=$timestamp"
     val combinations: Seq[List[Int]] =
       cartesianProduct(variations.map(l => l.options.indices.toList).toList)
     val resultsFuture: Future[Unit] = Future {
diff --git 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
index 80f7422941..3692b9453b 100644
--- 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
+++ 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala
@@ -30,7 +30,8 @@ case class RunConfig(
     breakdown: Boolean = false,
     resultsDir: String = "/spark/sql/performance",
     include: Set[String] = Set.empty,
-    exclude: Set[String] = Set.empty)
+    exclude: Set[String] = Set.empty,
+    executionMode: String = "collect")
 
 // scalastyle:off
 /**
@@ -73,6 +74,9 @@ object RunBenchmark {
           c.copy(exclude = x.split(",").map(_.trim).filter(_.nonEmpty).toSet)
         }
         .text("name of the queries to exclude, use comma to split multiple 
names, e.g. q2,q4")
+      opt[String]("execution-mode")
+        .action((x, c) => c.copy(executionMode = x))
+        .text("how a given Spark benchmark should be run, only the following 
four modes are supported: collect,foreach,saveToParquet,hash")
       help("help")
         .text("prints this usage text")
     }
@@ -90,7 +94,9 @@ object RunBenchmark {
     val sparkSession = 
SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
     import sparkSession.implicits._
 
-    sparkSession.conf.set("spark.sql.perf.results", config.resultsDir)
+    val timestamp: Long = System.currentTimeMillis()
+    sparkSession.conf.set("spark.sql.perf.results", 
s"${config.resultsDir}/timestamp=$timestamp")
+    sparkSession.conf.set("spark.sql.benchmark.executionMode", 
config.executionMode)
 
     val benchmark = new TPCDS(sparkSession)
 
diff --git 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
index ba25ec0f8b..a6bc983472 100644
--- 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
+++ 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala
@@ -17,12 +17,44 @@
 
 package org.apache.kyuubi.tpcds.benchmark
 
+import scala.io.{Codec, Source}
+
 import org.apache.spark.sql.SparkSession
 
+import org.apache.kyuubi.tpcds.benchmark.ExecutionMode._
+
 /**
  * TPC-DS benchmark's dataset.
  */
 class TPCDS(@transient sparkSession: SparkSession)
   extends Benchmark(sparkSession)
   with TPCDS_2_4_Queries
-  with Serializable {}
+  with Serializable {
+
+  override val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
+    val in = 
getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
+    val queryContent =
+      try {
+        Source.fromInputStream(in)(Codec.UTF8).mkString
+      } finally {
+        in.close()
+      }
+
+    val modeName: String = 
sparkSession.conf.get("spark.sql.benchmark.executionMode")
+    val resultsLocation: String = 
sparkSession.conf.get("spark.sql.perf.results")
+
+    val executionMode: ExecutionMode = modeName match {
+      case "collect" => CollectResults
+      case "foreach" => ForeachResults
+      case "hash" => HashResults
+      case "saveToParquet" => WriteParquet(s"${resultsLocation}_query_results")
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported mode: $modeName")
+    }
+    Query(
+      queryName + "-v2.4",
+      queryContent,
+      description = "TPC-DS 2.4 Query",
+      executionMode = executionMode)
+  }
+}
diff --git 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
index 220caafc22..9c3c0c9623 100644
--- 
a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
+++ 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
@@ -17,15 +17,11 @@
 
 package org.apache.kyuubi.tpcds.benchmark
 
-import scala.io.{Codec, Source}
-
 /**
  * This implements the official TPCDS v2.4 queries with only cosmetic 
modifications.
  */
 trait TPCDS_2_4_Queries extends Benchmark {
 
-  import ExecutionMode._
-
   val queryNames = Seq(
     "q1",
     "q2",
@@ -132,18 +128,6 @@ trait TPCDS_2_4_Queries extends Benchmark {
     "q99",
     "ss_max")
 
-  val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
-    val in = 
getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
-    val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
-    in.close()
-
-    Query(
-      queryName + "-v2.4",
-      queryContent,
-      description = "TPC-DS 2.4 Query",
-      executionMode = CollectResults)
-  }
+  val tpcds2_4Queries: Seq[Query]
 
-  val tpcds2_4QueriesMap: Map[String, Query] =
-    tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap
 }

Reply via email to