This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 2d8e142e1  feat: CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics (#1172)
2d8e142e1 is described below

commit 2d8e142e13caa503ab1b25e29ceb5f377ca0e4a9
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Fri Feb 14 14:24:23 2025 -0500

    feat: CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics (#1172)

    * Sync with main.

    * Spark History server seems to append the word "total" at the end, so I removed the redundant word at the beginning of the new metrics.

    * Fix typo.
---
 .../spark/sql/comet/CometNativeScanExec.scala      | 80 ++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index ed9e545dd..c28fd74dd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._
 
@@ -59,6 +60,7 @@ case class CometNativeScanExec(
 
   override def outputPartitioning: Partitioning =
     UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
+  override def outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
 
   override def stringArgs: Iterator[Any] = Iterator(output)
 
@@ -74,6 +76,83 @@ case class CometNativeScanExec(
   }
 
   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    // We don't append CometMetricNode.baselineMetrics because
+    // elapsed_compute has no counterpart on the native side.
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Elapsed wall clock time for scanning " +
+            "+ record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent reading and parsing metadata from the footer"))
+  }
 }
 
 object CometNativeScanExec extends DataTypeSupport {
@@ -102,6 +181,7 @@ object CometNativeScanExec extends DataTypeSupport {
       case other: AnyRef => other
      case null => null
     }
+
     val newArgs = mapProductIterator(scanExec, transform(_))
     val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
     val batchScanExec = CometNativeScanExec(
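For readers unfamiliar with the Spark SQLMetrics API used above, the following is a minimal, self-contained sketch (not part of the commit) of how metrics like the ones declared in CometNativeScanExec are created, updated, and read. The object name SqlMetricsSketch, the local-mode SparkSession, and the numeric values are illustrative assumptions; in Comet the real values are populated from DataFusion's ParquetFileMetrics and FileStreamMetrics on the native side.

    // Illustrative sketch only, not Comet code: the object name, the local-mode
    // SparkSession, and the metric values below are assumptions for demonstration.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    object SqlMetricsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("sql-metrics-sketch")
          .getOrCreate()
        val sc = spark.sparkContext

        // Same factory methods the patch uses: a size metric and a nanosecond timing metric.
        val metrics: Map[String, SQLMetric] = Map(
          "bytes_scanned" -> SQLMetrics.createSizeMetric(sc, "Number of bytes scanned"),
          "time_elapsed_opening" ->
            SQLMetrics.createNanoTimingMetric(sc, "Wall clock time elapsed for file opening"))

        // In Comet these values come from the native scan; here we add made-up numbers
        // just to show how SQLMetric accumulators are updated and read back.
        metrics("bytes_scanned").add(4L * 1024 * 1024)
        metrics("time_elapsed_opening").add(1500000L) // nanoseconds

        metrics.foreach { case (name, m) => println(s"$name = ${m.value}") }
        spark.stop()
      }
    }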