This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 717b263d0 [VL] CI: Gluten-it: Print planning time as well as execution time in test report (#5616)
717b263d0 is described below

commit 717b263d01310d94c0fa4ec506d148ef00367448
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue May 7 09:25:38 2024 +0800

    [VL] CI: Gluten-it: Print planning time as well as execution time in test report (#5616)
---
 .../integration/tpc/action/Parameterized.scala     | 55 +++++++++-------
 .../gluten/integration/tpc/action/Queries.scala    | 34 +++++++---
 .../integration/tpc/action/QueriesCompare.scala    | 61 ++++++++++++------
 .../integration/tpc/action/TableFormatter.scala    | 74 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/QueryRunner.scala   | 12 +++-
 5 files changed, 183 insertions(+), 53 deletions(-)

diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
index b4f7a5394..6fc4e66d6 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.integration.tpc.action.Actions.QuerySelector
 
 import scala.collection.immutable.Map
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 class Parameterized(
     scale: Double,
@@ -189,30 +189,15 @@ case class TestResultLine(
     succeed: Boolean,
     coordinate: Coordinate,
     rowCount: Option[Long],
+    planningTimeMillis: Option[Long],
     executionTimeMillis: Option[Long],
     metrics: Map[String, Long],
     errorMessage: Option[String])
 
-case class TestResultLines(
-    dimNames: Seq[String],
-    metricNames: Seq[String],
-    lines: Iterable[TestResultLine]) {
-  def print(): Unit = {
-    var fmt = "|%15s|%15s"
-    for (_ <- dimNames.indices) {
-      fmt = fmt + "|%20s"
-    }
-    for (_ <- metricNames.indices) {
-      fmt = fmt + "|%35s"
-    }
-    fmt = fmt + "|%30s|%30s|\n"
-    val fields = ArrayBuffer[String]("Query ID", "Succeed")
-    dimNames.foreach(dimName => fields.append(dimName))
-    metricNames.foreach(metricName => fields.append(metricName))
-    fields.append("Row Count")
-    fields.append("Query Time (Millis)")
-    printf(fmt, fields: _*)
-    lines.foreach { line =>
+object TestResultLine {
+  class Parser(dimNames: Seq[String], metricNames: Seq[String])
+      extends TableFormatter.RowParser[TestResultLine] {
+    override def parse(line: TestResultLine): Seq[Any] = {
       val values = ArrayBuffer[Any](line.queryId, line.succeed)
       dimNames.foreach { dimName =>
         val coordinate = line.coordinate.coordinate
@@ -226,9 +211,32 @@ case class TestResultLines(
         values.append(metrics.getOrElse(metricName, "N/A"))
       }
       values.append(line.rowCount.getOrElse("N/A"))
+      values.append(line.planningTimeMillis.getOrElse("N/A"))
       values.append(line.executionTimeMillis.getOrElse("N/A"))
-      printf(fmt, values: _*)
+      values
+    }
+  }
+}
+
+case class TestResultLines(
+    dimNames: Seq[String],
+    metricNames: Seq[String],
+    lines: Iterable[TestResultLine]) {
+  def print(): Unit = {
+    val fields = ListBuffer[String]("Query ID", "Succeed")
+    dimNames.foreach(dimName => fields.append(dimName))
+    metricNames.foreach(metricName => fields.append(metricName))
+    fields.append("Row Count")
+    fields.append("Planning Time (Millis)")
+    fields.append("Query Time (Millis)")
+    val formatter = TableFormatter.create[TestResultLine](fields: _*)(
+      new TestResultLine.Parser(dimNames, metricNames))
+
+    lines.foreach { line =>
+      formatter.appendRow(line)
     }
+
+    formatter.print(System.out)
   }
 }
 
@@ -257,6 +265,7 @@ object Parameterized {
         succeed = true,
         coordinate,
         Some(resultRows.length),
+        Some(result.planningTimeMillis),
         Some(result.executionTimeMillis),
         result.metrics,
         None)
@@ -266,7 +275,7 @@ object Parameterized {
         println(
           s"Error running query $id. " +
             s" Error: ${error.get}")
-        TestResultLine(id, succeed = false, coordinate, None, None, Map.empty, error)
+        TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error)
     }
   }
 
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
index c5f883189..edeb960fc 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala
@@ -100,24 +100,36 @@ object Queries {
       queryId: String,
       testPassed: Boolean,
       rowCount: Option[Long],
+      planningTimeMillis: Option[Long],
       executionTimeMillis: Option[Long],
       errorMessage: Option[String])
 
+  object TestResultLine {
+    implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
+      override def parse(line: TestResultLine): Seq[Any] = {
+        Seq(
+          line.queryId,
+          line.testPassed,
+          line.rowCount.getOrElse("N/A"),
+          line.planningTimeMillis.getOrElse("N/A"),
+          line.executionTimeMillis.getOrElse("N/A"))
+      }
+    }
+  }
+
   private def printResults(results: List[TestResultLine]): Unit = {
-    printf(
-      "|%15s|%15s|%30s|%30s|\n",
+    val formatter = TableFormatter.create[TestResultLine](
       "Query ID",
       "Was Passed",
       "Row Count",
+      "Plan Time (Millis)",
       "Query Time (Millis)")
+
     results.foreach { line =>
-      printf(
-        "|%15s|%15s|%30s|%30s|\n",
-        line.queryId,
-        line.testPassed,
-        line.rowCount.getOrElse("N/A"),
-        line.executionTimeMillis.getOrElse("N/A"))
+      formatter.appendRow(line)
     }
+
+    formatter.print(System.out)
   }
 
   private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
@@ -132,6 +144,9 @@ object Queries {
           if (r1.rowCount.nonEmpty && r2.rowCount.nonEmpty)
             Some(r1.rowCount.get + r2.rowCount.get)
           else None,
+          if (r1.planningTimeMillis.nonEmpty && r2.planningTimeMillis.nonEmpty)
+            Some(r1.planningTimeMillis.get + r2.planningTimeMillis.get)
+          else None,
           if (r1.executionTimeMillis.nonEmpty && r2.executionTimeMillis.nonEmpty)
             Some(r1.executionTimeMillis.get + r2.executionTimeMillis.get)
           else None,
@@ -164,6 +179,7 @@ object Queries {
         id,
         testPassed = true,
         Some(resultRows.length),
+        Some(result.planningTimeMillis),
         Some(result.executionTimeMillis),
         None)
     } catch {
@@ -172,7 +188,7 @@ object Queries {
         println(
           s"Error running query $id. " +
             s" Error: ${error.get}")
-        TestResultLine(id, testPassed = false, None, None, error)
+        TestResultLine(id, testPassed = false, None, None, None, error)
     }
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
index 5e8e2d613..cfb3e7dc5 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala
@@ -101,37 +101,52 @@ object QueriesCompare {
       testPassed: Boolean,
       expectedRowCount: Option[Long],
       actualRowCount: Option[Long],
+      expectedPlanningTimeMillis: Option[Long],
+      actualPlanningTimeMillis: Option[Long],
       expectedExecutionTimeMillis: Option[Long],
       actualExecutionTimeMillis: Option[Long],
       errorMessage: Option[String])
 
+  object TestResultLine {
+    implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
+      override def parse(line: TestResultLine): Seq[Any] = {
+        val timeVariation =
+          if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
+            Some(
+              ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
+                / line.actualExecutionTimeMillis.get.toDouble) * 100)
+          } else None
+        Seq(
+          line.queryId,
+          line.testPassed,
+          line.expectedRowCount.getOrElse("N/A"),
+          line.actualRowCount.getOrElse("N/A"),
+          line.expectedPlanningTimeMillis.getOrElse("N/A"),
+          line.actualPlanningTimeMillis.getOrElse("N/A"),
+          line.expectedExecutionTimeMillis.getOrElse("N/A"),
+          line.actualExecutionTimeMillis.getOrElse("N/A"),
+          timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
+      }
+    }
+  }
+
   private def printResults(results: List[TestResultLine]): Unit = {
-    printf(
-      "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
+    val formatter = TableFormatter.create[TestResultLine](
       "Query ID",
       "Was Passed",
       "Expected Row Count",
       "Actual Row Count",
+      "Baseline Planning Time (Millis)",
+      "Planning Time (Millis)",
       "Baseline Query Time (Millis)",
       "Query Time (Millis)",
       "Query Time Variation")
+
     results.foreach { line =>
-      val timeVariation =
-        if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
-          Some(
-            ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
-              / line.actualExecutionTimeMillis.get.toDouble) * 100)
-        } else None
-      printf(
-        "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
-        line.queryId,
-        line.testPassed,
-        line.expectedRowCount.getOrElse("N/A"),
-        line.actualRowCount.getOrElse("N/A"),
-        line.expectedExecutionTimeMillis.getOrElse("N/A"),
-        line.actualExecutionTimeMillis.getOrElse("N/A"),
-        timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
+      formatter.appendRow(line)
     }
+
+    formatter.print(System.out)
   }
 
   private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
@@ -149,6 +164,12 @@ object QueriesCompare {
           if (r1.actualRowCount.nonEmpty && r2.actualRowCount.nonEmpty)
             Some(r1.actualRowCount.get + r2.actualRowCount.get)
           else None,
+          if (r1.expectedPlanningTimeMillis.nonEmpty && r2.expectedPlanningTimeMillis.nonEmpty)
+            Some(r1.expectedPlanningTimeMillis.get + r2.expectedPlanningTimeMillis.get)
+          else None,
+          if (r1.actualPlanningTimeMillis.nonEmpty && r2.actualPlanningTimeMillis.nonEmpty)
+            Some(r1.actualPlanningTimeMillis.get + r2.actualPlanningTimeMillis.get)
+          else None,
           if (r1.expectedExecutionTimeMillis.nonEmpty && r2.expectedExecutionTimeMillis.nonEmpty)
             Some(r1.expectedExecutionTimeMillis.get + r2.expectedExecutionTimeMillis.get)
           else None,
@@ -187,6 +208,8 @@ object QueriesCompare {
           testPassed = true,
           Some(expectedRows.length),
           Some(resultRows.length),
+          Some(expected.planningTimeMillis),
+          Some(result.planningTimeMillis),
           Some(expected.executionTimeMillis),
           Some(result.executionTimeMillis),
           None)
@@ -198,6 +221,8 @@ object QueriesCompare {
         testPassed = false,
         Some(expectedRows.length),
         Some(resultRows.length),
+        Some(expected.planningTimeMillis),
+        Some(result.planningTimeMillis),
         Some(expected.executionTimeMillis),
         Some(result.executionTimeMillis),
         error)
@@ -207,7 +232,7 @@ object QueriesCompare {
         println(
           s"Error running query $id. " +
             s" Error: ${error.get}")
-        TestResultLine(id, testPassed = false, None, None, None, None, error)
+        TestResultLine(id, testPassed = false, None, None, None, None, None, None, error)
     }
   }
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala
new file mode 100644
index 000000000..cb6ab7ebd
--- /dev/null
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.integration.tpc.action
+
+import java.io.{OutputStream, PrintStream}
+import scala.collection.mutable
+
+trait TableFormatter[ROW <: Any] {
+  import TableFormatter._
+  def appendRow(row: ROW): Unit
+  def print(s: OutputStream): Unit
+}
+
+object TableFormatter {
+  def create[ROW <: Any](fields: String*)(
+      implicit parser: RowParser[ROW]): TableFormatter[ROW] = {
+    assert(fields.nonEmpty)
+    new Impl[ROW](Schema(fields), parser)
+  }
+
+  private case class Schema(fields: Seq[String])
+
+  private class Impl[ROW <: Any](schema: Schema, parser: RowParser[ROW])
+      extends TableFormatter[ROW] {
+    private val rows = mutable.ListBuffer[Seq[String]]()
+
+    override def appendRow(row: ROW): Unit = {
+      val parsed = parser.parse(row)
+      assert(parsed.size == schema.fields.size)
+      rows += parsed.map(_.toString)
+    }
+
+    override def print(s: OutputStream): Unit = {
+      val numFields = schema.fields.size
+      val widths = (0 until numFields)
+        .map { i =>
+          rows.map(_(i).length).max max schema.fields(i).length
+        }
+        .map(_ + 1)
+      val pBuilder = StringBuilder.newBuilder
+      pBuilder ++= "|"
+      widths.foreach { w =>
+        pBuilder ++= s"%${w}s|"
+      }
+      val pattern = pBuilder.toString()
+      val printer = new PrintStream(s)
+      printer.println(String.format(pattern, schema.fields: _*))
+      rows.foreach { r =>
+        printer.println(String.format(pattern, r: _*))
+      }
+      printer.flush()
+      printer.close()
+    }
+  }
+
+  trait RowParser[ROW <: Any] {
+    def parse(row: ROW): Seq[Any]
+  }
+}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
index 332e56043..a5b699a1a 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
@@ -89,9 +89,11 @@ object QueryRunner {
       if (explain) {
         df.explain(extended = true)
       }
-      val millis = (System.nanoTime() - prev) / 1000000L
+      val planMillis =
+        df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
+      val totalMillis = (System.nanoTime() - prev) / 1000000L
       val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
-      RunResult(rows, millis, collectedMetrics)
+      RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
     } finally {
       sc.removeSparkListener(metricsListener)
       killTaskListener.foreach(l => {
@@ -124,7 +126,11 @@ object QueryRunner {
 
 }
 
-case class RunResult(rows: Seq[Row], executionTimeMillis: Long, metrics: Map[String, Long])
+case class RunResult(
+    rows: Seq[Row],
+    planningTimeMillis: Long,
+    executionTimeMillis: Long,
+    metrics: Map[String, Long])
 
 class MetricsListener(em: ExecutorMetrics) extends SparkListener {
   override def onExecutorMetricsUpdate(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to