This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 406125db perf: Add metric for time spent in CometSparkToColumnarExec (#931)
406125db is described below
commit 406125db3c649a750c03a1f71e58079d6a6c0e40
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Sep 19 07:51:08 2024 -0700
perf: Add metric for time spent in CometSparkToColumnarExec (#931)
* Add timing to CometSparkToColumnarExec (the timing-iterator pattern is sketched after this list).
* Fix test with a collect call, no pun intended.
* Clean up variable names and metric description.
* Address review feedback.
* Address more review feedback.
* Fix Scalastyle issue.
* Remove AQE from test because on Spark 3.5 it removes the Comet operators.
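A minimal, self-contained sketch of the timing-iterator pattern this commit
introduces; a plain Long field stands in for Spark's SQLMetric, and all names
here are illustrative rather than taken from the commit:

    // Wraps an iterator and accumulates time spent inside next(), which is
    // where lazy per-element work (here, the batch conversion) actually runs.
    class TimingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      var elapsedNs: Long = 0L // stand-in for a Spark SQLMetric

      override def hasNext: Boolean = underlying.hasNext

      override def next(): T = {
        val startNs = System.nanoTime()
        val elem = underlying.next()
        elapsedNs += System.nanoTime() - startNs
        elem
      }
    }

    // Usage: map is lazy, so its cost is only observed inside next().
    val it = new TimingIterator(Iterator.range(0, 1000).map(_.toString))
    it.foreach(_ => ())
    println(s"time in next(): ${it.elapsedNs} ns")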
---
.../spark/sql/comet/CometSparkToColumnarExec.scala | 78 ++++++++++++++--------
.../org/apache/comet/exec/CometExecSuite.scala | 42 +++++++-----
2 files changed, 74 insertions(+), 46 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 640bb877..9602a286 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -54,11 +54,38 @@ case class CometSparkToColumnarExec(child: SparkPlan)
override lazy val metrics: Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
- "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"))
+ "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"),
+ "conversionTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "time converting Spark batches to Arrow batches"))
+
+ // The conversion happens in next(), so wrap the call to measure time spent.
+ private def createTimingIter(
+ iter: Iterator[ColumnarBatch],
+ numInputRows: SQLMetric,
+ numOutputBatches: SQLMetric,
+ conversionTime: SQLMetric): Iterator[ColumnarBatch] = {
+ new Iterator[ColumnarBatch] {
+
+ override def hasNext: Boolean = {
+ iter.hasNext
+ }
+
+ override def next(): ColumnarBatch = {
+ val startNs = System.nanoTime()
+ val batch = iter.next()
+ conversionTime += System.nanoTime() - startNs
+ numInputRows += batch.numRows()
+ numOutputBatches += 1
+ batch
+ }
+ }
+ }
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
+ val conversionTime = longMetric("conversionTime")
val maxRecordsPerBatch = conf.arrowMaxRecordsPerBatch
val timeZoneId = conf.sessionLocalTimeZone
val schema = child.schema
@@ -66,41 +93,34 @@ case class CometSparkToColumnarExec(child: SparkPlan)
if (child.supportsColumnar) {
child
.executeColumnar()
- .mapPartitionsInternal { iter =>
- iter.flatMap { columnBatch =>
- val context = TaskContext.get()
- CometArrowConverters.columnarBatchToArrowBatchIter(
- columnBatch,
- schema,
- maxRecordsPerBatch,
- timeZoneId,
- context)
- }
- }
- .map { batch =>
- numInputRows += batch.numRows()
- numOutputBatches += 1
- batch
+ .mapPartitionsInternal { sparkBatches =>
+ val arrowBatches =
+ sparkBatches.flatMap { sparkBatch =>
+ val context = TaskContext.get()
+ CometArrowConverters.columnarBatchToArrowBatchIter(
+ sparkBatch,
+ schema,
+ maxRecordsPerBatch,
+ timeZoneId,
+ context)
+ }
+ createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
}
} else {
child
.execute()
- .mapPartitionsInternal { iter =>
+ .mapPartitionsInternal { sparkBatches =>
val context = TaskContext.get()
- CometArrowConverters.rowToArrowBatchIter(
- iter,
- schema,
- maxRecordsPerBatch,
- timeZoneId,
- context)
- }
- .map { batch =>
- numInputRows += batch.numRows()
- numOutputBatches += 1
- batch
+ val arrowBatches =
+ CometArrowConverters.rowToArrowBatchIter(
+ sparkBatches,
+ schema,
+ maxRecordsPerBatch,
+ timeZoneId,
+ context)
+ createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
}
}
-
}
override protected def withNewChildInternal(newChild: SparkPlan): CometSparkToColumnarExec =
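The new metric is registered with SQLMetrics.createNanoTimingMetric, so the
accumulated nanoseconds surface as a readable duration in the SQL UI. A hedged
sketch of declaring and bumping such a metric outside this operator (the
SparkContext `sc` and the timed work are assumptions, not from the commit):

    import org.apache.spark.sql.execution.metric.SQLMetrics

    val timing = SQLMetrics.createNanoTimingMetric(sc, "time doing work")
    val startNs = System.nanoTime()
    Thread.sleep(5) // placeholder for the work being timed
    timing += System.nanoTime() - startNs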
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 7e6334e0..c054d02d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1657,23 +1657,31 @@ class CometExecSuite extends CometTestBase {
}
test("SparkToColumnar over InMemoryTableScanExec") {
- Seq("true", "false").foreach(aqe => {
- Seq("true", "false").foreach(cacheVectorized => {
- withSQLConf(
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe,
- CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
- SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> cacheVectorized) {
- spark
- .range(1000)
- .selectExpr("id as key", "id % 8 as value")
- .toDF("key", "value")
- .selectExpr("key", "value", "key+1")
- .createOrReplaceTempView("abc")
- spark.catalog.cacheTable("abc")
- val df = spark.sql("SELECT * FROM abc").groupBy("key").count()
- checkSparkAnswerAndOperator(df, includeClasses = Seq(classOf[CometSparkToColumnarExec]))
- }
- })
+ Seq("true", "false").foreach(cacheVectorized => {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
+ SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> cacheVectorized) {
+ spark
+ .range(1000)
+ .selectExpr("id as key", "id % 8 as value")
+ .toDF("key", "value")
+ .selectExpr("key", "value", "key+1")
+ .createOrReplaceTempView("abc")
+ spark.catalog.cacheTable("abc")
+ val df = spark.sql("SELECT * FROM abc").groupBy("key").count()
+ checkSparkAnswerAndOperator(df, includeClasses = Seq(classOf[CometSparkToColumnarExec]))
+ df.collect() // Without this collect we don't get an aggregation of the metrics.
+
+ val metrics = find(df.queryExecution.executedPlan) {
+ case _: CometSparkToColumnarExec => true
+ case _ => false
+ }.map(_.metrics).get
+
+ assert(metrics.contains("conversionTime"))
+ assert(metrics("conversionTime").value > 0)
+
+ }
})
}
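Beyond the assertion in the test, the same metric can be read directly off the
executed plan once an action has run; a sketch, assuming the `df` built above:

    // collect() must run first so the metric is populated.
    val conversionNs = df.queryExecution.executedPlan.collectFirst {
      case op: CometSparkToColumnarExec => op.metrics("conversionTime").value
    }
    println(s"conversion time: ${conversionNs.getOrElse(0L)} ns")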
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]