This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cc2b3cc8b [GLUTEN-6674][CH] Support sort merge join metrics (#6774)
cc2b3cc8b is described below

commit cc2b3cc8b0042a28e921b8095b11c431065d59f1
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 12 17:53:33 2024 +0800

    [GLUTEN-6674][CH] Support sort merge join metrics (#6774)
    
    [CH] Support sort merge join metrics
---
 .../backendsapi/clickhouse/CHMetricsApi.scala      | 24 +++++-
 .../metrics/SortMergeJoinMetricsUpdater.scala      | 96 +++++++++++++++++++++-
 2 files changed, 117 insertions(+), 3 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 0ff53e1c5..9058bffd8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -298,7 +298,29 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
 
   override def genSortMergeJoinTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
-    Map.empty
+    Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
+      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+      "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
+      "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
+      "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
+      "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
+      "streamPreProjectionTime" ->
+        SQLMetrics.createTimingMetric(sparkContext, "time of stream side preProjection"),
+      "buildPreProjectionTime" ->
+        SQLMetrics.createTimingMetric(sparkContext, "time of build side preProjection"),
+      "postProjectTime" ->
+        SQLMetrics.createTimingMetric(sparkContext, "time of postProjection"),
+      "probeTime" ->
+        SQLMetrics.createTimingMetric(sparkContext, "time of probe"),
+      "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time"),
+      "fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
+        sparkContext,
+        "filling right join side time"),
+      "conditionTime" -> SQLMetrics.createTimingMetric(sparkContext, "join condition time")
+    )
 
   override def genSortMergeJoinTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric]): MetricsUpdater = new SortMergeJoinMetricsUpdater(metrics)
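
For context, the three SQLMetrics factory methods used above control how Spark aggregates and renders each value in the SQL UI: createMetric yields a plain sum counter, createSizeMetric a byte-size metric, and createTimingMetric a duration metric. A minimal sketch of the pattern, assuming only a live SparkContext named sc (the SmjMetricsSketch object and the values are illustrative, not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    // Illustrative sketch: build a small metrics map the same way
    // genSortMergeJoinTransformerMetrics does, then accumulate into it.
    object SmjMetricsSketch {
      def demo(sc: SparkContext): Map[String, SQLMetric] = {
        val metrics: Map[String, SQLMetric] = Map(
          "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
          "totalTime" -> SQLMetrics.createTimingMetric(sc, "time")
        )
        metrics("numOutputRows") += 42L // plain counter, rendered as a number
        metrics("totalTime") += 7L      // timing metric, rendered as a duration
        metrics
      }
    }

The updater returned by genSortMergeJoinTransformerMetricsUpdater, shown in the second file below, is what writes the native-side values into this map.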
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
index e8ebbbd2c..e5833a39b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
@@ -16,11 +16,103 @@
  */
 package org.apache.gluten.metrics
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
+  extends MetricsUpdater
+  with Logging {
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
-    if (opMetrics != null) {}
+    try {
+      if (opMetrics != null) {
+        val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
+        if (!operatorMetrics.metricsList.isEmpty && operatorMetrics.joinParams != null) {
+          val joinParams = operatorMetrics.joinParams
+          var currentIdx = operatorMetrics.metricsList.size() - 1
+          var totalTime = 0L
+
+          // build side pre projection
+          if (joinParams.buildPreProjectionNeeded) {
+            metrics("buildPreProjectionTime") +=
+              (operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
+            metrics("outputVectors") += 
operatorMetrics.metricsList.get(currentIdx).outputVectors
+            totalTime += operatorMetrics.metricsList.get(currentIdx).time
+            currentIdx -= 1
+          }
+
+          // stream side pre projection
+          if (joinParams.streamPreProjectionNeeded) {
+            metrics("streamPreProjectionTime") +=
+              (operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
+            metrics("outputVectors") += 
operatorMetrics.metricsList.get(currentIdx).outputVectors
+            totalTime += operatorMetrics.metricsList.get(currentIdx).time
+            currentIdx -= 1
+          }
+
+          // update fillingRightJoinSideTime
+          MetricsUtil
+            .getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))
+            .foreach(
+              processor => {
+                if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
+                  metrics("fillingRightJoinSideTime") += (processor.time / 
1000L).toLong
+                }
+              })
+
+          // joining
+          val joinMetricsData = operatorMetrics.metricsList.get(currentIdx)
+          metrics("outputVectors") += joinMetricsData.outputVectors
+          metrics("inputWaitTime") += (joinMetricsData.inputWaitTime / 
1000L).toLong
+          metrics("outputWaitTime") += (joinMetricsData.outputWaitTime / 
1000L).toLong
+          totalTime += joinMetricsData.time
+
+          MetricsUtil
+            .getAllProcessorList(joinMetricsData)
+            .foreach(
+              processor => {
+                if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
+                  metrics("fillingRightJoinSideTime") += (processor.time / 
1000L).toLong
+                }
+                if (processor.name.equalsIgnoreCase("FilterTransform")) {
+                  metrics("conditionTime") += (processor.time / 1000L).toLong
+                }
+                if (processor.name.equalsIgnoreCase("JoiningTransform")) {
+                  metrics("probeTime") += (processor.time / 1000L).toLong
+                }
+                if (!SortMergeJoinMetricsUpdater.INCLUDING_PROCESSORS.contains(processor.name)) {
+                  metrics("extraTime") += (processor.time / 1000L).toLong
+                }
+                if (SortMergeJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) {
+                  metrics("numOutputRows") += processor.outputRows
+                  metrics("outputBytes") += processor.outputBytes
+                  metrics("numInputRows") += processor.inputRows
+                  metrics("inputBytes") += processor.inputBytes
+                }
+              })
+
+          currentIdx -= 1
+
+          // post projection
+          if (joinParams.postProjectionNeeded) {
+            metrics("postProjectTime") +=
+              (operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
+            metrics("outputVectors") += 
operatorMetrics.metricsList.get(currentIdx).outputVectors
+            totalTime += operatorMetrics.metricsList.get(currentIdx).time
+            currentIdx -= 1
+          }
+          metrics("totalTime") += (totalTime / 1000L).toLong
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Updating native metrics failed due to ${e.getCause}.")
+        throw e
+    }
   }
 }
+
+object SortMergeJoinMetricsUpdater {
+  val INCLUDING_PROCESSORS = Array("JoiningTransform", "FillingRightJoinSide", "FilterTransform")
+  val CH_PLAN_NODE_NAME = Array("JoiningTransform")
+}
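
A note on the traversal above: updateNativeMetrics consumes operatorMetrics.metricsList from the highest index downward, peeling off the optional build-side pre-projection, the optional stream-side pre-projection, the join stage itself, and finally the optional post-projection, decrementing currentIdx after each stage it accounts for; the join stage additionally distributes per-processor times (JoiningTransform, FillingRightJoinSide, FilterTransform) into probeTime, fillingRightJoinSideTime, and conditionTime via MetricsUtil.getAllProcessorList. A minimal sketch of the reverse walk, using hypothetical stand-in types rather than Gluten's OperatorMetrics and JoinParams:

    // Hypothetical stand-ins for Gluten's native metrics structures.
    final case class StageMetrics(time: Long)
    final case class JoinFlags(buildPre: Boolean, streamPre: Boolean, postProject: Boolean)

    def walk(stages: Vector[StageMetrics], flags: JoinFlags): Map[String, Long] = {
      val acc = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
      var idx = stages.size - 1
      def take(key: String): Unit = {
        acc(key) += stages(idx).time / 1000L // divided by 1000L, as in the commit
        idx -= 1
      }
      if (flags.buildPre) take("buildPreProjectionTime")   // optional stage
      if (flags.streamPre) take("streamPreProjectionTime") // optional stage
      take("totalTime")                                    // the join stage itself
      if (flags.postProject) take("postProjectTime")       // optional stage
      acc.toMap
    }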


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
