This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e49a796  feat: Improve CometBroadcastHashJoin statistics (#339)
e49a796 is described below

commit e49a796f2065f63c9274f49c28680f06b634a0ed
Author: Pablo Langa <[email protected]>
AuthorDate: Tue Apr 30 01:26:56 2024 +0000

    feat: Improve CometBroadcastHashJoin statistics (#339)
    
    * broadcast hash join metrics
    
    (cherry picked from commit 97a647a0757250f9feaea6571b8cb0738c6ec340)
    
    * broadcast hash join test
    
    (cherry picked from commit df418aeaf9f0923d17a69edf5829c8f77a1934c1)
    
    * format
    
    * add assume
---
 .../apache/spark/sql/comet/CometMetricNode.scala   | 22 +++++++++++++
 .../org/apache/spark/sql/comet/operators.scala     | 22 +++----------
 .../org/apache/comet/exec/CometExecSuite.scala     | 36 +++++++++++++++++++++-
 3 files changed, 61 insertions(+), 19 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
index 60b26ca..7288455 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
@@ -79,6 +79,28 @@ object CometMetricNode {
         "total time (in ms) spent in this operator"))
   }
 
+  /**
+   * SQL Metrics for DataFusion HashJoin
+   */
+  def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "build_time" ->
+        SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
+      "build_input_batches" ->
+        SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
+      "build_input_rows" ->
+        SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
+      "build_mem_used" ->
+        SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
+      "input_batches" ->
+        SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
+      "input_rows" ->
+        SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
+      "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
+      "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
+      "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
+  }
+
   /**
    * Creates a [[CometMetricNode]] from a [[CometPlan]].
    */
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 39ffef1..4e6d997 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -697,24 +697,7 @@ case class CometHashJoinExec(
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
 
   override lazy val metrics: Map[String, SQLMetric] =
-    Map(
-      "build_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time for collecting build-side of join"),
-      "build_input_batches" ->
-        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
-      "build_input_rows" ->
-        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
-      "build_mem_used" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
-      "input_batches" ->
-        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
-      "input_rows" ->
-        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
-      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
-      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
-      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
+    CometMetricNode.hashJoinMetrics(sparkContext)
 }
 
 case class CometBroadcastHashJoinExec(
@@ -846,6 +829,9 @@ case class CometBroadcastHashJoinExec(
 
   override def hashCode(): Int =
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    CometMetricNode.hashJoinMetrics(sparkContext)
 }
 
 case class CometSortMergeJoinExec(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index e5b3523..3eb20aa 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -364,6 +364,40 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Comet native metrics: BroadcastHashJoin") {
+    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
+        val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+        df.collect()
+
+        val metrics = find(df.queryExecution.executedPlan) {
+          case _: CometBroadcastHashJoinExec => true
+          case _ => false
+        }.map(_.metrics).get
+
+        assert(metrics.contains("build_time"))
+        assert(metrics("build_time").value > 1L)
+        assert(metrics.contains("build_input_batches"))
+        assert(metrics("build_input_batches").value == 25L)
+        assert(metrics.contains("build_mem_used"))
+        assert(metrics("build_mem_used").value > 1L)
+        assert(metrics.contains("build_input_rows"))
+        assert(metrics("build_input_rows").value == 25L)
+        assert(metrics.contains("input_batches"))
+        assert(metrics("input_batches").value == 5L)
+        assert(metrics.contains("input_rows"))
+        assert(metrics("input_rows").value == 5L)
+        assert(metrics.contains("output_batches"))
+        assert(metrics("output_batches").value == 5L)
+        assert(metrics.contains("output_rows"))
+        assert(metrics("output_rows").value == 5L)
+        assert(metrics.contains("join_time"))
+        assert(metrics("join_time").value > 1L)
+      }
+    }
+  }
+
   test(
     "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
       "should be CometRoot") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to