This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d8e142e1 feat: CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics (#1172)
2d8e142e1 is described below

commit 2d8e142e13caa503ab1b25e29ceb5f377ca0e4a9
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Fri Feb 14 14:24:23 2025 -0500

    feat: CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics (#1172)
    
    * Sync with main.
    
    * Spark History server seems to append the word "total" at the end, so I removed the redundant word at the beginning of the new metrics.
    
    * Fix typo.
---
 .../spark/sql/comet/CometNativeScanExec.scala      | 80 ++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index ed9e545dd..c28fd74dd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._
 
@@ -59,6 +60,7 @@ case class CometNativeScanExec(
 
   override def outputPartitioning: Partitioning =
     UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
+
   override def outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
 
   override def stringArgs: Iterator[Any] = Iterator(output)
@@ -74,6 +76,83 @@ case class CometNativeScanExec(
   }
 
   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    // We don't append CometMetricNode.baselineMetrics because
+    // elapsed_compute has no counterpart on the native side.
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Elapsed wall clock time for for scanning " +
+            "+ record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched 
(not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by 
bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not 
pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by 
statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed predicates pushed 
into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Rows filtered out by parquet 
page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed through the parquet 
page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent reading and parsing metadata from the footer"))
+  }
 }
 
 object CometNativeScanExec extends DataTypeSupport {
@@ -102,6 +181,7 @@ object CometNativeScanExec extends DataTypeSupport {
       case other: AnyRef => other
       case null => null
     }
+
     val newArgs = mapProductIterator(scanExec, transform(_))
     val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
     val batchScanExec = CometNativeScanExec(


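For context: metrics registered through SQLMetrics are keyed by name and accumulated at runtime, so each entry in the map above surfaces as a named counter in the Spark UI. A minimal sketch of how such entries could be driven, assuming the native scan reports per-metric values; the updateFromNative helper and nativeValues map below are hypothetical illustrations, not part of this commit:

    import org.apache.spark.sql.execution.metric.SQLMetric

    // Hypothetical sketch only: pushes counters reported by the native scan
    // into a SQLMetric map like the one registered above. Comet's actual
    // wiring goes through its native metric plumbing; this just illustrates
    // the SQLMetric API surface.
    def updateFromNative(
        metrics: Map[String, SQLMetric],
        nativeValues: Map[String, Long]): Unit = {
      nativeValues.foreach { case (name, value) =>
        // SQLMetric.add accumulates; metrics created with
        // createNanoTimingMetric expect values in nanoseconds.
        metrics.get(name).foreach(_.add(value))
      }
    }

Metrics created with createNanoTimingMetric are rendered as human-readable durations in the Spark UI, which is why the timing entries above take raw nanosecond values.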
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
