This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bab1a83ff0 [GLUTEN-10149][VL] Fix incorrect NestedLoopJoin metrics
(#10169)
bab1a83ff0 is described below
commit bab1a83ff0f6cfc50aaef3cbc1faade899142b78
Author: Jaime Pan <[email protected]>
AuthorDate: Tue Jul 15 00:32:28 2025 +0800
[GLUTEN-10149][VL] Fix incorrect NestedLoopJoin metrics (#10169)
---
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 64 +++++++++++++++++++---
.../org/apache/gluten/metrics/MetricsUtil.scala | 14 ++---
.../metrics/NestedLoopJoinMetricsUpdater.scala | 64 +++++++++++++++++-----
.../gluten/execution/VeloxMetricsSuite.scala | 20 +++++++
4 files changed, 134 insertions(+), 28 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 14d4059679..00c6ddb587 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -569,15 +569,63 @@ class VeloxMetricsApi extends MetricsApi with Logging {
override def genNestedLoopJoinTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
- "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
- "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of
NestedLoopJoin"),
- "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
- "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak
memory bytes"),
- "numMemoryAllocations" -> SQLMetrics.createMetric(
+ "nestedLoopJoinBuildInputRows" -> SQLMetrics.createMetric(
sparkContext,
- "number of memory allocations")
+ "number of nested loop join build input rows"),
+ "nestedLoopJoinBuildOutputRows" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join build output rows"),
+ "nestedLoopJoinBuildOutputVectors" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join build output vectors"),
+ "nestedLoopJoinBuildOutputBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "number of nested loop join build output bytes"),
+ "nestedLoopJoinBuildCpuCount" -> SQLMetrics.createMetric(
+ sparkContext,
+ "nested loop join build cpu wall time count"),
+ "nestedLoopJoinBuildWallNanos" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "time of nested loop join build"),
+ "nestedLoopJoinBuildPeakMemoryBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "nested loop join build peak memory bytes"),
+ "nestedLoopJoinBuildNumMemoryAllocations" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join build memory allocations"),
+ "nestedLoopJoinProbeInputRows" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join probe input rows"),
+ "nestedLoopJoinProbeOutputRows" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join probe output rows"),
+ "nestedLoopJoinProbeOutputVectors" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join probe output vectors"),
+ "nestedLoopJoinProbeOutputBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "number of nested loop join probe output bytes"),
+ "nestedLoopJoinProbeCpuCount" -> SQLMetrics.createMetric(
+ sparkContext,
+ "nested loop join probe cpu wall time count"),
+ "nestedLoopJoinProbeWallNanos" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "time of nested loop join probe"),
+ "nestedLoopJoinProbePeakMemoryBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "nested loop join probe peak memory bytes"),
+ "nestedLoopJoinProbeNumMemoryAllocations" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of nested loop join probe memory allocations"),
+ "postProjectionCpuCount" -> SQLMetrics.createMetric(
+ sparkContext,
+ "postProject cpu wall time count"),
+ "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "time of postProjection"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
+ "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes")
)
override def genNestedLoopJoinTransformerMetricsUpdater(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index dc32fc5d96..a7abc93621 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -229,8 +229,13 @@ object MetricsUtil extends Logging {
})
mutNode.updater match {
- case ju: HashJoinMetricsUpdater =>
- // JoinRel outputs two suites of metrics respectively for hash build
and hash probe.
+ case smj: SortMergeJoinMetricsUpdater =>
+ smj.updateJoinMetrics(
+ operatorMetrics,
+ metrics.getSingleMetrics,
+ joinParamsMap.get(operatorIdx))
+ case ju: JoinMetricsUpdaterBase =>
+ // JoinRel and CrossRel output two suites of metrics respectively for
build and probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
@@ -238,11 +243,6 @@ object MetricsUtil extends Logging {
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
- case smj: SortMergeJoinMetricsUpdater =>
- smj.updateJoinMetrics(
- operatorMetrics,
- metrics.getSingleMetrics,
- joinParamsMap.get(operatorIdx))
case u: UnionMetricsUpdater =>
            // UnionRel likewise outputs two suites of metrics.
            // Therefore, fetch one more suite of metrics here.
            // NOTE(review): this comment previously referenced "JoinRel ... hash build
            // and hash probe" — apparently copy-pasted from the join case above.
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
index 3b058a8783..2f55849889 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
@@ -16,20 +16,58 @@
*/
package org.apache.gluten.metrics
+import org.apache.gluten.substrait.JoinParams
+
import org.apache.spark.sql.execution.metric.SQLMetric
-class NestedLoopJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
extends MetricsUpdater {
-
- override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
- if (opMetrics != null) {
- val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
- metrics("numOutputRows") += operatorMetrics.outputRows
- metrics("outputVectors") += operatorMetrics.outputVectors
- metrics("outputBytes") += operatorMetrics.outputBytes
- metrics("cpuCount") += operatorMetrics.cpuCount
- metrics("wallNanos") += operatorMetrics.wallNanos
- metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
- metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
- }
+import java.util
+
+class NestedLoopJoinMetricsUpdater(override val metrics: Map[String,
SQLMetric])
+ extends JoinMetricsUpdaterBase(metrics) {
+
+ val nestedLoopJoinBuildInputRows: SQLMetric =
metrics("nestedLoopJoinBuildInputRows")
+ val nestedLoopJoinBuildOutputRows: SQLMetric =
metrics("nestedLoopJoinBuildOutputRows")
+ val nestedLoopJoinBuildOutputVectors: SQLMetric =
metrics("nestedLoopJoinBuildOutputVectors")
+ val nestedLoopJoinBuildOutputBytes: SQLMetric =
metrics("nestedLoopJoinBuildOutputBytes")
+ val nestedLoopJoinBuildCpuCount: SQLMetric =
metrics("nestedLoopJoinBuildCpuCount")
+ val nestedLoopJoinBuildWallNanos: SQLMetric =
metrics("nestedLoopJoinBuildWallNanos")
+ val nestedLoopJoinBuildPeakMemoryBytes: SQLMetric =
metrics("nestedLoopJoinBuildPeakMemoryBytes")
+ val nestedLoopJoinBuildNumMemoryAllocations: SQLMetric = metrics(
+ "nestedLoopJoinBuildNumMemoryAllocations")
+
+ val nestedLoopJoinProbeInputRows: SQLMetric =
metrics("nestedLoopJoinProbeInputRows")
+ val nestedLoopJoinProbeOutputRows: SQLMetric =
metrics("nestedLoopJoinProbeOutputRows")
+ val nestedLoopJoinProbeOutputVectors: SQLMetric =
metrics("nestedLoopJoinProbeOutputVectors")
+ val nestedLoopJoinProbeOutputBytes: SQLMetric =
metrics("nestedLoopJoinProbeOutputBytes")
+ val nestedLoopJoinProbeCpuCount: SQLMetric =
metrics("nestedLoopJoinProbeCpuCount")
+ val nestedLoopJoinProbeWallNanos: SQLMetric =
metrics("nestedLoopJoinProbeWallNanos")
+ val nestedLoopJoinProbePeakMemoryBytes: SQLMetric =
metrics("nestedLoopJoinProbePeakMemoryBytes")
+ val nestedLoopJoinProbeNumMemoryAllocations: SQLMetric = metrics(
+ "nestedLoopJoinProbeNumMemoryAllocations")
+
+ override protected def updateJoinMetricsInternal(
+ joinMetrics: util.ArrayList[OperatorMetrics],
+ joinParams: JoinParams): Unit = {
+ // nestedLoopJoinProbe
+ val nestedLoopJoinProbeMetrics = joinMetrics.get(0)
+ nestedLoopJoinProbeInputRows += nestedLoopJoinProbeMetrics.inputRows
+ nestedLoopJoinProbeOutputRows += nestedLoopJoinProbeMetrics.outputRows
+ nestedLoopJoinProbeOutputVectors +=
nestedLoopJoinProbeMetrics.outputVectors
+ nestedLoopJoinProbeOutputBytes += nestedLoopJoinProbeMetrics.outputBytes
+ nestedLoopJoinProbeCpuCount += nestedLoopJoinProbeMetrics.cpuCount
+ nestedLoopJoinProbeWallNanos += nestedLoopJoinProbeMetrics.wallNanos
+ nestedLoopJoinProbePeakMemoryBytes +=
nestedLoopJoinProbeMetrics.peakMemoryBytes
+ nestedLoopJoinProbeNumMemoryAllocations +=
nestedLoopJoinProbeMetrics.numMemoryAllocations
+
+ // nestedLoopJoinBuild
+ val nestedLoopJoinBuildMetrics = joinMetrics.get(1)
+ nestedLoopJoinBuildInputRows += nestedLoopJoinBuildMetrics.inputRows
+ nestedLoopJoinBuildOutputRows += nestedLoopJoinBuildMetrics.outputRows
+ nestedLoopJoinBuildOutputVectors +=
nestedLoopJoinBuildMetrics.outputVectors
+ nestedLoopJoinBuildOutputBytes += nestedLoopJoinBuildMetrics.outputBytes
+ nestedLoopJoinBuildCpuCount += nestedLoopJoinBuildMetrics.cpuCount
+ nestedLoopJoinBuildWallNanos += nestedLoopJoinBuildMetrics.wallNanos
+ nestedLoopJoinBuildPeakMemoryBytes +=
nestedLoopJoinBuildMetrics.peakMemoryBytes
+ nestedLoopJoinBuildNumMemoryAllocations +=
nestedLoopJoinBuildMetrics.numMemoryAllocations
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 38e2bcc45e..fe4b0c4736 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -277,4 +277,24 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("storageReadBytes").value > 0)
assert(metrics("ramReadBytes").value == 0)
}
+
+ test("test nested loop join metrics") {
+ withSQLConf() {
+ runQueryAndCompare(
+ "select /*+ BROADCAST(t2) */ c1, c2 from (select c1 from metrics_t1
where c1 = 50) t1 ," +
+ "(select c2 from metrics_t2) t2;"
+ ) {
+ df =>
+ val join = find(df.queryExecution.executedPlan) {
+ case _: BroadcastNestedLoopJoinExecTransformer => true
+ case _ => false
+ }
+ assert(join.isDefined)
+ val metrics = join.get.metrics
+ assert(metrics("nestedLoopJoinBuildInputRows").value == 200)
+ assert(metrics("nestedLoopJoinProbeInputRows").value == 1)
+ assert(metrics("numOutputRows").value == 200 * 1)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]