This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e49a796 feat: Improve CometBroadcastHashJoin statistics (#339)
e49a796 is described below
commit e49a796f2065f63c9274f49c28680f06b634a0ed
Author: Pablo Langa <[email protected]>
AuthorDate: Tue Apr 30 01:26:56 2024 +0000
feat: Improve CometBroadcastHashJoin statistics (#339)
* broadcast hash join metrics
(cherry picked from commit 97a647a0757250f9feaea6571b8cb0738c6ec340)
* broadcast hash join test
(cherry picked from commit df418aeaf9f0923d17a69edf5829c8f77a1934c1)
* format
* add assume
---
.../apache/spark/sql/comet/CometMetricNode.scala | 22 +++++++++++++
.../org/apache/spark/sql/comet/operators.scala | 22 +++----------
.../org/apache/comet/exec/CometExecSuite.scala | 36 +++++++++++++++++++++-
3 files changed, 61 insertions(+), 19 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
index 60b26ca..7288455 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
@@ -79,6 +79,28 @@ object CometMetricNode {
"total time (in ms) spent in this operator"))
}
+ /**
+ * SQL Metrics for DataFusion HashJoin
+ */
+ def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+ Map(
+ "build_time" ->
+ SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting
build-side of join"),
+ "build_input_batches" ->
+ SQLMetrics.createMetric(sc, "Number of batches consumed by
build-side"),
+ "build_input_rows" ->
+ SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
+ "build_mem_used" ->
+ SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
+ "input_batches" ->
+ SQLMetrics.createMetric(sc, "Number of batches consumed by
probe-side"),
+ "input_rows" ->
+ SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
+ "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches
produced"),
+ "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
+ "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for
joining"))
+ }
+
/**
* Creates a [[CometMetricNode]] from a [[CometPlan]].
*/
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 39ffef1..4e6d997 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -697,24 +697,7 @@ case class CometHashJoinExec(
Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
override lazy val metrics: Map[String, SQLMetric] =
- Map(
- "build_time" ->
- SQLMetrics.createNanoTimingMetric(
- sparkContext,
- "Total time for collecting build-side of join"),
- "build_input_batches" ->
- SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
- "build_input_rows" ->
- SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
- "build_mem_used" ->
- SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
- "input_batches" ->
- SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
- "input_rows" ->
- SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
- "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
- "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
- "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
+ CometMetricNode.hashJoinMetrics(sparkContext)
}
case class CometBroadcastHashJoinExec(
@@ -846,6 +829,9 @@ case class CometBroadcastHashJoinExec(
override def hashCode(): Int =
Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ CometMetricNode.hashJoinMetrics(sparkContext)
}
case class CometSortMergeJoinExec(
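With both operators now overriding metrics through the shared helper,
downstream tooling can treat the two join nodes uniformly. An illustrative
pattern match over the commit's classes (the hashJoinMetricsOf helper is
hypothetical, not part of this change):

    import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec, CometHashJoinExec}
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.metric.SQLMetric

    // Extract the hash-join metric map from either operator, if present.
    def hashJoinMetricsOf(plan: SparkPlan): Option[Map[String, SQLMetric]] =
      plan match {
        case j: CometHashJoinExec => Some(j.metrics)
        case j: CometBroadcastHashJoinExec => Some(j.metrics)
        case _ => None
      }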
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index e5b3523..3eb20aa 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -364,6 +364,40 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("Comet native metrics: BroadcastHashJoin") {
+ assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
+ withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+ withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
+ val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+ df.collect()
+
+ val metrics = find(df.queryExecution.executedPlan) {
+ case _: CometBroadcastHashJoinExec => true
+ case _ => false
+ }.map(_.metrics).get
+
+ assert(metrics.contains("build_time"))
+ assert(metrics("build_time").value > 1L)
+ assert(metrics.contains("build_input_batches"))
+ assert(metrics("build_input_batches").value == 25L)
+ assert(metrics.contains("build_mem_used"))
+ assert(metrics("build_mem_used").value > 1L)
+ assert(metrics.contains("build_input_rows"))
+ assert(metrics("build_input_rows").value == 25L)
+ assert(metrics.contains("input_batches"))
+ assert(metrics("input_batches").value == 5L)
+ assert(metrics.contains("input_rows"))
+ assert(metrics("input_rows").value == 5L)
+ assert(metrics.contains("output_batches"))
+ assert(metrics("output_batches").value == 5L)
+ assert(metrics.contains("output_rows"))
+ assert(metrics("output_rows").value == 5L)
+ assert(metrics.contains("join_time"))
+ assert(metrics("join_time").value > 1L)
+ }
+ }
+ }
+
test(
"fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec "
+
"should be CometRoot") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]