This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 46b87e5b7 [VL] CI: Split SF30 job to 4 jobs to speed up execution (#5526)
46b87e5b7 is described below
commit 46b87e5b7f4d49581bb38b84b7fe7f260c4121e4
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 25 15:02:55 2024 +0800
[VL] CI: Split SF30 job to 4 jobs to speed up execution (#5526)
---
.github/workflows/velox_docker.yml | 9 +-
.../integration/tpc/command/Parameterized.java | 14 +--
.../gluten/integration/tpc/command/Queries.java | 16 +--
.../integration/tpc/command/QueriesCompare.java | 16 +--
.../integration/tpc/command/QueriesMixin.java | 137 +++++++++++++++++++++
.../apache/gluten/integration/tpc/TpcSuite.scala | 33 +----
.../gluten/integration/tpc/action/Actions.scala | 3 +
.../integration/tpc/action/Parameterized.scala | 13 +-
.../gluten/integration/tpc/action/Queries.scala | 7 +-
.../integration/tpc/action/QueriesCompare.scala | 8 +-
10 files changed, 172 insertions(+), 84 deletions(-)
diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index 42e102ba7..20b6a5703 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -345,6 +345,7 @@ jobs:
fail-fast: false
matrix:
spark: [ "spark-3.4" ]
+ shard: [ "1/4", "2/4", "3/4", "4/4" ]
runs-on: ubuntu-20.04
steps:
- name: Maximize build disk space
@@ -380,15 +381,15 @@ jobs:
mvn -ntp clean install -P${{ matrix.spark }}
GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh data-gen-only --local
--benchmark-type=h -s=30.0 --threads=12
GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh data-gen-only --local
--benchmark-type=ds -s=30.0 --threads=12
- - name: TPC-H / TPC-DS SF30.0 Parquet local spark3.4
+ - name: TPC-H / TPC-DS SF30.0 Parquet local ${{ matrix.spark }}
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
- --skip-data-gen \
+ --skip-data-gen --shard=${{ matrix.shard }} \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
- --skip-data-gen
+ --skip-data-gen --shard=${{ matrix.shard }}
run-tpc-test-centos8-uniffle:
needs: build-native-lib
@@ -853,4 +854,4 @@ jobs:
cd $GITHUB_WORKSPACE/
export MAVEN_HOME=/usr/lib/maven
export PATH=${PATH}:${MAVEN_HOME}/bin
- mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn
-Piceberg -Pdelta -Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/"
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
\ No newline at end of file
+ mvn -ntp clean install -Pspark-3.5 -Pbackends-velox -Pceleborn
-Piceberg -Pdelta -Pspark-ut
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/"
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Parameterized.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Parameterized.java
index 1f94cb256..bf7d89fe6 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Parameterized.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Parameterized.java
@@ -43,14 +43,8 @@ public class Parameterized implements Callable<Integer> {
@CommandLine.Mixin
private DataGenMixin dataGenMixin;
- @CommandLine.Option(names = {"--queries"}, description = "Set a
comma-separated list of query IDs to run, run all queries if not specified.
Example: --queries=q1,q6", split = ",")
- private String[] queries = new String[0];
-
- @CommandLine.Option(names = {"--excluded-queries"}, description = "Set a
comma-separated list of query IDs to exclude. Example:
--exclude-queries=q1,q6", split = ",")
- private String[] excludedQueries = new String[0];
-
- @CommandLine.Option(names = {"--iterations"}, description = "How many
iterations to run", defaultValue = "1")
- private int iterations;
+ @CommandLine.Mixin
+ private QueriesMixin queriesMixin;
@CommandLine.Option(names = {"--warmup-iterations"}, description = "Dry-run
iterations before actually run the test", defaultValue = "0")
private int warmupIterations;
@@ -136,7 +130,9 @@ public class Parameterized implements Callable<Integer> {
)).collect(Collectors.toList())).asScala();
org.apache.gluten.integration.tpc.action.Parameterized parameterized =
- new
org.apache.gluten.integration.tpc.action.Parameterized(dataGenMixin.getScale(),
this.queries, excludedQueries, iterations, warmupIterations, parsedDims,
excludedCombinations, metrics);
+ new
org.apache.gluten.integration.tpc.action.Parameterized(dataGenMixin.getScale(),
queriesMixin.queries(),
+ queriesMixin.explain(), queriesMixin.iterations(),
warmupIterations, parsedDims,
+ excludedCombinations, metrics);
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(),
parameterized));
}
}
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Queries.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Queries.java
index 81a2bb17b..53d46cc5e 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Queries.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/Queries.java
@@ -32,17 +32,8 @@ public class Queries implements Callable<Integer> {
@CommandLine.Mixin
private DataGenMixin dataGenMixin;
- @CommandLine.Option(names = {"--queries"}, description = "Set a
comma-separated list of query IDs to run, run all queries if not specified.
Example: --queries=q1,q6", split = ",")
- private String[] queries = new String[0];
-
- @CommandLine.Option(names = {"--excluded-queries"}, description = "Set a
comma-separated list of query IDs to exclude. Example:
--exclude-queries=q1,q6", split = ",")
- private String[] excludedQueries = new String[0];
-
- @CommandLine.Option(names = {"--explain"}, description = "Output explain
result for queries", defaultValue = "false")
- private boolean explain;
-
- @CommandLine.Option(names = {"--iterations"}, description = "How many
iterations to run", defaultValue = "1")
- private int iterations;
+ @CommandLine.Mixin
+ private QueriesMixin queriesMixin;
@CommandLine.Option(names = {"--random-kill-tasks"}, description = "Every
single task will get killed and retried after running for some time",
defaultValue = "false")
private boolean randomKillTasks;
@@ -50,7 +41,8 @@ public class Queries implements Callable<Integer> {
@Override
public Integer call() throws Exception {
org.apache.gluten.integration.tpc.action.Queries queries =
- new
org.apache.gluten.integration.tpc.action.Queries(dataGenMixin.getScale(),
this.queries, this.excludedQueries, explain, iterations, randomKillTasks);
+ new
org.apache.gluten.integration.tpc.action.Queries(dataGenMixin.getScale(),
queriesMixin.queries(),
+ queriesMixin.explain(), queriesMixin.iterations(),
randomKillTasks);
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(),
queries));
}
}
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesCompare.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesCompare.java
index 702de78b3..d4c0c684d 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesCompare.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesCompare.java
@@ -32,22 +32,14 @@ public class QueriesCompare implements Callable<Integer> {
@CommandLine.Mixin
private DataGenMixin dataGenMixin;
- @CommandLine.Option(names = {"--queries"}, description = "Set a
comma-separated list of query IDs to run, run all queries if not specified.
Example: --queries=q1,q6", split = ",")
- private String[] queries = new String[0];
-
- @CommandLine.Option(names = {"--excluded-queries"}, description = "Set a
comma-separated list of query IDs to exclude. Example:
--exclude-queries=q1,q6", split = ",")
- private String[] excludedQueries = new String[0];
-
- @CommandLine.Option(names = {"--explain"}, description = "Output explain
result for queries", defaultValue = "false")
- private boolean explain;
-
- @CommandLine.Option(names = {"--iterations"}, description = "How many
iterations to run", defaultValue = "1")
- private int iterations;
+ @CommandLine.Mixin
+ private QueriesMixin queriesMixin;
@Override
public Integer call() throws Exception {
org.apache.gluten.integration.tpc.action.QueriesCompare queriesCompare =
- new
org.apache.gluten.integration.tpc.action.QueriesCompare(dataGenMixin.getScale(),
this.queries, this.excludedQueries, explain, iterations);
+ new
org.apache.gluten.integration.tpc.action.QueriesCompare(dataGenMixin.getScale(),
queriesMixin.queries(),
+ queriesMixin.explain(), queriesMixin.iterations());
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(),
queriesCompare));
}
}
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesMixin.java
new file mode 100644
index 000000000..f51488364
--- /dev/null
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/command/QueriesMixin.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.integration.tpc.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.gluten.integration.tpc.TpcSuite;
+import org.apache.gluten.integration.tpc.action.Actions;
+import picocli.CommandLine;
+import scala.collection.Seq;
+import scala.collection.JavaConverters;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class QueriesMixin {
+ @CommandLine.Option(names = {"--queries"}, description = "Set a
comma-separated list of query IDs to run, run all queries if not specified.
Example: --queries=q1,q6", split = ",")
+ private String[] queries = new String[0];
+
+ @CommandLine.Option(names = {"--excluded-queries"}, description = "Set a
comma-separated list of query IDs to exclude. Example:
--exclude-queries=q1,q6", split = ",")
+ private String[] excludedQueries = new String[0];
+
+ @CommandLine.Option(names = {"--shard"}, description = "Divide the queries
to execute into N shards, then pick one single shard and run it. Example:
--shard=1/3", defaultValue = "1/1")
+ private String shard;
+
+ @CommandLine.Option(names = {"--explain"}, description = "Output explain
result for queries", defaultValue = "false")
+ private boolean explain;
+
+ @CommandLine.Option(names = {"--iterations"}, description = "How many
iterations to run", defaultValue = "1")
+ private int iterations;
+
+ public boolean explain() {
+ return explain;
+ }
+
+ public int iterations() {
+ return iterations;
+ }
+
+ public Actions.QuerySelector queries() {
+ return new Actions.QuerySelector() {
+ @Override
+ public Seq<String> select(TpcSuite suite) {
+ final List<String> all = select0(suite);
+ final Division div = Division.parse(shard);
+ final List<String> out = div(all, div);
+ System.out.println("About to run queries: " + out + "... ");
+ return JavaConverters.asScalaBuffer(out);
+ }
+
+ private List<String> div(List<String> from, Division div) {
+ final int queryCount = from.size();
+ final int shardCount = div.shardCount;
+ final int least = queryCount / shardCount;
+ final int shardIdx = div.shard - 1;
+ final int shardStart = shardIdx * least;
+ final int numQueriesInShard;
+ if (shardIdx == shardCount - 1) {
+ final int remaining = queryCount - least * shardCount;
+ numQueriesInShard = least + remaining;
+ } else {
+ numQueriesInShard = least;
+ }
+ final List<String> out = new ArrayList<>();
+ for (int i = shardStart; i < shardStart + numQueriesInShard; i++) {
+ out.add(from.get(i));
+ }
+ return out;
+ }
+
+ private List<String> select0(TpcSuite suite) {
+ final String[] queryIds = queries;
+ final String[] excludedQueryIds = excludedQueries;
+ if (queryIds.length > 0 && excludedQueryIds.length > 0) {
+ throw new IllegalArgumentException(
+ "Should not specify queries and excluded queries at the same
time");
+ }
+ String[] all = suite.allQueryIds();
+ Set<String> allSet = new HashSet<>(Arrays.asList(all));
+ if (queryIds.length > 0) {
+ for (String id : queryIds) {
+ if (!allSet.contains(id)) {
+ throw new IllegalArgumentException("Invalid query ID: " + id);
+ }
+ }
+ return Arrays.asList(queryIds);
+ }
+ if (excludedQueryIds.length > 0) {
+ for (String id : excludedQueryIds) {
+ if (!allSet.contains(id)) {
+ throw new IllegalArgumentException("Invalid query ID to exclude:
" + id);
+ }
+ }
+ Set<String> excludedSet = new
HashSet<>(Arrays.asList(excludedQueryIds));
+ return Arrays.stream(all)
+ .filter(id -> !excludedSet.contains(id))
+ .collect(Collectors.toList());
+ }
+ return Arrays.asList(all);
+ }
+ };
+ }
+
+ private static class Division {
+ private final int shard;
+ private final int shardCount;
+
+ private Division(int shard, int shardCount) {
+ this.shard = shard;
+ this.shardCount = shardCount;
+ }
+
+ private static Division parse(String shard) {
+ String[] parts = shard.split("/");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid shard: " + shard);
+ }
+ int s = Integer.parseInt(parts[0]);
+ int c = Integer.parseInt(parts[1]);
+ Preconditions.checkArgument(s >= 1 && c >= 1 && s <= c, "Invalid picked
shard: " + shard);
+ return new Division(s, c);
+ }
+ }
+}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcSuite.scala
index c612e3c6a..058657976 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcSuite.scala
@@ -180,35 +180,4 @@ abstract class TpcSuite(
}
-object TpcSuite {
- implicit class TpcSuiteImplicits(suite: TpcSuite) {
- def selectQueryIds(queryIds: Array[String], excludedQueryIds:
Array[String]): Array[String] = {
- if (queryIds.nonEmpty && excludedQueryIds.nonEmpty) {
- throw new IllegalArgumentException(
- "Should not specify queries and excluded queries at the same time")
- }
- val all = suite.allQueryIds()
- val allSet = all.toSet
- if (queryIds.nonEmpty) {
- assert(
- queryIds.forall(id => allSet.contains(id)),
- "Invalid query ID: " + queryIds.collectFirst {
- case id if !allSet.contains(id)=>
- id
- }.get)
- return queryIds
- }
- if (excludedQueryIds.nonEmpty) {
- assert(
- excludedQueryIds.forall(id => allSet.contains(id)),
- "Invalid query ID to exclude: " + excludedQueryIds.collectFirst {
- case id if !allSet.contains(id)=>
- id
- }.get)
- val excludedSet = excludedQueryIds.toSet
- return all.filterNot(excludedSet.contains)
- }
- all
- }
- }
-}
+object TpcSuite {}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Actions.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Actions.scala
index 90713c50c..5e49b2888 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Actions.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Actions.scala
@@ -23,4 +23,7 @@ trait Action {
}
object Actions {
+ trait QuerySelector {
+ def select(suite: TpcSuite): Seq[String]
+ }
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
index 0842eb80e..f066659ef 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
@@ -18,11 +18,10 @@ package org.apache.gluten.integration.tpc.action
import org.apache.gluten.integration.stat.RamStat
import org.apache.gluten.integration.tpc.{TpcRunner, TpcSuite}
-
import org.apache.spark.sql.ConfUtils.ConfImplicits._
import org.apache.spark.sql.SparkSessionSwitcher
-
import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.gluten.integration.tpc.action.Actions.QuerySelector
import scala.collection.immutable.Map
import scala.collection.mutable
@@ -30,8 +29,8 @@ import scala.collection.mutable.ArrayBuffer
class Parameterized(
scale: Double,
- queryIds: Array[String],
- excludedQueryIds: Array[String],
+ queries: QuerySelector,
+ explain: Boolean,
iterations: Int,
warmupIterations: Int,
configDimensions: Seq[Dim],
@@ -121,7 +120,7 @@ class Parameterized(
sessionSwitcher.registerSession(coordinate.toString, conf)
}
- val runQueryIds = tpcSuite.selectQueryIds(queryIds, excludedQueryIds)
+ val runQueryIds = queries.select(tpcSuite)
// warm up
(0 until warmupIterations).foreach {
@@ -145,6 +144,7 @@ class Parameterized(
queryId,
coordinate,
tpcSuite.desc(),
+ explain,
metrics)
}
}.toList
@@ -255,6 +255,7 @@ object Parameterized {
id: String,
coordinate: Coordinate,
desc: String,
+ explain: Boolean,
metrics: Array[String]) = {
println(s"Running query: $id...")
try {
@@ -262,7 +263,7 @@ object Parameterized {
sessionSwitcher.useSession(coordinate.toString, testDesc)
runner.createTables(sessionSwitcher.spark())
val result =
- runner.runTpcQuery(sessionSwitcher.spark(), testDesc, id, explain =
false, metrics)
+ runner.runTpcQuery(sessionSwitcher.spark(), testDesc, id, explain,
metrics)
val resultRows = result.rows
println(
s"Successfully ran query $id. " +
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
index c5e9cc9d9..dc4ffe622 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
@@ -18,20 +18,19 @@ package org.apache.gluten.integration.tpc.action
import org.apache.gluten.integration.stat.RamStat
import org.apache.gluten.integration.tpc.{TpcRunner, TpcSuite}
-
import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.gluten.integration.tpc.action.Actions.QuerySelector
case class Queries(
scale: Double,
- queryIds: Array[String],
- excludedQueryIds: Array[String],
+ queries: QuerySelector,
explain: Boolean,
iterations: Int,
randomKillTasks: Boolean)
extends Action {
override def execute(tpcSuite: TpcSuite): Boolean = {
- val runQueryIds = tpcSuite.selectQueryIds(queryIds, excludedQueryIds)
+ val runQueryIds = queries.select(tpcSuite)
val runner: TpcRunner = new TpcRunner(tpcSuite.queryResource(),
tpcSuite.dataWritePath(scale))
val results = (0 until iterations).flatMap {
iteration =>
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
index 14dcfe12c..f841b5827 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
@@ -18,22 +18,20 @@ package org.apache.gluten.integration.tpc.action
import org.apache.gluten.integration.stat.RamStat
import org.apache.gluten.integration.tpc.{TpcRunner, TpcSuite}
-
import org.apache.spark.sql.{SparkSessionSwitcher, TestUtils}
-
import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.gluten.integration.tpc.action.Actions.QuerySelector
case class QueriesCompare(
scale: Double,
- queryIds: Array[String],
- excludedQueryIds: Array[String],
+ queries: QuerySelector,
explain: Boolean,
iterations: Int)
extends Action {
override def execute(tpcSuite: TpcSuite): Boolean = {
val runner: TpcRunner = new TpcRunner(tpcSuite.queryResource(),
tpcSuite.dataWritePath(scale))
- val runQueryIds = tpcSuite.selectQueryIds(queryIds, excludedQueryIds)
+ val runQueryIds = queries.select(tpcSuite)
val results = (0 until iterations).flatMap {
iteration =>
println(s"Running tests (iteration $iteration)...")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]