This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cc2b3cc8b [GLUTEN-6674][CH] Support sort merge join metrics (#6774)
cc2b3cc8b is described below
commit cc2b3cc8b0042a28e921b8095b11c431065d59f1
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 12 17:53:33 2024 +0800
[GLUTEN-6674][CH] Support sort merge join metrics (#6774)
[CH] Support sort merge join metrics
---
.../backendsapi/clickhouse/CHMetricsApi.scala | 26 +++++-
.../metrics/SortMergeJoinMetricsUpdater.scala | 96 +++++++++++++++++++++-
2 files changed, 119 insertions(+), 3 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 0ff53e1c5..9058bffd8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -298,7 +298,31 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
/**
 * Declares the Spark SQL metrics exposed by the ClickHouse sort merge join
 * transformer. The keys must match those consumed by
 * SortMergeJoinMetricsUpdater.updateNativeMetrics.
 *
 * NOTE(review): the original literal listed "numOutputRows" and
 * "outputVectors" twice; duplicate Map keys are silently collapsed (the last
 * entry wins), so the duplicates were dead code and have been removed.
 */
override def genSortMergeJoinTransformerMetrics(
    sparkContext: SparkContext): Map[String, SQLMetric] =
  Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
    "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
    "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
    "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
    "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
    "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
    "streamPreProjectionTime" ->
      SQLMetrics.createTimingMetric(sparkContext, "time of stream side preProjection"),
    "buildPreProjectionTime" ->
      SQLMetrics.createTimingMetric(sparkContext, "time of build side preProjection"),
    "postProjectTime" ->
      SQLMetrics.createTimingMetric(sparkContext, "time of postProjection"),
    "probeTime" ->
      SQLMetrics.createTimingMetric(sparkContext, "time of probe"),
    "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time"),
    "fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
      sparkContext,
      "filling right join side time"),
    "conditionTime" -> SQLMetrics.createTimingMetric(sparkContext, "join condition time")
  )
/**
 * Builds the updater that folds native sort-merge-join metrics into the
 * SQL metrics map produced for this operator.
 */
override def genSortMergeJoinTransformerMetricsUpdater(
    metrics: Map[String, SQLMetric]): MetricsUpdater =
  new SortMergeJoinMetricsUpdater(metrics)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
index e8ebbbd2c..e5833a39b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala
@@ -16,11 +16,103 @@
*/
package org.apache.gluten.metrics
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.metric.SQLMetric
-class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) extends
MetricsUpdater {
/**
 * Folds the operator metrics reported by the native (ClickHouse) sort merge
 * join back into the Spark SQL metrics declared in CHMetricsApi.
 *
 * The native metrics list is walked from the last element towards the front:
 * optional build-side pre-projection, optional stream-side pre-projection,
 * the join step itself, then an optional post-projection.
 */
class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
  extends MetricsUpdater
  with Logging {

  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
    try {
      if (opMetrics != null) {
        val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
        if (!operatorMetrics.metricsList.isEmpty && operatorMetrics.joinParams != null) {
          val joinParams = operatorMetrics.joinParams
          var currentIdx = operatorMetrics.metricsList.size() - 1
          // Accumulated native time across all steps, in the native unit
          // (divided by 1000 before being published as "totalTime").
          var totalTime = 0L

          // Build side pre projection.
          if (joinParams.buildPreProjectionNeeded) {
            val buildPreProjection = operatorMetrics.metricsList.get(currentIdx)
            metrics("buildPreProjectionTime") += (buildPreProjection.time / 1000L).toLong
            metrics("outputVectors") += buildPreProjection.outputVectors
            totalTime += buildPreProjection.time
            currentIdx -= 1
          }

          // Stream side pre projection.
          if (joinParams.streamPreProjectionNeeded) {
            val streamPreProjection = operatorMetrics.metricsList.get(currentIdx)
            metrics("streamPreProjectionTime") += (streamPreProjection.time / 1000L).toLong
            metrics("outputVectors") += streamPreProjection.outputVectors
            totalTime += streamPreProjection.time
            currentIdx -= 1
          }

          // The join step itself.
          val joinMetricsData = operatorMetrics.metricsList.get(currentIdx)
          metrics("outputVectors") += joinMetricsData.outputVectors
          metrics("inputWaitTime") += (joinMetricsData.inputWaitTime / 1000L).toLong
          metrics("outputWaitTime") += (joinMetricsData.outputWaitTime / 1000L).toLong
          totalTime += joinMetricsData.time

          // NOTE(review): the original code ran a dedicated pass over the
          // processors of this SAME metrics element (currentIdx was not
          // decremented in between) to update fillingRightJoinSideTime, and
          // then updated it AGAIN in the loop below — double-counting that
          // timing. The single pass below accounts for it exactly once.
          MetricsUtil
            .getAllProcessorList(joinMetricsData)
            .foreach {
              processor =>
                if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
                  metrics("fillingRightJoinSideTime") += (processor.time / 1000L).toLong
                }
                if (processor.name.equalsIgnoreCase("FilterTransform")) {
                  metrics("conditionTime") += (processor.time / 1000L).toLong
                }
                if (processor.name.equalsIgnoreCase("JoiningTransform")) {
                  metrics("probeTime") += (processor.time / 1000L).toLong
                }
                // Any processor not attributed to a dedicated metric is
                // counted as extra-operator time.
                if (!SortMergeJoinMetricsUpdater.INCLUDING_PROCESSORS.contains(processor.name)) {
                  metrics("extraTime") += (processor.time / 1000L).toLong
                }
                // Row/byte counters come from the processors that represent
                // the plan node itself.
                if (SortMergeJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) {
                  metrics("numOutputRows") += processor.outputRows
                  metrics("outputBytes") += processor.outputBytes
                  metrics("numInputRows") += processor.inputRows
                  metrics("inputBytes") += processor.inputBytes
                }
            }

          currentIdx -= 1

          // Post projection.
          if (joinParams.postProjectionNeeded) {
            val postProjection = operatorMetrics.metricsList.get(currentIdx)
            metrics("postProjectTime") += (postProjection.time / 1000L).toLong
            metrics("outputVectors") += postProjection.outputVectors
            totalTime += postProjection.time
            currentIdx -= 1
          }
          metrics("totalTime") += (totalTime / 1000L).toLong
        }
      }
    } catch {
      case e: Exception =>
        // NOTE(review): the original logged only e.getCause (possibly null)
        // and dropped the stack trace; pass the throwable to logError instead.
        logError("Updating native metrics failed.", e)
        throw e
    }
  }
}
+
object SortMergeJoinMetricsUpdater {
  // Processors whose time is already attributed to a dedicated metric
  // (probeTime / fillingRightJoinSideTime / conditionTime) and therefore
  // excluded from "extraTime".
  val INCLUDING_PROCESSORS: Array[String] =
    Array("JoiningTransform", "FillingRightJoinSide", "FilterTransform")

  // Processors whose row/byte counters feed the node-level input/output
  // metrics.
  val CH_PLAN_NODE_NAME: Array[String] = Array("JoiningTransform")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]