viirya commented on code in PR #931:
URL: https://github.com/apache/datafusion-comet/pull/931#discussion_r1752517979


##########
spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala:
##########
@@ -54,27 +54,45 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "conversionTime" -> SQLMetrics.createNanoTimingMetric(
+      sparkContext,
+      "time converting Spark batches to Arrow batches"))
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
+    val conversionTime = longMetric("conversionTime")
     val maxRecordsPerBatch = conf.arrowMaxRecordsPerBatch
     val timeZoneId = conf.sessionLocalTimeZone
     val schema = child.schema
 
     if (child.supportsColumnar) {
       child
         .executeColumnar()
-        .mapPartitionsInternal { iter =>
-          iter.flatMap { columnBatch =>
-            val context = TaskContext.get()
-            CometArrowConverters.columnarBatchToArrowBatchIter(
-              columnBatch,
-              schema,
-              maxRecordsPerBatch,
-              timeZoneId,
-              context)
+        .mapPartitionsInternal { sparkBatches =>
+          val arrowBatches =
+            sparkBatches.flatMap { sparkBatch =>
+              val context = TaskContext.get()
+              CometArrowConverters.columnarBatchToArrowBatchIter(
+                sparkBatch,
+                schema,
+                maxRecordsPerBatch,
+                timeZoneId,
+                context)
+            }
+          new Iterator[ColumnarBatch] {
+            // The conversion happens in next(), so override it here to measure the time spent.
+            override def hasNext: Boolean = {
+              arrowBatches.hasNext
+            }
+
+            override def next(): ColumnarBatch = {
+              val startNs = System.nanoTime()
+              val batch = arrowBatches.next()
+              conversionTime += System.nanoTime() - startNs
+              batch
+            }
           }
         }
         .map { batch =>

Review Comment:
   Not related to this PR, but I wonder why you don't just combine this `map` with the iterator above? The `map` only calculates the `numInputRows` and `numOutputBatches` metrics.
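   For illustration, a minimal sketch of what the combined version could look like, assuming the downstream `map` does nothing beyond updating those two counters (its body is not shown in this hunk); the names reuse the ones from the diff above:

   ```scala
   // Hypothetical sketch: fold the metric updates into the timing wrapper so the
   // partition is traversed by a single iterator instead of an iterator plus a map.
   new Iterator[ColumnarBatch] {
     override def hasNext: Boolean = arrowBatches.hasNext

     override def next(): ColumnarBatch = {
       val startNs = System.nanoTime()
       val batch = arrowBatches.next() // Spark-to-Arrow conversion happens here
       conversionTime += System.nanoTime() - startNs
       numInputRows += batch.numRows()
       numOutputBatches += 1
       batch
     }
   }
   ```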



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala:
##########
@@ -54,27 +54,45 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "conversionTime" -> SQLMetrics.createNanoTimingMetric(
+      sparkContext,
+      "time converting Spark batches to Arrow batches"))
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
+    val conversionTime = longMetric("conversionTime")
     val maxRecordsPerBatch = conf.arrowMaxRecordsPerBatch
     val timeZoneId = conf.sessionLocalTimeZone
     val schema = child.schema
 
     if (child.supportsColumnar) {
       child
         .executeColumnar()
-        .mapPartitionsInternal { iter =>
-          iter.flatMap { columnBatch =>
-            val context = TaskContext.get()
-            CometArrowConverters.columnarBatchToArrowBatchIter(
-              columnBatch,
-              schema,
-              maxRecordsPerBatch,
-              timeZoneId,
-              context)
+        .mapPartitionsInternal { sparkBatches =>
+          val arrowBatches =
+            sparkBatches.flatMap { sparkBatch =>
+              val context = TaskContext.get()
+              CometArrowConverters.columnarBatchToArrowBatchIter(
+                sparkBatch,
+                schema,
+                maxRecordsPerBatch,
+                timeZoneId,
+                context)
+            }
+          new Iterator[ColumnarBatch] {
+            // The conversion happens in next(), so override it here to measure the time spent.
+            override def hasNext: Boolean = {
+              arrowBatches.hasNext
+            }
+
+            override def next(): ColumnarBatch = {
+              val startNs = System.nanoTime()
+              val batch = arrowBatches.next()
+              conversionTime += System.nanoTime() - startNs
+              batch
+            }
           }
         }
         .map { batch =>

Review Comment:
   Can we just combine this `map` with the iterator above? It only calculates the `numInputRows` and `numOutputBatches` metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

