This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 15c7c5808 [VL] Do not skip updating children's metrics while visiting
an operator with NoopMetricsUpdater (#5933)
15c7c5808 is described below
commit 15c7c5808eb26468b9fe0e237d5e5edf26490fa6
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jun 5 16:20:58 2024 +0800
[VL] Do not skip updating children's metrics while visiting an operator
with NoopMetricsUpdater (#5933)
---
.../metrics/HashAggregateMetricsUpdater.scala | 2 +-
.../gluten/metrics/HashJoinMetricsUpdater.scala | 2 +-
.../org/apache/gluten/metrics/MetricsUtil.scala | 6 ++--
.../apache/gluten/execution/TopNTransformer.scala | 4 +--
.../gluten/execution/VeloxMetricsSuite.scala | 36 ++++++++++++++++++++++
.../gluten/execution/WholeStageTransformer.scala | 6 ++--
.../columnar/enumerated/RemoveFilter.scala | 6 ++--
.../org/apache/gluten/metrics/MetricsUpdater.scala | 31 ++++++++++++++-----
.../org/apache/gluten/metrics/MetricsUtil.scala | 36 ++++++++++++----------
9 files changed, 93 insertions(+), 36 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index e2014e5b8..b035d7a04 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -65,7 +65,7 @@ class HashAggregateMetricsUpdater(val metrics: Map[String,
SQLMetric])
}
}
} catch {
- case e: Throwable =>
+ case e: Exception =>
logError(s"Updating native metrics failed due to ${e.getCause}.")
throw e
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala
index 3c35286c1..ca891bac2 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala
@@ -104,7 +104,7 @@ class HashJoinMetricsUpdater(val metrics: Map[String,
SQLMetric])
}
}
} catch {
- case e: Throwable =>
+ case e: Exception =>
logError(s"Updating native metrics failed due to ${e.getCause}.")
throw e
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index a6dfb3dbc..1376dc6a8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -41,7 +41,7 @@ object MetricsUtil extends Logging {
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(),
t.children.map(treeifyMetricsUpdaters))
case _ =>
- MetricsUpdaterTree(NoopMetricsUpdater, Seq())
+ MetricsUpdaterTree(MetricsUpdater.Terminate, Seq())
}
}
@@ -107,7 +107,7 @@ object MetricsUtil extends Logging {
s"Updating native metrics failed due to the wrong size of metrics
data: " +
s"$numNativeMetrics")
()
- } else if (mutNode.updater == NoopMetricsUpdater) {
+ } else if (mutNode.updater == MetricsUpdater.Terminate) {
()
} else {
updateTransformerMetricsInternal(
@@ -159,7 +159,7 @@ object MetricsUtil extends Logging {
mutNode.children.foreach {
child =>
- if (child.updater != NoopMetricsUpdater) {
+ if (child.updater != MetricsUpdater.Terminate) {
val result = updateTransformerMetricsInternal(
child,
relMap,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala
index c2d12415c..01c89bee2 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/TopNTransformer.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater}
+import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.`type`.TypeBuilder
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
@@ -114,5 +114,5 @@ case class TopNTransformer(
}
}
- override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater // TODO
+ override def metricsUpdater(): MetricsUpdater = MetricsUpdater.Todo // TODO
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index ce8450fea..468f26259 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf
@@ -52,6 +53,11 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
super.afterAll()
}
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ }
+
test("test sort merge join metrics") {
withSQLConf(
GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
@@ -143,6 +149,36 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
+ test("Metrics of window") {
+ runQueryAndCompare("SELECT c1, c2, sum(c2) over (partition by c1) as s
FROM metrics_t1") {
+ df =>
+ val window = find(df.queryExecution.executedPlan) {
+ case _: WindowExecTransformer => true
+ case _ => false
+ }
+ assert(window.isDefined)
+ val metrics = window.get.metrics
+ assert(metrics("numOutputRows").value == 100)
+ assert(metrics("outputVectors").value == 2)
+ }
+ }
+
+ test("Metrics of noop filter's children") {
+ withSQLConf("spark.gluten.ras.enabled" -> "true") {
+ runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") {
+ df =>
+ val scan = find(df.queryExecution.executedPlan) {
+ case _: FileSourceScanExecTransformer => true
+ case _ => false
+ }
+ assert(scan.isDefined)
+ val metrics = scan.get.metrics
+ assert(metrics("rawInputRows").value == 100)
+ assert(metrics("outputVectors").value == 1)
+ }
+ }
+ }
+
test("Write metrics") {
if (SparkShimLoader.getSparkVersion.startsWith("3.4")) {
withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index ed691fc09..7dfa0563d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.expression._
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater,
NoopMetricsUpdater}
+import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
@@ -350,7 +350,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
override def metricsUpdater(): MetricsUpdater = {
child match {
case transformer: TransformSupport => transformer.metricsUpdater()
- case _ => NoopMetricsUpdater
+ case _ => MetricsUpdater.None
}
}
@@ -361,7 +361,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
case _ => false
}
.map(_.asInstanceOf[TransformSupport].metricsUpdater())
- .getOrElse(NoopMetricsUpdater)
+ .getOrElse(MetricsUpdater.None)
}
override protected def withNewChildInternal(newChild: SparkPlan):
WholeStageTransformer =
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala
index b980c2422..5d7209dfb 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.execution._
-import org.apache.gluten.metrics.{MetricsUpdater, NoopMetricsUpdater}
+import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.ras.path.Pattern._
import org.apache.gluten.ras.path.Pattern.Matchers._
import org.apache.gluten.ras.rule.{RasRule, Shape}
@@ -54,7 +54,7 @@ object RemoveFilter extends RasRule[SparkPlan] {
leaf(clazz(classOf[BasicScanExecTransformer]))
).build())
- // A noop filter placeholder that indicates that all conditions are pushed
down to scan.
+ // A noop filter placeholder that indicates that all conditions were pushed
down to scan.
//
// This operator has zero cost in cost model to avoid planner from choosing
the
// original filter-scan that doesn't have all conditions pushed down to scan.
@@ -71,7 +71,7 @@ object RemoveFilter extends RasRule[SparkPlan] {
// spark.sql.adaptive.logLevel=ERROR.
case class NoopFilter(override val child: SparkPlan, override val output:
Seq[Attribute])
extends UnaryTransformSupport {
- override def metricsUpdater(): MetricsUpdater = NoopMetricsUpdater
+ override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan = copy(newChild)
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala
b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala
index 0a622ba0b..5201df3b3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/metrics/MetricsUpdater.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.metrics
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
/**
@@ -26,16 +25,34 @@ import
org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
* TODO: place it to some other where since it's used not only by whole stage
facilities
*/
trait MetricsUpdater extends Serializable {
+ def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {}
+ def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {}
+}
- def metrics: Map[String, SQLMetric]
+object MetricsUpdater {
+  // An empty metrics updater. Used when the operator generates native metrics,
+  // but it is not yet desired to update the metrics on the JVM side.
+ object Todo extends MetricsUpdater {}
- def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {}
+ // Used when the operator doesn't generate native metrics. It could be
because
+ // the operator doesn't generate any native query plan.
+ object None extends MetricsUpdater {
+ override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit =
+ throw new UnsupportedOperationException()
+ override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit =
+ throw new UnsupportedOperationException()
+ }
- def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit = {}
+  // Indicates that a branch of a MetricsUpdaterTree is terminated. It's not
+  // bound to any operators.
+ object Terminate extends MetricsUpdater {
+ override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit =
+ throw new UnsupportedOperationException()
+ override def updateNativeMetrics(operatorMetrics: IOperatorMetrics): Unit =
+ throw new UnsupportedOperationException()
+ }
}
final case class MetricsUpdaterTree(updater: MetricsUpdater, children:
Seq[MetricsUpdaterTree])
-object NoopMetricsUpdater extends MetricsUpdater {
- override def metrics: Map[String, SQLMetric] = Map.empty
-}
+object MetricsUpdaterTree {}
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index f11800b89..0c387b429 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -54,10 +54,13 @@ object MetricsUtil extends Logging {
MetricsUpdaterTree(
smj.metricsUpdater(),
Seq(treeifyMetricsUpdaters(smj.bufferedPlan),
treeifyMetricsUpdaters(smj.streamedPlan)))
+ case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None
=>
+ assert(t.children.size == 1, "MetricsUpdater.None can only be used
on unary operator")
+ treeifyMetricsUpdaters(t.children.head)
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(),
t.children.map(treeifyMetricsUpdaters))
case _ =>
- MetricsUpdaterTree(NoopMetricsUpdater, Seq())
+ MetricsUpdaterTree(MetricsUpdater.Terminate, Seq())
}
}
@@ -180,6 +183,8 @@ object MetricsUtil extends Logging {
)
}
+  // FIXME: Metrics updating code is too magical to maintain. The tree-walking
+  // algorithm should be made more declarative than counting down these
+  // counters that don't have a fixed definition.
/**
* @return
* operator index and metrics index
@@ -192,6 +197,9 @@ object MetricsUtil extends Logging {
metricsIdx: Int,
joinParamsMap: JMap[JLong, JoinParams],
aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = {
+ if (mutNode.updater == MetricsUpdater.Terminate) {
+ return (operatorIdx, metricsIdx)
+ }
val operatorMetrics = new JArrayList[OperatorMetrics]()
var curMetricsIdx = metricsIdx
relMap
@@ -245,18 +253,16 @@ object MetricsUtil extends Logging {
mutNode.children.foreach {
child =>
- if (child.updater != NoopMetricsUpdater) {
- val result = updateTransformerMetricsInternal(
- child,
- relMap,
- newOperatorIdx,
- metrics,
- newMetricsIdx,
- joinParamsMap,
- aggParamsMap)
- newOperatorIdx = result._1
- newMetricsIdx = result._2
- }
+ val result = updateTransformerMetricsInternal(
+ child,
+ relMap,
+ newOperatorIdx,
+ metrics,
+ newMetricsIdx,
+ joinParamsMap,
+ aggParamsMap)
+ newOperatorIdx = result._1
+ newMetricsIdx = result._2
}
(newOperatorIdx, newMetricsIdx)
@@ -292,8 +298,6 @@ object MetricsUtil extends Logging {
val numNativeMetrics = metrics.inputRows.length
if (numNativeMetrics == 0) {
()
- } else if (mutNode.updater == NoopMetricsUpdater) {
- ()
} else {
updateTransformerMetricsInternal(
mutNode,
@@ -305,7 +309,7 @@ object MetricsUtil extends Logging {
aggParamsMap)
}
} catch {
- case e: Throwable =>
+ case e: Exception =>
logWarning(s"Updating native metrics failed due to ${e.getCause}.")
()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]