This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 736e71b361 [GLUTEN-10618][VL] Update input iterator metrics name to 
include more details (#10619)
736e71b361 is described below

commit 736e71b3611a861127b5cf7607772b071ea34bdc
Author: Rong Ma <[email protected]>
AuthorDate: Mon Sep 8 18:51:15 2025 +0800

    [GLUTEN-10618][VL] Update input iterator metrics name to include more 
details (#10619)
---
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  3 ++-
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 16 ++++++++++++++--
 .../org/apache/gluten/backendsapi/MetricsApi.scala |  3 ++-
 .../ColumnarCollapseTransformStages.scala          | 22 +++++++++++++++++-----
 4 files changed, 35 insertions(+), 9 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 7d370a2a01..c8037d8c9e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -43,7 +43,8 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   override def genInputIteratorTransformerMetrics(
       child: SparkPlan,
       sparkContext: SparkContext,
-      forBroadcast: Boolean): Map[String, SQLMetric] = {
+      forBroadcast: Boolean,
+      forShuffle: Boolean): Map[String, SQLMetric] = {
     def metricsPlan(plan: SparkPlan): SparkPlan = {
       plan match {
         case ColumnarInputAdapter(child) => metricsPlan(child)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index f74e6c08be..901bad8be4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -42,7 +42,8 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   override def genInputIteratorTransformerMetrics(
       child: SparkPlan,
       sparkContext: SparkContext,
-      forBroadcast: Boolean): Map[String, SQLMetric] = {
+      forBroadcast: Boolean,
+      forShuffle: Boolean): Map[String, SQLMetric] = {
     def metricsPlan(plan: SparkPlan): SparkPlan = {
       plan match {
         case ColumnarInputAdapter(child) => metricsPlan(child)
@@ -61,9 +62,20 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       )
     }
 
+    val wallNanosMetric = if (forShuffle) {
+      // For input from shuffle, the time spent on shuffle read is included in the metric.
+      SQLMetrics.createNanoTimingMetric(sparkContext, "time of reducer input")
+    } else if (forBroadcast) {
+      // For input from broadcast, the time spent on broadcasting is excluded from the metric.
+      SQLMetrics.createNanoTimingMetric(sparkContext, "time of broadcast 
input")
+    } else {
+      // For other cases, e.g. fallback or union, the time of the previous pipeline is included in the metric.
+      SQLMetrics.createNanoTimingMetric(sparkContext, "time of operator input")
+    }
+
     Map(
       "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time 
count"),
-      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
input iterator")
+      "wallNanos" -> wallNanosMetric
     ) ++ outputMetrics
   }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index a5944f50d8..0bd4144cd1 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -37,7 +37,8 @@ trait MetricsApi extends Serializable {
   def genInputIteratorTransformerMetrics(
       child: SparkPlan,
       sparkContext: SparkContext,
-      forBroadcast: Boolean): Map[String, SQLMetric]
+      forBroadcast: Boolean,
+      forShuffle: Boolean): Map[String, SQLMetric]
 
   def genInputIteratorTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric],
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index da9def0e9e..381fbb9b70 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, 
ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ShuffleExchangeLike}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -52,7 +52,8 @@ case class InputIteratorTransformer(child: SparkPlan) extends 
UnaryTransformSupp
     
BackendsApiManager.getMetricsApiInstance.genInputIteratorTransformerMetrics(
       child,
       sparkContext,
-      forBroadcast())
+      forBroadcast(),
+      forShuffle())
 
   override def simpleString(maxFields: Int): String = {
     s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)}"
@@ -84,8 +85,19 @@ case class InputIteratorTransformer(child: SparkPlan) 
extends UnaryTransformSupp
 
   private def forBroadcast(): Boolean = {
     child match {
-      case ColumnarInputAdapter(c) if c.isInstanceOf[BroadcastQueryStageExec] 
=> true
-      case ColumnarInputAdapter(c) if c.isInstanceOf[BroadcastExchangeLike] => 
true
+      case ColumnarInputAdapter(c)
+          if c.isInstanceOf[BroadcastQueryStageExec] ||
+            c.isInstanceOf[BroadcastExchangeLike] =>
+        true
+      case _ => false
+    }
+  }
+
+  private def forShuffle(): Boolean = {
+    child match {
+      case ColumnarInputAdapter(c)
+          if c.isInstanceOf[ShuffleQueryStageExec] || 
c.isInstanceOf[ShuffleExchangeLike] =>
+        true
       case _ => false
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to