This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 717b263d0 [VL] CI: Gluten-it: Print planning time as well as execution time in test report (#5616)
717b263d0 is described below
commit 717b263d01310d94c0fa4ec506d148ef00367448
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue May 7 09:25:38 2024 +0800
[VL] CI: Gluten-it: Print planning time as well as execution time in test report (#5616)
---
.../integration/tpc/action/Parameterized.scala | 55 +++++++++-------
.../gluten/integration/tpc/action/Queries.scala | 34 +++++++---
.../integration/tpc/action/QueriesCompare.scala | 61 ++++++++++++------
.../integration/tpc/action/TableFormatter.scala | 74 ++++++++++++++++++++++
.../scala/org/apache/spark/sql/QueryRunner.scala | 12 +++-
5 files changed, 183 insertions(+), 53 deletions(-)
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
index b4f7a5394..6fc4e66d6 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.integration.tpc.action.Actions.QuerySelector
import scala.collection.immutable.Map
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class Parameterized(
scale: Double,
@@ -189,30 +189,15 @@ case class TestResultLine(
succeed: Boolean,
coordinate: Coordinate,
rowCount: Option[Long],
+ planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
metrics: Map[String, Long],
errorMessage: Option[String])
-case class TestResultLines(
- dimNames: Seq[String],
- metricNames: Seq[String],
- lines: Iterable[TestResultLine]) {
- def print(): Unit = {
- var fmt = "|%15s|%15s"
- for (_ <- dimNames.indices) {
- fmt = fmt + "|%20s"
- }
- for (_ <- metricNames.indices) {
- fmt = fmt + "|%35s"
- }
- fmt = fmt + "|%30s|%30s|\n"
- val fields = ArrayBuffer[String]("Query ID", "Succeed")
- dimNames.foreach(dimName => fields.append(dimName))
- metricNames.foreach(metricName => fields.append(metricName))
- fields.append("Row Count")
- fields.append("Query Time (Millis)")
- printf(fmt, fields: _*)
- lines.foreach { line =>
+object TestResultLine {
+ class Parser(dimNames: Seq[String], metricNames: Seq[String])
+ extends TableFormatter.RowParser[TestResultLine] {
+ override def parse(line: TestResultLine): Seq[Any] = {
val values = ArrayBuffer[Any](line.queryId, line.succeed)
dimNames.foreach { dimName =>
val coordinate = line.coordinate.coordinate
@@ -226,9 +211,32 @@ case class TestResultLines(
values.append(metrics.getOrElse(metricName, "N/A"))
}
values.append(line.rowCount.getOrElse("N/A"))
+ values.append(line.planningTimeMillis.getOrElse("N/A"))
values.append(line.executionTimeMillis.getOrElse("N/A"))
- printf(fmt, values: _*)
+ values
+ }
+ }
+}
+
+case class TestResultLines(
+ dimNames: Seq[String],
+ metricNames: Seq[String],
+ lines: Iterable[TestResultLine]) {
+ def print(): Unit = {
+ val fields = ListBuffer[String]("Query ID", "Succeed")
+ dimNames.foreach(dimName => fields.append(dimName))
+ metricNames.foreach(metricName => fields.append(metricName))
+ fields.append("Row Count")
+ fields.append("Planning Time (Millis)")
+ fields.append("Query Time (Millis)")
+ val formatter = TableFormatter.create[TestResultLine](fields: _*)(
+ new TestResultLine.Parser(dimNames, metricNames))
+
+ lines.foreach { line =>
+ formatter.appendRow(line)
}
+
+ formatter.print(System.out)
}
}
@@ -257,6 +265,7 @@ object Parameterized {
succeed = true,
coordinate,
Some(resultRows.length),
+ Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
result.metrics,
None)
@@ -266,7 +275,7 @@ object Parameterized {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
- TestResultLine(id, succeed = false, coordinate, None, None, Map.empty, error)
+ TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error)
}
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
index c5f883189..edeb960fc 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
@@ -100,24 +100,36 @@ object Queries {
queryId: String,
testPassed: Boolean,
rowCount: Option[Long],
+ planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
errorMessage: Option[String])
+ object TestResultLine {
+ implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
+ override def parse(line: TestResultLine): Seq[Any] = {
+ Seq(
+ line.queryId,
+ line.testPassed,
+ line.rowCount.getOrElse("N/A"),
+ line.planningTimeMillis.getOrElse("N/A"),
+ line.executionTimeMillis.getOrElse("N/A"))
+ }
+ }
+ }
+
private def printResults(results: List[TestResultLine]): Unit = {
- printf(
- "|%15s|%15s|%30s|%30s|\n",
+ val formatter = TableFormatter.create[TestResultLine](
"Query ID",
"Was Passed",
"Row Count",
+ "Plan Time (Millis)",
"Query Time (Millis)")
+
results.foreach { line =>
- printf(
- "|%15s|%15s|%30s|%30s|\n",
- line.queryId,
- line.testPassed,
- line.rowCount.getOrElse("N/A"),
- line.executionTimeMillis.getOrElse("N/A"))
+ formatter.appendRow(line)
}
+
+ formatter.print(System.out)
}
private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
@@ -132,6 +144,9 @@ object Queries {
if (r1.rowCount.nonEmpty && r2.rowCount.nonEmpty)
Some(r1.rowCount.get + r2.rowCount.get)
else None,
+ if (r1.planningTimeMillis.nonEmpty && r2.planningTimeMillis.nonEmpty)
+ Some(r1.planningTimeMillis.get + r2.planningTimeMillis.get)
+ else None,
if (r1.executionTimeMillis.nonEmpty && r2.executionTimeMillis.nonEmpty)
Some(r1.executionTimeMillis.get + r2.executionTimeMillis.get)
else None,
@@ -164,6 +179,7 @@ object Queries {
id,
testPassed = true,
Some(resultRows.length),
+ Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
None)
} catch {
@@ -172,7 +188,7 @@ object Queries {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
- TestResultLine(id, testPassed = false, None, None, error)
+ TestResultLine(id, testPassed = false, None, None, None, error)
}
}
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
index 5e8e2d613..cfb3e7dc5 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
@@ -101,37 +101,52 @@ object QueriesCompare {
testPassed: Boolean,
expectedRowCount: Option[Long],
actualRowCount: Option[Long],
+ expectedPlanningTimeMillis: Option[Long],
+ actualPlanningTimeMillis: Option[Long],
expectedExecutionTimeMillis: Option[Long],
actualExecutionTimeMillis: Option[Long],
errorMessage: Option[String])
+ object TestResultLine {
+ implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
+ override def parse(line: TestResultLine): Seq[Any] = {
+ val timeVariation =
+ if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
+ Some(
+ ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
+ / line.actualExecutionTimeMillis.get.toDouble) * 100)
+ } else None
+ Seq(
+ line.queryId,
+ line.testPassed,
+ line.expectedRowCount.getOrElse("N/A"),
+ line.actualRowCount.getOrElse("N/A"),
+ line.expectedPlanningTimeMillis.getOrElse("N/A"),
+ line.actualPlanningTimeMillis.getOrElse("N/A"),
+ line.expectedExecutionTimeMillis.getOrElse("N/A"),
+ line.actualExecutionTimeMillis.getOrElse("N/A"),
+ timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
+ }
+ }
+ }
+
private def printResults(results: List[TestResultLine]): Unit = {
- printf(
- "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
+ val formatter = TableFormatter.create[TestResultLine](
"Query ID",
"Was Passed",
"Expected Row Count",
"Actual Row Count",
+ "Baseline Planning Time (Millis)",
+ "Planning Time (Millis)",
"Baseline Query Time (Millis)",
"Query Time (Millis)",
"Query Time Variation")
+
results.foreach { line =>
- val timeVariation =
- if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
- Some(
- ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
- / line.actualExecutionTimeMillis.get.toDouble) * 100)
- } else None
- printf(
- "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
- line.queryId,
- line.testPassed,
- line.expectedRowCount.getOrElse("N/A"),
- line.actualRowCount.getOrElse("N/A"),
- line.expectedExecutionTimeMillis.getOrElse("N/A"),
- line.actualExecutionTimeMillis.getOrElse("N/A"),
- timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
+ formatter.appendRow(line)
}
+
+ formatter.print(System.out)
}
private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
@@ -149,6 +164,12 @@ object QueriesCompare {
if (r1.actualRowCount.nonEmpty && r2.actualRowCount.nonEmpty)
Some(r1.actualRowCount.get + r2.actualRowCount.get)
else None,
+ if (r1.expectedPlanningTimeMillis.nonEmpty && r2.expectedPlanningTimeMillis.nonEmpty)
+ Some(r1.expectedPlanningTimeMillis.get + r2.expectedPlanningTimeMillis.get)
+ else None,
+ if (r1.actualPlanningTimeMillis.nonEmpty && r2.actualPlanningTimeMillis.nonEmpty)
+ Some(r1.actualPlanningTimeMillis.get + r2.actualPlanningTimeMillis.get)
+ else None,
if (r1.expectedExecutionTimeMillis.nonEmpty && r2.expectedExecutionTimeMillis.nonEmpty)
Some(r1.expectedExecutionTimeMillis.get + r2.expectedExecutionTimeMillis.get)
else None,
@@ -187,6 +208,8 @@ object QueriesCompare {
testPassed = true,
Some(expectedRows.length),
Some(resultRows.length),
+ Some(expected.planningTimeMillis),
+ Some(result.planningTimeMillis),
Some(expected.executionTimeMillis),
Some(result.executionTimeMillis),
None)
@@ -198,6 +221,8 @@ object QueriesCompare {
testPassed = false,
Some(expectedRows.length),
Some(resultRows.length),
+ Some(expected.planningTimeMillis),
+ Some(result.planningTimeMillis),
Some(expected.executionTimeMillis),
Some(result.executionTimeMillis),
error)
@@ -207,7 +232,7 @@ object QueriesCompare {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
- TestResultLine(id, testPassed = false, None, None, None, None, error)
+ TestResultLine(id, testPassed = false, None, None, None, None, None, None, error)
}
}
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala
new file mode 100644
index 000000000..cb6ab7ebd
--- /dev/null
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.integration.tpc.action
+
+import java.io.{OutputStream, PrintStream}
+import scala.collection.mutable
+
+trait TableFormatter[ROW <: Any] {
+ import TableFormatter._
+ def appendRow(row: ROW): Unit
+ def print(s: OutputStream): Unit
+}
+
+object TableFormatter {
+ def create[ROW <: Any](fields: String*)(
+ implicit parser: RowParser[ROW]): TableFormatter[ROW] = {
+ assert(fields.nonEmpty)
+ new Impl[ROW](Schema(fields), parser)
+ }
+
+ private case class Schema(fields: Seq[String])
+
+ private class Impl[ROW <: Any](schema: Schema, parser: RowParser[ROW])
+ extends TableFormatter[ROW] {
+ private val rows = mutable.ListBuffer[Seq[String]]()
+
+ override def appendRow(row: ROW): Unit = {
+ val parsed = parser.parse(row)
+ assert(parsed.size == schema.fields.size)
+ rows += parsed.map(_.toString)
+ }
+
+ override def print(s: OutputStream): Unit = {
+ val numFields = schema.fields.size
+ val widths = (0 until numFields)
+ .map { i =>
+ rows.map(_(i).length).max max schema.fields(i).length
+ }
+ .map(_ + 1)
+ val pBuilder = StringBuilder.newBuilder
+ pBuilder ++= "|"
+ widths.foreach { w =>
+ pBuilder ++= s"%${w}s|"
+ }
+ val pattern = pBuilder.toString()
+ val printer = new PrintStream(s)
+ printer.println(String.format(pattern, schema.fields: _*))
+ rows.foreach { r =>
+ printer.println(String.format(pattern, r: _*))
+ }
+ printer.flush()
+ printer.close()
+ }
+ }
+
+ trait RowParser[ROW <: Any] {
+ def parse(row: ROW): Seq[Any]
+ }
+}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
index 332e56043..a5b699a1a 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
@@ -89,9 +89,11 @@ object QueryRunner {
if (explain) {
df.explain(extended = true)
}
- val millis = (System.nanoTime() - prev) / 1000000L
+ val planMillis =
+ df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
+ val totalMillis = (System.nanoTime() - prev) / 1000000L
val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
- RunResult(rows, millis, collectedMetrics)
+ RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
} finally {
sc.removeSparkListener(metricsListener)
killTaskListener.foreach(l => {
@@ -124,7 +126,11 @@ object QueryRunner {
}
-case class RunResult(rows: Seq[Row], executionTimeMillis: Long, metrics: Map[String, Long])
+case class RunResult(
+ rows: Seq[Row],
+ planningTimeMillis: Long,
+ executionTimeMillis: Long,
+ metrics: Map[String, Long])
class MetricsListener(em: ExecutorMetrics) extends SparkListener {
override def onExecutorMetricsUpdate(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]