[ https://issues.apache.org/jira/browse/SPARK-55523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-55523:
-----------------------------------
Labels: cache metrics observability pull-request-available (was: cache metrics observability)
> [SQL] InputMetrics.recordsRead reports CachedBatch count instead of row count for cached DataFrame reads
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55523
> URL: https://issues.apache.org/jira/browse/SPARK-55523
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.5
> Environment: Spark 3.5.5, any storage level (MEMORY_ONLY, DISK_ONLY, etc.)
> Reporter: Kaiyu Zhao
> Priority: Minor
> Labels: cache, metrics, observability, pull-request-available
>
> Problem
> When reading from a cached DataFrame (created via DataFrame.persist() or
> DataFrame.cache()), the TaskMetrics.inputMetrics.recordsRead metric reports
> the number of CachedBatch objects instead of the actual number of rows
> processed.
> Example:
> val df = spark.range(3605).toDF("id")
> df.persist(StorageLevel.DISK_ONLY)
> df.count() // Materialize cache
> df.write.parquet("/tmp/output") // Writes 3605 rows
> Expected: InputRecordsRead = 3605 (actual rows)
> Actual: InputRecordsRead = 1 (number of CachedBatch objects)
> This makes metrics misleading for monitoring, debugging, and performance
> analysis.
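> The discrepancy can be observed end-to-end with a SparkListener that sums
> inputMetrics.recordsRead across task-end events. This is an illustrative
> sketch, not part of the proposed patch; it assumes a live SparkSession
> named `spark`, and the output path is arbitrary:
>
> ```scala
> import java.util.concurrent.atomic.AtomicLong
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
> import org.apache.spark.storage.StorageLevel
>
> val recordsRead = new AtomicLong(0L)
> spark.sparkContext.addSparkListener(new SparkListener {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     // Sum recordsRead over all completed tasks
>     recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
>   }
> })
>
> val df = spark.range(3605).toDF("id")
> df.persist(StorageLevel.DISK_ONLY)
> df.count()          // materialize the cache
> recordsRead.set(0L) // reset before the read we want to measure
> df.write.mode("overwrite").parquet("/tmp/output")
> // Listener events are delivered asynchronously; after they drain,
> // recordsRead holds the CachedBatch count rather than the row count.
> ```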
> Root Cause
> 1. When DataFrame.persist() is called, Spark converts rows into columnar
> CachedBatch objects (each batch holds up to
> spark.sql.inMemoryColumnarStorage.batchSize rows, 10000 by default)
> 2. The underlying RDD becomes RDD[CachedBatch]
> 3. In RDD.getOrCompute() (line ~398), when reading from cache, it calls
> incRecordsRead(1) for each element in the cached iterator
> 4. Since each element is a CachedBatch (not a row), it counts batches instead
> of rows
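> The cache-hit path looks roughly like the following (paraphrased sketch,
> not the verbatim Spark source; names and surrounding plumbing are elided).
> For a cached DataFrame, the element type T is CachedBatch, so each call to
> next() increments the metric by one batch:
>
> ```scala
> // Inside RDD.getOrCompute(), after a successful block lookup:
> val existingMetrics = context.taskMetrics().inputMetrics
> new InterruptibleIterator[T](context, cachedIterator) {
>   override def next(): T = {
>     existingMetrics.incRecordsRead(1) // counts one CachedBatch, not one row
>     delegate.next()
>   }
> }
> ```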
> Proposed Solution
> Update InMemoryTableScanExec to track actual row counts:
> 1. In doExecute() and doExecuteColumnar(), add
> inputMetrics.incRecordsRead(batch.numRows) when iterating over cached batches
> 2. Remove the misleading incRecordsRead(1) per-element counting in
> RDD.getOrCompute() for cache hits to avoid double-counting
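> The InMemoryTableScanExec side of the change could be sketched as follows
> (hedged, paraphrased fragment; the decompression logic around it is
> elided and `cachedBatchIterator` is a placeholder name). It relies on
> CachedBatch.numRows, which every CachedBatch implementation provides:
>
> ```scala
> // Count the rows inside each CachedBatch as it is consumed, instead of
> // relying on the per-element counting in RDD.getOrCompute().
> val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
> cachedBatchIterator.map { batch =>
>   inputMetrics.incRecordsRead(batch.numRows)
>   batch
> }
> ```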
> Impact
> - Scope: Only affects DataFrame cache (via persist()/cache())
> - Benefit: Accurate metrics for cached DataFrame reads
> - Compatibility: Changes reported metric values, but makes them accurate
> Files to Modify
> 1. core/src/main/scala/org/apache/spark/rdd/RDD.scala
> 2.
> sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
> 3.
> sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
> (tests)
> I have a working implementation with tests and can provide a patch.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]