Kaiyu Zhao created SPARK-55523:
----------------------------------
Summary: [SQL] InputMetrics.recordsRead reports CachedBatch count
instead of row count for cached DataFrame reads
Key: SPARK-55523
URL: https://issues.apache.org/jira/browse/SPARK-55523
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.5.5
Environment: Spark 3.5.5, any storage level (MEMORY_ONLY, DISK_ONLY,
etc.)
Reporter: Kaiyu Zhao
Problem
When reading from a cached DataFrame (created via DataFrame.persist() or
DataFrame.cache()), the TaskMetrics.inputMetrics.recordsRead metric reports the
number of CachedBatch objects instead of the actual number of rows processed.
Example:
import org.apache.spark.storage.StorageLevel

val df = spark.range(3605).toDF("id")
df.persist(StorageLevel.DISK_ONLY)
df.count() // Materialize the cache
df.write.parquet("/tmp/output") // Writes 3605 rows
Expected: InputRecordsRead = 3605 (actual rows)
Actual: InputRecordsRead = 1 (number of CachedBatch objects)
This makes metrics misleading for monitoring, debugging, and performance
analysis.
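For reference, here is how the number above can be observed. This is a
minimal sketch, assuming a SparkSession named spark and the cached df from
the example; it sums recordsRead over all completed tasks with a
SparkListener:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sum inputMetrics.recordsRead across every task that completes.
var totalRecordsRead = 0L
val listener = new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null (e.g. for some failed tasks), so guard the access
    Option(taskEnd.taskMetrics).foreach { m =>
      totalRecordsRead += m.inputMetrics.recordsRead
    }
  }
}

spark.sparkContext.addSparkListener(listener)
df.write.parquet("/tmp/output")
spark.sparkContext.removeSparkListener(listener)

// Listener delivery is asynchronous, so a real test would wait for the
// listener bus to drain before asserting on the total.
println(s"InputRecordsRead = $totalRecordsRead") // prints 1, not 3605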
Root Cause
1. When DataFrame.persist() is called, Spark converts rows into columnar
CachedBatch objects (each batch holds up to
spark.sql.inMemoryColumnarStorage.batchSize rows, 10000 by default)
2. The underlying RDD becomes RDD[CachedBatch]
3. In RDD.getOrCompute() (around line 398), when reading from the cache, the
recovered iterator is wrapped so that incRecordsRead(1) is called once per
element (see the sketch after this list)
4. Since each element is a CachedBatch rather than a row, the metric counts
batches instead of rows
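The counting pattern in step 3 looks roughly like the following. This is a
paraphrase, not the exact Spark source: the block-manager lookup is elided,
and incRecordsRead is private[spark], so this only compiles inside Spark
itself.

import org.apache.spark.{InterruptibleIterator, TaskContext}

// Paraphrase of the cache-hit path in RDD.getOrCompute: the iterator
// recovered from the block manager is wrapped so that recordsRead is
// bumped once per element it yields.
def wrapCachedIterator[T](context: TaskContext, cached: Iterator[T]): Iterator[T] = {
  val existingMetrics = context.taskMetrics().inputMetrics
  new InterruptibleIterator[T](context, cached) {
    override def next(): T = {
      existingMetrics.incRecordsRead(1) // one per element: a CachedBatch, not a row
      delegate.next()
    }
  }
}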
Proposed Solution
Update InMemoryTableScanExec to track actual row counts:
1. In doExecute() and doExecuteColumnar(), call
inputMetrics.incRecordsRead(batch.numRows) while iterating over cached
batches (a sketch follows this list)
2. Remove the misleading per-element incRecordsRead(1) counting in
RDD.getOrCompute() for cache hits, to avoid double-counting
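A minimal sketch of the proposed per-batch counting follows. The helper name
is illustrative; the real change would live inside InMemoryTableScanExec,
where the private[spark] incRecordsRead is accessible:

import org.apache.spark.TaskContext
import org.apache.spark.sql.columnar.CachedBatch

// Hypothetical helper: credit recordsRead with the rows in each batch as
// the cached iterator is consumed, instead of one increment per CachedBatch.
def countRowsRead(batches: Iterator[CachedBatch]): Iterator[CachedBatch] = {
  val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
  batches.map { batch =>
    inputMetrics.incRecordsRead(batch.numRows) // rows, not batches
    batch
  }
}

Combined with removing the per-element increment in RDD.getOrCompute() for
cache hits, the example above would report 3605 instead of 1.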
Impact
- Scope: only the recordsRead metric reported for cached DataFrame reads (via persist()/cache())
- Benefit: Accurate metrics for cached DataFrame reads
- Compatibility: Changes reported metric values, but makes them accurate
Files to Modify
1. core/src/main/scala/org/apache/spark/rdd/RDD.scala
2. sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
3. sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala (tests)
I have a working implementation with tests and can provide a patch.