This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bab1a83ff0 [GLUTEN-10149][VL] Fix incorrect NestedLoopJoin metrics (#10169)
bab1a83ff0 is described below

commit bab1a83ff0f6cfc50aaef3cbc1faade899142b78
Author: Jaime Pan <[email protected]>
AuthorDate: Tue Jul 15 00:32:28 2025 +0800

    [GLUTEN-10149][VL] Fix incorrect NestedLoopJoin metrics (#10169)
---
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 64 +++++++++++++++++++---
 .../org/apache/gluten/metrics/MetricsUtil.scala    | 14 ++---
 .../metrics/NestedLoopJoinMetricsUpdater.scala     | 64 +++++++++++++++++-----
 .../gluten/execution/VeloxMetricsSuite.scala       | 20 +++++++
 4 files changed, 134 insertions(+), 28 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 14d4059679..00c6ddb587 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -569,15 +569,63 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   override def genNestedLoopJoinTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
-      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
-      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
NestedLoopJoin"),
-      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time 
count"),
-      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
-      "numMemoryAllocations" -> SQLMetrics.createMetric(
+      "nestedLoopJoinBuildInputRows" -> SQLMetrics.createMetric(
         sparkContext,
-        "number of memory allocations")
+        "number of nested loop join build input rows"),
+      "nestedLoopJoinBuildOutputRows" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join build output rows"),
+      "nestedLoopJoinBuildOutputVectors" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join build output vectors"),
+      "nestedLoopJoinBuildOutputBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "number of nested loop join build output bytes"),
+      "nestedLoopJoinBuildCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "nested loop join build cpu wall time count"),
+      "nestedLoopJoinBuildWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of nested loop join build"),
+      "nestedLoopJoinBuildPeakMemoryBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "nested loop join build peak memory bytes"),
+      "nestedLoopJoinBuildNumMemoryAllocations" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join build memory allocations"),
+      "nestedLoopJoinProbeInputRows" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join probe input rows"),
+      "nestedLoopJoinProbeOutputRows" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join probe output rows"),
+      "nestedLoopJoinProbeOutputVectors" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join probe output vectors"),
+      "nestedLoopJoinProbeOutputBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "number of nested loop join probe output bytes"),
+      "nestedLoopJoinProbeCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "nested loop join probe cpu wall time count"),
+      "nestedLoopJoinProbeWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of nested loop join probe"),
+      "nestedLoopJoinProbePeakMemoryBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "nested loop join probe peak memory bytes"),
+      "nestedLoopJoinProbeNumMemoryAllocations" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of nested loop join probe memory allocations"),
+      "postProjectionCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "postProject cpu wall time count"),
+      "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of postProjection"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes")
     )
 
   override def genNestedLoopJoinTransformerMetricsUpdater(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index dc32fc5d96..a7abc93621 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -229,8 +229,13 @@ object MetricsUtil extends Logging {
         })
 
     mutNode.updater match {
-      case ju: HashJoinMetricsUpdater =>
-        // JoinRel outputs two suites of metrics respectively for hash build 
and hash probe.
+      case smj: SortMergeJoinMetricsUpdater =>
+        smj.updateJoinMetrics(
+          operatorMetrics,
+          metrics.getSingleMetrics,
+          joinParamsMap.get(operatorIdx))
+      case ju: JoinMetricsUpdaterBase =>
+        // JoinRel and CrossRel output two suites of metrics respectively for 
build and probe.
         // Therefore, fetch one more suite of metrics here.
         operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
         curMetricsIdx -= 1
@@ -238,11 +243,6 @@ object MetricsUtil extends Logging {
           operatorMetrics,
           metrics.getSingleMetrics,
           joinParamsMap.get(operatorIdx))
-      case smj: SortMergeJoinMetricsUpdater =>
-        smj.updateJoinMetrics(
-          operatorMetrics,
-          metrics.getSingleMetrics,
-          joinParamsMap.get(operatorIdx))
       case u: UnionMetricsUpdater =>
         // JoinRel outputs two suites of metrics respectively for hash build 
and hash probe.
         // Therefore, fetch one more suite of metrics here.
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
index 3b058a8783..2f55849889 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/NestedLoopJoinMetricsUpdater.scala
@@ -16,20 +16,58 @@
  */
 package org.apache.gluten.metrics
 
+import org.apache.gluten.substrait.JoinParams
+
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class NestedLoopJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) 
extends MetricsUpdater {
-
-  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
-    if (opMetrics != null) {
-      val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("numOutputRows") += operatorMetrics.outputRows
-      metrics("outputVectors") += operatorMetrics.outputVectors
-      metrics("outputBytes") += operatorMetrics.outputBytes
-      metrics("cpuCount") += operatorMetrics.cpuCount
-      metrics("wallNanos") += operatorMetrics.wallNanos
-      metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
-      metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
-    }
+import java.util
+
+class NestedLoopJoinMetricsUpdater(override val metrics: Map[String, 
SQLMetric])
+  extends JoinMetricsUpdaterBase(metrics) {
+
+  val nestedLoopJoinBuildInputRows: SQLMetric = 
metrics("nestedLoopJoinBuildInputRows")
+  val nestedLoopJoinBuildOutputRows: SQLMetric = 
metrics("nestedLoopJoinBuildOutputRows")
+  val nestedLoopJoinBuildOutputVectors: SQLMetric = 
metrics("nestedLoopJoinBuildOutputVectors")
+  val nestedLoopJoinBuildOutputBytes: SQLMetric = 
metrics("nestedLoopJoinBuildOutputBytes")
+  val nestedLoopJoinBuildCpuCount: SQLMetric = 
metrics("nestedLoopJoinBuildCpuCount")
+  val nestedLoopJoinBuildWallNanos: SQLMetric = 
metrics("nestedLoopJoinBuildWallNanos")
+  val nestedLoopJoinBuildPeakMemoryBytes: SQLMetric = 
metrics("nestedLoopJoinBuildPeakMemoryBytes")
+  val nestedLoopJoinBuildNumMemoryAllocations: SQLMetric = metrics(
+    "nestedLoopJoinBuildNumMemoryAllocations")
+
+  val nestedLoopJoinProbeInputRows: SQLMetric = 
metrics("nestedLoopJoinProbeInputRows")
+  val nestedLoopJoinProbeOutputRows: SQLMetric = 
metrics("nestedLoopJoinProbeOutputRows")
+  val nestedLoopJoinProbeOutputVectors: SQLMetric = 
metrics("nestedLoopJoinProbeOutputVectors")
+  val nestedLoopJoinProbeOutputBytes: SQLMetric = 
metrics("nestedLoopJoinProbeOutputBytes")
+  val nestedLoopJoinProbeCpuCount: SQLMetric = 
metrics("nestedLoopJoinProbeCpuCount")
+  val nestedLoopJoinProbeWallNanos: SQLMetric = 
metrics("nestedLoopJoinProbeWallNanos")
+  val nestedLoopJoinProbePeakMemoryBytes: SQLMetric = 
metrics("nestedLoopJoinProbePeakMemoryBytes")
+  val nestedLoopJoinProbeNumMemoryAllocations: SQLMetric = metrics(
+    "nestedLoopJoinProbeNumMemoryAllocations")
+
+  override protected def updateJoinMetricsInternal(
+      joinMetrics: util.ArrayList[OperatorMetrics],
+      joinParams: JoinParams): Unit = {
+    // nestedLoopJoinProbe
+    val nestedLoopJoinProbeMetrics = joinMetrics.get(0)
+    nestedLoopJoinProbeInputRows += nestedLoopJoinProbeMetrics.inputRows
+    nestedLoopJoinProbeOutputRows += nestedLoopJoinProbeMetrics.outputRows
+    nestedLoopJoinProbeOutputVectors += 
nestedLoopJoinProbeMetrics.outputVectors
+    nestedLoopJoinProbeOutputBytes += nestedLoopJoinProbeMetrics.outputBytes
+    nestedLoopJoinProbeCpuCount += nestedLoopJoinProbeMetrics.cpuCount
+    nestedLoopJoinProbeWallNanos += nestedLoopJoinProbeMetrics.wallNanos
+    nestedLoopJoinProbePeakMemoryBytes += 
nestedLoopJoinProbeMetrics.peakMemoryBytes
+    nestedLoopJoinProbeNumMemoryAllocations += 
nestedLoopJoinProbeMetrics.numMemoryAllocations
+
+    // nestedLoopJoinBuild
+    val nestedLoopJoinBuildMetrics = joinMetrics.get(1)
+    nestedLoopJoinBuildInputRows += nestedLoopJoinBuildMetrics.inputRows
+    nestedLoopJoinBuildOutputRows += nestedLoopJoinBuildMetrics.outputRows
+    nestedLoopJoinBuildOutputVectors += 
nestedLoopJoinBuildMetrics.outputVectors
+    nestedLoopJoinBuildOutputBytes += nestedLoopJoinBuildMetrics.outputBytes
+    nestedLoopJoinBuildCpuCount += nestedLoopJoinBuildMetrics.cpuCount
+    nestedLoopJoinBuildWallNanos += nestedLoopJoinBuildMetrics.wallNanos
+    nestedLoopJoinBuildPeakMemoryBytes += 
nestedLoopJoinBuildMetrics.peakMemoryBytes
+    nestedLoopJoinBuildNumMemoryAllocations += 
nestedLoopJoinBuildMetrics.numMemoryAllocations
   }
 }
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 38e2bcc45e..fe4b0c4736 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -277,4 +277,24 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     assert(metrics("storageReadBytes").value > 0)
     assert(metrics("ramReadBytes").value == 0)
   }
+
+  test("test nested loop join metrics") {
+    withSQLConf() {
+      runQueryAndCompare(
+        "select /*+ BROADCAST(t2) */ c1, c2 from (select c1  from metrics_t1 
where c1 = 50) t1 ," +
+          "(select c2 from metrics_t2) t2;"
+      ) {
+        df =>
+          val join = find(df.queryExecution.executedPlan) {
+            case _: BroadcastNestedLoopJoinExecTransformer => true
+            case _ => false
+          }
+          assert(join.isDefined)
+          val metrics = join.get.metrics
+          assert(metrics("nestedLoopJoinBuildInputRows").value == 200)
+          assert(metrics("nestedLoopJoinProbeInputRows").value == 1)
+          assert(metrics("numOutputRows").value == 200 * 1)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to