This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 406125db perf: Add metric for time spent in CometSparkToColumnarExec (#931)
406125db is described below

commit 406125db3c649a750c03a1f71e58079d6a6c0e40
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Sep 19 07:51:08 2024 -0700

    perf: Add metric for time spent in CometSparkToColumnarExec (#931)
    
    * Add timing to CometSparkToColumnarExec.
    
    * Fix test with a collect call, no pun intended.
    
    * Clean up variable names and metric description.
    
    * Address review feedback.
    
    * Address more review feedback.
    
    * Fix Scalastyle issue.
    
    * Remove AQE from test because on Spark 3.5 it removes the Comet operators.
---
 .../spark/sql/comet/CometSparkToColumnarExec.scala | 78 ++++++++++++++--------
 .../org/apache/comet/exec/CometExecSuite.scala     | 42 +++++++-----
 2 files changed, 74 insertions(+), 46 deletions(-)
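
The core of this change is a wrapping iterator: the Spark-to-Arrow conversion
happens lazily inside next(), so timing that call is what attributes the
conversion cost to the new nano-timing metric. Below is a minimal standalone
sketch of the same pattern; the helper name "timed" and the AtomicLong standing
in for SQLMetric are illustrative only, so the snippet runs outside a Spark plan.

    import java.util.concurrent.atomic.AtomicLong

    // Wrap an iterator and accumulate the time spent inside next(), where
    // each lazily produced element is actually computed.
    def timed[T](iter: Iterator[T], elapsedNs: AtomicLong): Iterator[T] =
      new Iterator[T] {
        override def hasNext: Boolean = iter.hasNext
        override def next(): T = {
          val startNs = System.nanoTime()
          val elem = iter.next()
          elapsedNs.addAndGet(System.nanoTime() - startNs)
          elem
        }
      }

    val elapsed = new AtomicLong()
    // Iterator.tabulate is lazy: the simulated work runs per call to next().
    val it = timed(Iterator.tabulate(3) { i => Thread.sleep(10); i }, elapsed)
    it.foreach(_ => ())
    println(s"time spent in next(): ${elapsed.get()} ns")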

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 640bb877..9602a286 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -54,11 +54,38 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "conversionTime" -> SQLMetrics.createNanoTimingMetric(
+      sparkContext,
+      "time converting Spark batches to Arrow batches"))
+
+  // The conversion happens in next(), so wrap the call to measure time spent.
+  private def createTimingIter(
+      iter: Iterator[ColumnarBatch],
+      numInputRows: SQLMetric,
+      numOutputBatches: SQLMetric,
+      conversionTime: SQLMetric): Iterator[ColumnarBatch] = {
+    new Iterator[ColumnarBatch] {
+
+      override def hasNext: Boolean = {
+        iter.hasNext
+      }
+
+      override def next(): ColumnarBatch = {
+        val startNs = System.nanoTime()
+        val batch = iter.next()
+        conversionTime += System.nanoTime() - startNs
+        numInputRows += batch.numRows()
+        numOutputBatches += 1
+        batch
+      }
+    }
+  }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
+    val conversionTime = longMetric("conversionTime")
     val maxRecordsPerBatch = conf.arrowMaxRecordsPerBatch
     val timeZoneId = conf.sessionLocalTimeZone
     val schema = child.schema
@@ -66,41 +93,34 @@ case class CometSparkToColumnarExec(child: SparkPlan)
     if (child.supportsColumnar) {
       child
         .executeColumnar()
-        .mapPartitionsInternal { iter =>
-          iter.flatMap { columnBatch =>
-            val context = TaskContext.get()
-            CometArrowConverters.columnarBatchToArrowBatchIter(
-              columnBatch,
-              schema,
-              maxRecordsPerBatch,
-              timeZoneId,
-              context)
-          }
-        }
-        .map { batch =>
-          numInputRows += batch.numRows()
-          numOutputBatches += 1
-          batch
+        .mapPartitionsInternal { sparkBatches =>
+          val arrowBatches =
+            sparkBatches.flatMap { sparkBatch =>
+              val context = TaskContext.get()
+              CometArrowConverters.columnarBatchToArrowBatchIter(
+                sparkBatch,
+                schema,
+                maxRecordsPerBatch,
+                timeZoneId,
+                context)
+            }
+          createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
         }
     } else {
       child
         .execute()
-        .mapPartitionsInternal { iter =>
+        .mapPartitionsInternal { sparkBatches =>
           val context = TaskContext.get()
-          CometArrowConverters.rowToArrowBatchIter(
-            iter,
-            schema,
-            maxRecordsPerBatch,
-            timeZoneId,
-            context)
-        }
-        .map { batch =>
-          numInputRows += batch.numRows()
-          numOutputBatches += 1
-          batch
+          val arrowBatches =
+            CometArrowConverters.rowToArrowBatchIter(
+              sparkBatches,
+              schema,
+              maxRecordsPerBatch,
+              timeZoneId,
+              context)
+          createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
         }
     }
-
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): CometSparkToColumnarExec =
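
As a usage note, not part of the patch itself: once registered in the metrics
map, the new conversionTime metric appears in the SQL tab of the Spark UI like
any other SQLMetric, and its raw value is a nanosecond count, since the
operator adds System.nanoTime() deltas directly. It can also be read from an
executed plan. A hedged sketch, assuming a DataFrame df whose final plan
contains the operator (with AQE enabled on Spark 3.5 the Comet operators can
be removed from the plan, which is why the test below disables AQE):

    import org.apache.spark.sql.comet.CometSparkToColumnarExec

    df.collect() // run the job so task-side metric updates are aggregated

    // TreeNode.collectFirst walks the physical plan; the value is nanoseconds.
    val conversionNs = df.queryExecution.executedPlan.collectFirst {
      case e: CometSparkToColumnarExec => e.metrics("conversionTime").value
    }
    conversionNs.foreach(ns => println(s"conversion time: ${ns / 1e6} ms"))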
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 7e6334e0..c054d02d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1657,23 +1657,31 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("SparkToColumnar over InMemoryTableScanExec") {
-    Seq("true", "false").foreach(aqe => {
-      Seq("true", "false").foreach(cacheVectorized => {
-        withSQLConf(
-          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe,
-          CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
-          SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> cacheVectorized) {
-          spark
-            .range(1000)
-            .selectExpr("id as key", "id % 8 as value")
-            .toDF("key", "value")
-            .selectExpr("key", "value", "key+1")
-            .createOrReplaceTempView("abc")
-          spark.catalog.cacheTable("abc")
-          val df = spark.sql("SELECT * FROM abc").groupBy("key").count()
-          checkSparkAnswerAndOperator(df, includeClasses = Seq(classOf[CometSparkToColumnarExec]))
-        }
-      })
+    Seq("true", "false").foreach(cacheVectorized => {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
+        SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> cacheVectorized) {
+        spark
+          .range(1000)
+          .selectExpr("id as key", "id % 8 as value")
+          .toDF("key", "value")
+          .selectExpr("key", "value", "key+1")
+          .createOrReplaceTempView("abc")
+        spark.catalog.cacheTable("abc")
+        val df = spark.sql("SELECT * FROM abc").groupBy("key").count()
+        checkSparkAnswerAndOperator(df, includeClasses = Seq(classOf[CometSparkToColumnarExec]))
+        df.collect() // Without this collect we don't get an aggregation of the metrics.
+
+        val metrics = find(df.queryExecution.executedPlan) {
+          case _: CometSparkToColumnarExec => true
+          case _ => false
+        }.map(_.metrics).get
+
+        assert(metrics.contains("conversionTime"))
+        assert(metrics("conversionTime").value > 0)
+
+      }
     })
   }
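
Two caveats the updated test encodes are worth spelling out. First, the
df.collect() call is needed because SQLMetric values are accumulator-backed
and are only merged into the driver-side plan after tasks finish. Second, AQE
is pinned to "false" because on Spark 3.5 adaptive planning removes the Comet
operators the test asserts on. A minimal illustration of the first point,
assuming an active SparkSession named spark:

    val df = spark.range(1000).groupBy("id").count()
    val plan = df.queryExecution.executedPlan // the plan exists, but no tasks ran
    // Reading a metric here would return its unpopulated initial value.
    df.collect() // run the job; accumulator updates flow back to the driver
    // Plan-level metrics now reflect what the tasks actually measured.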
 

