EmilyMatt commented on code in PR #2615:
URL: https://github.com/apache/datafusion-comet/pull/2615#discussion_r2448229340


##########
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala:
##########
@@ -203,7 +203,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           return withInfos(scanExec, fallbackReasons.toSet)
         }
 
-        if (scanImpl != CometConf.SCAN_NATIVE_COMET && encryptionEnabled(hadoopConf)) {
+        if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {

Review Comment:
   This had me perplexed for a bit, so I just reversed the ordering; it makes it much more understandable imo.
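   
   For reference, a quick sketch of the two forms side by side; assuming both checks are side-effect free, `&&` short-circuiting means they are logically equivalent and only readability changes:
   
   ```scala
   // Before: the scan-impl check reads as the main subject of the guard.
   if (scanImpl != CometConf.SCAN_NATIVE_COMET && encryptionEnabled(hadoopConf)) { /* fall back */ }
   
   // After: encryption is clearly the feature being gated, and the scan impl
   // is just the exception that can still handle it.
   if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) { /* fall back */ }
   ```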



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala:
##########
@@ -51,6 +51,14 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
   override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+    // Can skip the following logic if we're using different metrics to calculate this,
+    // e.g., Datafusion reader metrics.
+    if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {

Review Comment:
   This could have been resolved in a number of ways, but in the end I just chose the safest one.
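   
   For concreteness, here is a sketch of the guarded path inside `doExecuteColumnar` as I understand it; the guard itself is from the diff above, while the per-batch accounting below it is my assumption modeled on Spark's usual scan-metrics pattern, not necessarily the PR's exact body:
   
   ```scala
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
   
     // If either metric was not registered (e.g. the DataFusion reader reports
     // its own metrics instead), hand the batches through untouched.
     if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {
       return rdd
     }
   
     val numOutputRows = longMetric("numOutputRows")
     val scanTime = longMetric("scanTime")
     rdd.mapPartitions { batches =>
       new Iterator[ColumnarBatch] {
         override def hasNext: Boolean = {
           // Time spent producing the next batch is attributed to scan time.
           val startNs = System.nanoTime()
           val hasNextBatch = batches.hasNext
           scanTime.add(System.nanoTime() - startNs)
           hasNextBatch
         }
   
         override def next(): ColumnarBatch = {
           val batch = batches.next()
           numOutputRows.add(batch.numRows())
           batch
         }
       }
     }
   }
   ```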



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -214,7 +217,8 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
       boolean useLegacyDateTimestamp,
       StructType partitionSchema,
       InternalRow partitionValues,
-      Map<String, SQLMetric> metrics) {
+      Map<String, SQLMetric> metrics,
+      Object metricsNode) {

Review Comment:
   Honestly, using `Object` here is really unsatisfying, but under the constraints it was the best way.
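   
   To make the intent explicit, a small Scala-side sketch of the pattern (illustrative only, not the PR's code): the Java constructor accepts the handle as `Object` so the common module avoids a compile-time dependency on the Scala metrics type, and the value is narrowed back to `CometMetricNode` where that type is on the classpath. `resolveMetricsNode` is a hypothetical helper name:
   
   ```scala
   // Assumed import path for Comet's metrics node type.
   import org.apache.spark.sql.comet.CometMetricNode
   
   // Hypothetical helper: narrow the opaque handle where the type is known;
   // the guard shape here is just an illustration.
   private def resolveMetricsNode(metricsNode: AnyRef): Option[CometMetricNode] =
     metricsNode match {
       case node: CometMetricNode => Some(node)
       case _ => None
     }
   ```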



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
     }
   }
 
-  override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
-    // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
-    // it for each batch.
-    if (supportsColumnar) {
-      Map(
-        "scanTime" -> SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
-    } else {
-      Map.empty
-    }
-  } ++ {
-    relation.fileFormat match {
-      case f: MetricsSupport => f.initMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] =
+    wrapped.driverMetrics ++ (relation.fileFormat match {
+      case m: MetricsSupport => m.getMetrics
       case _ => Map.empty
-    }
-  }
+    })
 
   protected override def doExecute(): RDD[InternalRow] = {
     ColumnarToRowExec(this).doExecute()
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+    // Can skip the following logic if we're using different metrics to calculate this,
+    // e.g., Datafusion reader metrics.
+    if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {

Review Comment:
   Likewise
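   
   For anyone following along, a minimal sketch of what I understand the `MetricsSupport` side to look like after this change; only `initMetrics`/`getMetrics` appear in the diff, so the trait body and metric names below are my assumptions:
   
   ```scala
   import org.apache.spark.SparkContext
   import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
   
   // Assumed shape: metrics are created once on the driver via initMetrics and
   // then exposed read-only via getMetrics, so CometScanExec can merge them
   // into its own metrics map, as the new `metrics` override does above.
   trait MetricsSupport {
     protected var metrics: Map[String, SQLMetric] = Map.empty
   
     def initMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
       // Metric names here are illustrative placeholders only.
       metrics = Map(
         "rowGroupsRead" -> SQLMetrics.createMetric(sparkContext, "num row groups read"),
         "decodeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "native decode time"))
       metrics
     }
   
     def getMetrics: Map[String, SQLMetric] = metrics
   }
   ```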


