This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b2fc18654c [VL] Refactor gluten-it to pass structured query
information to runner (#10623)
b2fc18654c is described below
commit b2fc18654c3b9c204a3fc8168e479363d5899f7d
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 5 15:08:58 2025 +0200
[VL] Refactor gluten-it to pass structured query information to runner
(#10623)
---
.../gluten/integration/command/QueriesMixin.java | 58 ++---------
.../apache/gluten/integration/QueryRunner.scala | 15 ++-
.../org/apache/gluten/integration/QuerySet.scala | 106 +++++++++++++++++++++
.../org/apache/gluten/integration/Suite.scala | 4 +-
.../apache/gluten/integration/action/Actions.scala | 4 +-
.../gluten/integration/action/Parameterized.scala | 21 ++--
.../apache/gluten/integration/action/Queries.scala | 20 ++--
.../gluten/integration/action/QueriesCompare.scala | 34 +++----
.../gluten/integration/action/SparkShell.scala | 2 +-
.../integration/clickbench/ClickBenchSuite.scala | 8 +-
.../apache/gluten/integration/ds/TpcdsSuite.scala | 10 +-
.../apache/gluten/integration/h/TpchSuite.scala | 9 +-
.../org/apache/spark/sql/SparkQueryRunner.scala | 30 +-----
13 files changed, 180 insertions(+), 141 deletions(-)
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
index 567aec4c89..8c47bc8bf0 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/QueriesMixin.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.integration.command;
+import org.apache.gluten.integration.QuerySet;
import org.apache.gluten.integration.Suite;
import org.apache.gluten.integration.action.Actions;
import org.apache.gluten.integration.collections.JavaCollectionConverter;
@@ -24,7 +25,6 @@ import com.google.common.base.Preconditions;
import picocli.CommandLine;
import java.util.*;
-import java.util.stream.Collectors;
public class QueriesMixin {
@CommandLine.Option(
@@ -91,63 +91,25 @@ public class QueriesMixin {
public Actions.QuerySelector queries() {
return new Actions.QuerySelector() {
@Override
- public scala.collection.immutable.Seq<String> select(Suite suite) {
- final List<String> all = select0(suite);
- final Division div = Division.parse(shard);
- final List<String> out = div(all, div);
- System.out.println("About to run queries: " + out + "... ");
- return JavaCollectionConverter.asScalaSeq(out);
- }
-
- private List<String> div(List<String> from, Division div) {
- final int queryCount = from.size();
- final int shardCount = div.shardCount;
- final int least = queryCount / shardCount;
- final int shardIdx = div.shard - 1;
- final int shardStart = shardIdx * least;
- final int numQueriesInShard;
- if (shardIdx == shardCount - 1) {
- final int remaining = queryCount - least * shardCount;
- numQueriesInShard = least + remaining;
- } else {
- numQueriesInShard = least;
- }
- final List<String> out = new ArrayList<>();
- for (int i = shardStart; i < shardStart + numQueriesInShard; i++) {
- out.add(from.get(i));
- }
- return out;
- }
-
- private List<String> select0(Suite suite) {
+ public QuerySet select(Suite suite) {
final String[] queryIds = queries;
final String[] excludedQueryIds = excludedQueries;
if (queryIds.length > 0 && excludedQueryIds.length > 0) {
throw new IllegalArgumentException(
"Should not specify queries and excluded queries at the same
time");
}
- String[] all = suite.allQueryIds();
- Set<String> allSet = new HashSet<>(Arrays.asList(all));
+ QuerySet querySet = suite.allQueries();
if (queryIds.length > 0) {
- for (String id : queryIds) {
- if (!allSet.contains(id)) {
- throw new IllegalArgumentException("Invalid query ID: " + id);
- }
- }
- return Arrays.asList(queryIds);
+ querySet =
querySet.filter(JavaCollectionConverter.asScalaSeq(Arrays.asList(queryIds)));
}
if (excludedQueryIds.length > 0) {
- for (String id : excludedQueryIds) {
- if (!allSet.contains(id)) {
- throw new IllegalArgumentException("Invalid query ID to exclude:
" + id);
- }
- }
- Set<String> excludedSet = new
HashSet<>(Arrays.asList(excludedQueryIds));
- return Arrays.stream(all)
- .filter(id -> !excludedSet.contains(id))
- .collect(Collectors.toList());
+ querySet =
+
querySet.exclude(JavaCollectionConverter.asScalaSeq(Arrays.asList(excludedQueryIds)));
}
- return Arrays.asList(all);
+ final Division div = Division.parse(shard);
+ querySet = querySet.getShard(div.shard - 1, div.shardCount);
+ System.out.println("About to run queries: " + querySet.queryIds() +
"... ");
+ return querySet;
}
};
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
index 40fa01a04a..04685320a0 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QueryRunner.scala
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import java.io.File
-class QueryRunner(val queryResourceFolder: String, val source: String, val
dataPath: String) {
+class QueryRunner(val source: String, val dataPath: String) {
import QueryRunner._
Preconditions.checkState(
@@ -40,28 +40,27 @@ class QueryRunner(val queryResourceFolder: String, val
source: String, val dataP
def runQuery(
spark: SparkSession,
desc: String,
- caseId: String,
+ query: Query,
explain: Boolean = false,
sqlMetricMapper: MetricMapper = MetricMapper.dummy,
executorMetrics: Seq[String] = Nil,
randomKillTasks: Boolean = false): QueryResult = {
- val path = "%s/%s.sql".format(queryResourceFolder, caseId)
try {
val r =
SparkQueryRunner.runQuery(
spark,
desc,
- path,
+ query,
explain,
sqlMetricMapper,
executorMetrics,
randomKillTasks)
- println(s"Successfully ran query $caseId. Returned row count:
${r.rows.length}")
- Success(caseId, r)
+ println(s"Successfully ran query ${query.id}. Returned row count:
${r.rows.length}")
+ Success(query.id, r)
} catch {
case e: Exception =>
- println(s"Error running query $caseId. Error:
${ExceptionUtils.getStackTrace(e)}")
- Failure(caseId, e)
+ println(s"Error running query ${query.id}. Error:
${ExceptionUtils.getStackTrace(e)}")
+ Failure(query.id, e)
}
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala
new file mode 100644
index 0000000000..12919e7448
--- /dev/null
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/QuerySet.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.integration
+
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+/**
+ * A single SQL query.
+ *
+ * @param id
+ *   identifier of the query; used as the lookup key inside a [[QuerySet]]
+ * @param path
+ *   resource path associated with the query (reported in logs and metrics)
+ * @param sql
+ *   the SQL text to execute
+ */
+case class Query(id: String, path: String, sql: String)
+
+/**
+ * An ordered set of SQL queries.
+ *
+ * @param queryIds
+ *   query IDs in the order they should be run; must not contain duplicates
+ * @param queryMap
+ *   lookup table from query ID to [[Query]]; must hold one entry per element of `queryIds`
+ */
+case class QuerySet(queryIds: Seq[String], queryMap: Map[String, Query]) {
+  private val queryCount = queryIds.size
+  private val queryIdSet = queryIds.toSet
+  // Check uniqueness before comparing sizes: duplicated IDs also make the two sizes differ, so
+  // running this first reports the descriptive message instead of a bare assertion failure.
+  require(queryIdSet.size == queryCount, s"Duplicated query IDs found in the set: $queryIds")
+  assert(
+    queryCount == queryMap.size,
+    s"Query ID count $queryCount does not match query map size ${queryMap.size}")
+
+  /** The queries, in the same order as `queryIds`. */
+  val queries: Seq[Query] = queryIds.map(queryMap(_))
+
+  /** Fails with IllegalArgumentException if any of the given IDs is not part of this set. */
+  private def requireKnown(ids: Seq[String]): Unit = {
+    ids.foreach {
+      qid => require(queryIdSet.contains(qid), s"Query ID $qid is not found in the set: $queryIds")
+    }
+  }
+
+  /**
+   * Returns the subset made of exactly `filteredQueryIds`, in the order given by the caller.
+   *
+   * Throws IllegalArgumentException if an ID is unknown to this set or listed more than once.
+   */
+  def filter(filteredQueryIds: Seq[String]): QuerySet = {
+    requireKnown(filteredQueryIds)
+    val keptIds = filteredQueryIds.toSet
+    QuerySet(filteredQueryIds, queryMap.filter { case (qid, _) => keptIds.contains(qid) })
+  }
+
+  /**
+   * Returns this set without `excludedQueryIds`, keeping the original order of the remaining IDs.
+   *
+   * Throws IllegalArgumentException if an excluded ID is unknown to this set.
+   */
+  def exclude(excludedQueryIds: Seq[String]): QuerySet = {
+    requireKnown(excludedQueryIds)
+    val excludedIds = excludedQueryIds.toSet
+    filter(queryIds.filterNot(excludedIds.contains))
+  }
+
+  /**
+   * Returns the `shardId`-th (0-based) of `shardCount` contiguous shards.
+   *
+   * Every shard receives `queryCount / shardCount` queries; the last shard additionally takes
+   * the remainder.
+   *
+   * Throws IllegalArgumentException if `shardCount` is not positive or `shardId` is outside
+   * `[0, shardCount)`.
+   */
+  def getShard(shardId: Int, shardCount: Int): QuerySet = {
+    require(shardCount > 0, s"Shard count must be positive: $shardCount")
+    require(
+      shardId >= 0 && shardId < shardCount,
+      s"Shard ID $shardId is out of range [0, $shardCount)")
+    val least = queryCount / shardCount
+    val shardStart = shardId * least
+    // The last shard runs to the end of the list so that the remainder is not dropped.
+    val shardEnd = if (shardId == shardCount - 1) queryCount else shardStart + least
+    filter(queryIds.slice(shardStart, shardEnd))
+  }
+
+  /** Looks up a query by ID; throws NoSuchElementException if the ID is not in this set. */
+  def getQuery(queryId: String): Query = {
+    queryMap(queryId)
+  }
+}
+
+/** Factory helpers for building a [[QuerySet]] from bundled resources. */
+object QuerySet {
+
+  /**
+   * Reads the given resource completely and decodes it as UTF-8.
+   *
+   * The resource is looked up through `QuerySet.getClass.getResourceAsStream`; an
+   * IllegalArgumentException is thrown when the lookup yields no stream.
+   */
+  private def resourceToString(resource: String): String = {
+    val inStream = QuerySet.getClass.getResourceAsStream(resource)
+    require(inStream != null, s"Resource not found: $resource")
+    val outStream = new ByteArrayOutputStream
+    try {
+      // Copy in chunks instead of issuing one read() call per byte.
+      val buffer = new Array[Byte](8192)
+      Iterator
+        .continually(inStream.read(buffer))
+        .takeWhile(_ != -1)
+        .foreach(count => outStream.write(buffer, 0, count))
+    } finally {
+      inStream.close()
+    }
+    new String(outStream.toByteArray, StandardCharsets.UTF_8)
+  }
+
+  /**
+   * Builds a [[QuerySet]] by loading `<folder>/<id>.sql` for every ID in `queryIds`.
+   *
+   * @param folder
+   *   resource folder that contains the SQL files
+   * @param queryIds
+   *   IDs of the queries to load, in the order they should appear in the set
+   */
+  def readFromResource(folder: String, queryIds: Seq[String]): QuerySet = {
+    // Duplicates would silently collapse in `.toMap` below; reject them before any resource
+    // is read so the failure names the actual problem.
+    require(
+      queryIds.distinct.size == queryIds.size,
+      s"Duplicated query IDs found in the set: $queryIds")
+    val queries = queryIds.map {
+      qid =>
+        val path = s"$folder/$qid.sql"
+        qid -> Query(qid, path, resourceToString(path))
+    }.toMap
+    QuerySet(queryIds, queries)
+  }
+}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index c1ca9b575c..2ea814df27 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -192,9 +192,7 @@ abstract class Suite(
private[integration] def genPartitionedData(): Boolean
- private[integration] def queryResource(): String
-
- private[integration] def allQueryIds(): Array[String]
+ private[integration] def allQueries(): QuerySet
private[integration] def desc(): String
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
index 4977dda708..14a6b21eff 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Actions.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.integration.action
-import org.apache.gluten.integration.Suite
+import org.apache.gluten.integration.{QuerySet, Suite}
trait Action {
def execute(suite: Suite): Boolean
@@ -24,6 +24,6 @@ trait Action {
object Actions {
trait QuerySelector {
- def select(suite: Suite): Seq[String]
+ def select(suite: Suite): QuerySet
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
index 422b185888..91a7391695 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Parameterized.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.integration.action
-import org.apache.gluten.integration.{QueryRunner, Suite}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite}
import org.apache.gluten.integration.QueryRunner.QueryResult
import org.apache.gluten.integration.action.Actions.QuerySelector
import org.apache.gluten.integration.action.TableRender.Field
@@ -108,7 +108,7 @@ class Parameterized(
override def execute(suite: Suite): Boolean = {
val runner: QueryRunner =
- new QueryRunner(suite.queryResource(), suite.dataSource(),
suite.dataWritePath())
+ new QueryRunner(suite.dataSource(), suite.dataWritePath())
val sessionSwitcher = suite.sessionSwitcher
val testConf = suite.getTestConf()
@@ -125,7 +125,8 @@ class Parameterized(
sessionSwitcher.registerSession(coordinate.toString, conf)
}
- val runQueryIds = queries.select(suite).map(TestResultLine.QueryId(_))
+ val querySet = queries.select(suite)
+ val runQueryIds = querySet.queryIds.map(TestResultLine.QueryId(_))
val marks: Seq[TestResultLine.CoordMark] = coordinates.flatMap {
entry =>
@@ -143,7 +144,7 @@ class Parameterized(
Parameterized.warmUp(
runner,
sessionSwitcher.spark(),
- queryId.id,
+ querySet.getQuery(queryId.id),
coordinate,
suite.desc())
} finally {
@@ -164,7 +165,7 @@ class Parameterized(
Parameterized.runQuery(
runner,
sessionSwitcher.spark(),
- queryId.id,
+ querySet.getQuery(queryId.id),
coordinate,
suite.desc(),
explain,
@@ -363,22 +364,22 @@ object Parameterized {
private def runQuery(
runner: QueryRunner,
spark: SparkSession,
- id: String,
+ query: Query,
coordinate: Coordinate,
desc: String,
explain: Boolean,
metrics: Seq[String]): TestResultLine.Coord = {
- val testDesc = "Query %s [%s] %s".format(desc, id, coordinate)
- val result = runner.runQuery(spark, testDesc, id, explain, executorMetrics
= metrics)
+ val testDesc = "Query %s [%s] %s".format(desc, query.id, coordinate)
+ val result = runner.runQuery(spark, testDesc, query, explain,
executorMetrics = metrics)
TestResultLine.Coord(coordinate, result)
}
private def warmUp(
runner: QueryRunner,
session: SparkSession,
- id: String,
+ query: Query,
coordinate: Coordinate,
desc: String): Unit = {
- runQuery(runner, session, id, coordinate, desc, explain = false, Nil)
+ runQuery(runner, session, query, coordinate, desc, explain = false, Nil)
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
index baba2fc793..4cf308835e 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/Queries.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.integration.action
-import org.apache.gluten.integration.{QueryRunner, Suite, TableCreator}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite, TableCreator}
import org.apache.gluten.integration.QueryRunner.QueryResult
import org.apache.gluten.integration.action.Actions.QuerySelector
import
org.apache.gluten.integration.action.TableRender.RowParser.FieldAppender.RowAppender
@@ -37,23 +37,23 @@ case class Queries(
import Queries._
override def execute(suite: Suite): Boolean = {
- val runQueryIds = queries.select(suite)
+ val querySet = queries.select(suite)
val runner: QueryRunner =
- new QueryRunner(suite.queryResource(), suite.dataSource(),
suite.dataWritePath())
+ new QueryRunner(suite.dataSource(), suite.dataWritePath())
val sessionSwitcher = suite.sessionSwitcher
sessionSwitcher.useSession("test", "Run Queries")
runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
val results = (0 until iterations).flatMap {
iteration =>
println(s"Running tests (iteration $iteration)...")
- runQueryIds.map {
- queryId =>
+ querySet.queries.map {
+ query =>
try {
Queries.runQuery(
runner,
suite.tableCreator(),
sessionSwitcher.spark(),
- queryId,
+ query,
suite.desc(),
explain,
suite.getTestMetricMapper(),
@@ -172,18 +172,18 @@ object Queries {
runner: QueryRunner,
creator: TableCreator,
session: SparkSession,
- id: String,
+ query: Query,
desc: String,
explain: Boolean,
metricMapper: MetricMapper,
randomKillTasks: Boolean): TestResultLine = {
- println(s"Running query: $id...")
- val testDesc = "Query %s [%s]".format(desc, id)
+ println(s"Running query: ${query.id}...")
+ val testDesc = "Query %s [%s]".format(desc, query.id)
val result =
runner.runQuery(
session,
testDesc,
- id,
+ query,
explain = explain,
sqlMetricMapper = metricMapper,
randomKillTasks = randomKillTasks)
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
index f8dabebd79..5f45d549b6 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/QueriesCompare.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.integration.action
-import org.apache.gluten.integration.{QueryRunner, Suite}
+import org.apache.gluten.integration.{Query, QueryRunner, Suite}
import org.apache.gluten.integration.QueryRunner.QueryResult
import org.apache.gluten.integration.action.Actions.QuerySelector
import org.apache.gluten.integration.action.QueriesCompare.TestResultLine
@@ -35,23 +35,23 @@ case class QueriesCompare(
override def execute(suite: Suite): Boolean = {
val runner: QueryRunner =
- new QueryRunner(suite.queryResource(), suite.dataSource(),
suite.dataWritePath())
- val runQueryIds = queries.select(suite)
+ new QueryRunner(suite.dataSource(), suite.dataWritePath())
+ val querySet = queries.select(suite)
val sessionSwitcher = suite.sessionSwitcher
sessionSwitcher.useSession("baseline", "Run Baseline Queries")
runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
val baselineResults = (0 until iterations).flatMap {
iteration =>
- runQueryIds.map {
- queryId =>
- println(s"Running baseline query $queryId (iteration
$iteration)...")
+ querySet.queries.map {
+ query =>
+ println(s"Running baseline query ${query.id} (iteration
$iteration)...")
try {
QueriesCompare.runBaselineQuery(
runner,
sessionSwitcher.spark(),
suite.desc(),
- queryId,
+ query,
explain)
} finally {
if (noSessionReuse) {
@@ -66,15 +66,15 @@ case class QueriesCompare(
runner.createTables(suite.tableCreator(), sessionSwitcher.spark())
val testResults = (0 until iterations).flatMap {
iteration =>
- runQueryIds.map {
- queryId =>
- println(s"Running test query $queryId (iteration $iteration)...")
+ querySet.queries.map {
+ query =>
+ println(s"Running test query ${query.id} (iteration
$iteration)...")
try {
QueriesCompare.runTestQuery(
runner,
sessionSwitcher.spark(),
suite.desc(),
- queryId,
+ query,
explain)
} finally {
if (noSessionReuse) {
@@ -225,10 +225,10 @@ object QueriesCompare {
runner: QueryRunner,
session: SparkSession,
desc: String,
- id: String,
+ query: Query,
explain: Boolean): QueryResult = {
- val testDesc = "Baseline %s [%s]".format(desc, id)
- val result = runner.runQuery(session, testDesc, id, explain = explain)
+ val testDesc = "Baseline %s [%s]".format(desc, query.id)
+ val result = runner.runQuery(session, testDesc, query, explain = explain)
result
}
@@ -236,10 +236,10 @@ object QueriesCompare {
runner: QueryRunner,
session: SparkSession,
desc: String,
- id: String,
+ query: Query,
explain: Boolean): QueryResult = {
- val testDesc = "Query %s [%s]".format(desc, id)
- val result = runner.runQuery(session, testDesc, id, explain = explain)
+ val testDesc = "Query %s [%s]".format(desc, query.id)
+ val result = runner.runQuery(session, testDesc, query, explain = explain)
result
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
index 71f99f80a6..f920977eea 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/SparkShell.scala
@@ -24,7 +24,7 @@ case class SparkShell() extends Action {
override def execute(suite: Suite): Boolean = {
suite.sessionSwitcher.useSession("test", "Spark CLI")
val runner: QueryRunner =
- new QueryRunner(suite.queryResource(), suite.dataSource(),
suite.dataWritePath())
+ new QueryRunner(suite.dataSource(), suite.dataWritePath())
runner.createTables(suite.tableCreator(), suite.sessionSwitcher.spark())
Main.sparkSession = suite.sessionSwitcher.spark()
Main.sparkContext = suite.sessionSwitcher.spark().sparkContext
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index ee32d47861..5e07211af3 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.integration.clickbench
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
import org.apache.gluten.integration.action.Action
import org.apache.gluten.integration.metrics.MetricMapper
@@ -87,9 +87,9 @@ class ClickBenchSuite(
new ClickBenchDataGen(sessionSwitcher.spark(), dataWritePath())
}
- override private[integration] def queryResource(): String =
"/clickbench-queries"
-
- override private[integration] def allQueryIds(): Array[String] =
ALL_QUERY_IDS
+ override private[integration] def allQueries(): QuerySet = {
+ QuerySet.readFromResource("/clickbench-queries",
ClickBenchSuite.ALL_QUERY_IDS)
+ }
override private[integration] def desc(): String = "ClickBench"
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 869041e268..3372856ba0 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -16,10 +16,8 @@
*/
package org.apache.gluten.integration.ds
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
import org.apache.gluten.integration.action.Action
-import org.apache.gluten.integration.ds.TpcdsSuite.{ALL_QUERY_IDS,
HISTORY_WRITE_PATH, TPCDS_WRITE_RELATIVE_PATH}
-import org.apache.gluten.integration.h.TpchSuite.checkDataGenArgs
import org.apache.gluten.integration.metrics.MetricMapper
import org.apache.spark.SparkConf
@@ -99,12 +97,10 @@ class TpcdsSuite(
genPartitionedData)
}
- override private[integration] def queryResource(): String = {
- "/tpcds-queries"
+ override private[integration] def allQueries(): QuerySet = {
+ QuerySet.readFromResource("/tpcds-queries", TpcdsSuite.ALL_QUERY_IDS)
}
- override private[integration] def allQueryIds(): Array[String] =
ALL_QUERY_IDS
-
override private[integration] def desc(): String = "TPC-DS"
override def tableCreator(): TableCreator = TableCreator.discoverSchema()
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index a381550b32..2894d359ac 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -16,9 +16,8 @@
*/
package org.apache.gluten.integration.h
-import org.apache.gluten.integration.{DataGen, Suite, TableCreator}
+import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableCreator}
import org.apache.gluten.integration.action.Action
-import org.apache.gluten.integration.h.TpchSuite.{HISTORY_WRITE_PATH,
TPCH_WRITE_RELATIVE_PATH}
import org.apache.gluten.integration.metrics.MetricMapper
import org.apache.spark.SparkConf
@@ -91,12 +90,10 @@ class TpchSuite(
typeModifiers())
}
- override private[integration] def queryResource(): String = {
- "/tpch-queries"
+ override private[integration] def allQueries(): QuerySet = {
+ QuerySet.readFromResource("/tpch-queries", TpchSuite.ALL_QUERY_IDS)
}
- override private[integration] def allQueryIds(): Array[String] =
TpchSuite.ALL_QUERY_IDS
-
override private[integration] def desc(): String = "TPC-H"
override def tableCreator(): TableCreator = TableCreator.discoverSchema()
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
index 09898b73bf..ed84746cb7 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql
+import org.apache.gluten.integration.Query
import org.apache.gluten.integration.metrics.{MetricMapper, MetricTag,
PlanMetric}
import org.apache.spark.{SparkContext, Success, TaskKilled}
@@ -59,7 +60,7 @@ object SparkQueryRunner {
def runQuery(
spark: SparkSession,
desc: String,
- queryPath: String,
+ query: Query,
explain: Boolean,
metricMapper: MetricMapper,
executorMetrics: Seq[String],
@@ -86,12 +87,11 @@ object SparkQueryRunner {
}
killTaskListener.foreach(sc.addSparkListener(_))
- println(s"Executing SQL query from resource path $queryPath...")
+ println(s"Executing SQL query from resource path ${query.path}...")
try {
val tracker = new QueryPlanningTracker
- val sql = resourceToString(queryPath)
val prev = System.nanoTime()
- val df = spark.sql(sql)
+ val df = spark.sql(query.sql)
val rows = QueryPlanningTracker.withTracker(tracker) {
df.collect()
}
@@ -107,7 +107,7 @@ object SparkQueryRunner {
val planMillis = sparkRulesMillis + otherRulesMillis
val collectedExecutorMetrics =
executorMetrics.map(name => (name, em.getMetricValue(name))).toMap
- val collectedSQLMetrics = collectSQLMetrics(queryPath, metricMapper,
df.queryExecution)
+ val collectedSQLMetrics = collectSQLMetrics(query.path, metricMapper,
df.queryExecution)
RunResult(
rows,
planMillis,
@@ -166,26 +166,6 @@ object SparkQueryRunner {
}
all.toSeq
}
-
- private def resourceToString(resource: String): String = {
- val inStream = SparkQueryRunner.getClass.getResourceAsStream(resource)
- Preconditions.checkNotNull(inStream)
- val outStream = new ByteArrayOutputStream
- try {
- var reading = true
- while (reading) {
- inStream.read() match {
- case -1 => reading = false
- case c => outStream.write(c)
- }
- }
- outStream.flush()
- } finally {
- inStream.close()
- }
- new String(outStream.toByteArray, StandardCharsets.UTF_8)
- }
-
}
case class RunResult(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]