Kaiyu Zhao created SPARK-55523:
----------------------------------

             Summary: [SQL] InputMetrics.recordsRead reports CachedBatch count 
instead of row count for cached DataFrame reads
                 Key: SPARK-55523
                 URL: https://issues.apache.org/jira/browse/SPARK-55523
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.5.5
         Environment: Spark 3.5.5, any storage level (MEMORY_ONLY, DISK_ONLY, 
etc.)
            Reporter: Kaiyu Zhao


Problem

When reading from a cached DataFrame (created via DataFrame.persist() or 
DataFrame.cache()), the TaskMetrics.inputMetrics.recordsRead metric reports the 
number of CachedBatch objects instead of the actual number of rows processed.

Example:
import org.apache.spark.storage.StorageLevel

val df = spark.range(3605).toDF("id")
df.persist(StorageLevel.DISK_ONLY)
df.count() // Materialize the cache

df.write.parquet("/tmp/output") // Writes 3605 rows

Expected: InputRecordsRead = 3605 (actual rows)
Actual: InputRecordsRead = 1 (number of CachedBatch objects)

This makes metrics misleading for monitoring, debugging, and performance 
analysis.

Root Cause

1. When DataFrame.persist() is called, Spark converts rows into columnar 
CachedBatch objects (each batch contains ~1000-10000 rows)
2. The underlying RDD becomes RDD[CachedBatch]
3. In RDD.getOrCompute() (line ~398), reading from the cache calls 
incRecordsRead(1) for each element of the cached iterator
4. Since each element is a CachedBatch rather than a row, the metric counts 
batches instead of rows

Proposed Solution

Update InMemoryTableScanExec to track actual row counts:

1. In doExecute() and doExecuteColumnar(), add 
inputMetrics.incRecordsRead(batch.numRows) when iterating over cached batches
2. Remove the misleading incRecordsRead(1) per-element counting in 
RDD.getOrCompute() for cache hits to avoid double-counting
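As a sketch, the corrected counting would wrap the batch iterator and count batch.numRows as each batch is consumed (again using simplified stand-ins; the real change would live in InMemoryTableScanExec's doExecute()/doExecuteColumnar()):

```scala
// Simplified stand-ins, as before; not the actual Spark classes.
case class CachedBatch(numRows: Int)

class InputMetrics {
  private var _recordsRead = 0L
  def incRecordsRead(n: Long): Unit = _recordsRead += n
  def recordsRead: Long = _recordsRead
}

val metrics = new InputMetrics
// 3605 rows split across two batches.
val cachedPartition: Iterator[CachedBatch] =
  Iterator(CachedBatch(3000), CachedBatch(605))

// Proposed behavior: count the rows inside each batch as it is consumed,
// instead of one record per CachedBatch element.
val counted = cachedPartition.map { batch =>
  metrics.incRecordsRead(batch.numRows)
  batch
}
counted.foreach(_ => ()) // drain the iterator, as a downstream operator would

println(metrics.recordsRead) // 3605
```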

Impact

- Scope: Only affects DataFrame cache (via persist()/cache())
- Benefit: Accurate metrics for cached DataFrame reads  
- Compatibility: Changes reported metric values, but makes them accurate

Files to Modify

1. core/src/main/scala/org/apache/spark/rdd/RDD.scala
2. 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
3. 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 (tests)

I have a working implementation with tests and can provide a patch.



