[ https://issues.apache.org/jira/browse/SPARK-55523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-55523:
-----------------------------------
    Labels: cache metrics observability pull-request-available  (was: cache 
metrics observability)

> [SQL] InputMetrics.recordsRead reports CachedBatch count instead of row count 
> for cached DataFrame reads
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55523
>                 URL: https://issues.apache.org/jira/browse/SPARK-55523
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.5
>         Environment: Spark 3.5.5, any storage level (MEMORY_ONLY, DISK_ONLY, 
> etc.)
>            Reporter: Kaiyu Zhao
>            Priority: Minor
>              Labels: cache, metrics, observability, pull-request-available
>
> Problem
> When reading from a cached DataFrame (created via DataFrame.persist() or 
> DataFrame.cache()), the TaskMetrics.inputMetrics.recordsRead metric reports 
> the number of CachedBatch objects instead of the actual number of rows 
> processed.
> Example:
> val df = spark.range(3605).toDF("id")
> df.persist(StorageLevel.DISK_ONLY)
> df.count() // Materialize cache
> df.write.parquet("/tmp/output") // Writes 3605 rows
> Expected: InputRecordsRead = 3605 (actual rows)
> Actual: InputRecordsRead = 1 (number of CachedBatch objects)
> This makes metrics misleading for monitoring, debugging, and performance 
> analysis.
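> One way to observe the discrepancy is with a SparkListener that sums
> recordsRead across tasks (a sketch, assuming an active SparkSession named
> spark and the df from the example above; the AtomicLong aggregation is
> illustrative, not part of the reported issue):
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
> import java.util.concurrent.atomic.AtomicLong
> val totalRecordsRead = new AtomicLong(0)
> spark.sparkContext.addSparkListener(new SparkListener {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     if (taskEnd.taskMetrics != null) {
>       totalRecordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
>     }
>   }
> })
> df.write.parquet("/tmp/output")
> // With the bug present, totalRecordsRead reflects the CachedBatch count
> // rather than the 3605 rows actually read from cache.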
> Root Cause
> 1. When DataFrame.persist() is called, Spark serializes rows into columnar 
> CachedBatch objects (each batch typically holds on the order of 1,000-10,000 
> rows)
> 2. The underlying RDD becomes RDD[CachedBatch]
> 3. In RDD.getOrCompute() (line ~398), when reading from cache, it calls 
> incRecordsRead(1) for each element in the cached iterator
> 4. Since each element is a CachedBatch (not a row), it counts batches instead 
> of rows
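> The per-element counting described in step 3 looks roughly like this (a
> simplified illustration of the cache-hit path, not the verbatim Spark
> source):
> // On a cache hit, the iterator returned from the block manager is wrapped
> // so that each element read increments the input metric by one. Because
> // the elements here are CachedBatch objects, batches get counted as rows:
> val existingMetrics = context.taskMetrics().inputMetrics
> blockResult.data.asInstanceOf[Iterator[T]].map { element =>
>   existingMetrics.incRecordsRead(1) // counts one per CachedBatch, not per row
>   element
> }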
> Proposed Solution
> Update InMemoryTableScanExec to track actual row counts:
> 1. In doExecute() and doExecuteColumnar(), add 
> inputMetrics.incRecordsRead(batch.numRows) when iterating over cached batches
> 2. Remove the misleading incRecordsRead(1) per-element counting in 
> RDD.getOrCompute() for cache hits to avoid double-counting
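> The InMemoryTableScanExec side of the proposed change could look roughly
> like this (a sketch only; the exact metrics plumbing and use of the
> internal mapPartitionsInternal helper are assumptions, not the submitted
> patch):
> // In doExecuteColumnar(): count actual rows per ColumnarBatch instead of
> // relying on per-element counting in RDD.getOrCompute()
> columnarBatchRdd.mapPartitionsInternal { batches =>
>   val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
>   batches.map { batch =>
>     inputMetrics.incRecordsRead(batch.numRows())
>     batch
>   }
> }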
> Impact
> - Scope: Only affects DataFrame cache (via persist()/cache())
> - Benefit: Accurate metrics for cached DataFrame reads  
> - Compatibility: Changes reported metric values, but makes them accurate
> Files to Modify
> 1. core/src/main/scala/org/apache/spark/rdd/RDD.scala
> 2. 
> sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
> 3. 
> sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
>  (tests)
> I have a working implementation with tests and can provide a patch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
