This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 736e71b361 [GLUTEN-10618][VL] Update input iterator metrics name to include more details (#10619)
736e71b361 is described below
commit 736e71b3611a861127b5cf7607772b071ea34bdc
Author: Rong Ma <[email protected]>
AuthorDate: Mon Sep 8 18:51:15 2025 +0800
[GLUTEN-10618][VL] Update input iterator metrics name to include more details (#10619)
---
.../backendsapi/clickhouse/CHMetricsApi.scala | 3 ++-
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 16 ++++++++++++++--
.../org/apache/gluten/backendsapi/MetricsApi.scala | 3 ++-
.../ColumnarCollapseTransformStages.scala | 22 +++++++++++++++++-----
4 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 7d370a2a01..c8037d8c9e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -43,7 +43,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genInputIteratorTransformerMetrics(
child: SparkPlan,
sparkContext: SparkContext,
- forBroadcast: Boolean): Map[String, SQLMetric] = {
+ forBroadcast: Boolean,
+ forShuffle: Boolean): Map[String, SQLMetric] = {
def metricsPlan(plan: SparkPlan): SparkPlan = {
plan match {
case ColumnarInputAdapter(child) => metricsPlan(child)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index f74e6c08be..901bad8be4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -42,7 +42,8 @@ class VeloxMetricsApi extends MetricsApi with Logging {
override def genInputIteratorTransformerMetrics(
child: SparkPlan,
sparkContext: SparkContext,
- forBroadcast: Boolean): Map[String, SQLMetric] = {
+ forBroadcast: Boolean,
+ forShuffle: Boolean): Map[String, SQLMetric] = {
def metricsPlan(plan: SparkPlan): SparkPlan = {
plan match {
case ColumnarInputAdapter(child) => metricsPlan(child)
@@ -61,9 +62,20 @@ class VeloxMetricsApi extends MetricsApi with Logging {
)
}
+ val wallNanosMetric = if (forShuffle) {
+ // For input from shuffle, the time of shuffle read is inclusive to the
metrics.
+ SQLMetrics.createNanoTimingMetric(sparkContext, "time of reducer input")
+ } else if (forBroadcast) {
+ // For input from broadcast, the time of broadcasting is exclusive.
+ SQLMetrics.createNanoTimingMetric(sparkContext, "time of broadcast
input")
+ } else {
+ // For other occasions, e.g. fallback, union, the time of the previous
pipeline is inclusive.
+ SQLMetrics.createNanoTimingMetric(sparkContext, "time of operator input")
+ }
+
Map(
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
- "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of
input iterator")
+ "wallNanos" -> wallNanosMetric
) ++ outputMetrics
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index a5944f50d8..0bd4144cd1 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -37,7 +37,8 @@ trait MetricsApi extends Serializable {
def genInputIteratorTransformerMetrics(
child: SparkPlan,
sparkContext: SparkContext,
- forBroadcast: Boolean): Map[String, SQLMetric]
+ forBroadcast: Boolean,
+ forShuffle: Boolean): Map[String, SQLMetric]
def genInputIteratorTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index da9def0e9e..381fbb9b70 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec,
ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -52,7 +52,8 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
BackendsApiManager.getMetricsApiInstance.genInputIteratorTransformerMetrics(
child,
sparkContext,
- forBroadcast())
+ forBroadcast(),
+ forShuffle())
override def simpleString(maxFields: Int): String = {
s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)}"
@@ -84,8 +85,19 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
private def forBroadcast(): Boolean = {
child match {
- case ColumnarInputAdapter(c) if c.isInstanceOf[BroadcastQueryStageExec]
=> true
- case ColumnarInputAdapter(c) if c.isInstanceOf[BroadcastExchangeLike] =>
true
+ case ColumnarInputAdapter(c)
+ if c.isInstanceOf[BroadcastQueryStageExec] ||
+ c.isInstanceOf[BroadcastExchangeLike] =>
+ true
+ case _ => false
+ }
+ }
+
+ private def forShuffle(): Boolean = {
+ child match {
+ case ColumnarInputAdapter(c)
+ if c.isInstanceOf[ShuffleQueryStageExec] ||
c.isInstanceOf[ShuffleExchangeLike] =>
+ true
case _ => false
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]