andygrove opened a new issue, #4781:
URL: https://github.com/apache/datafusion-comet/issues/4781

   ## Is your feature request related to a problem or challenge?
   
   PR #4591 adds native support for Spark's in-memory cache via 
`ArrowCachedBatchSerializer` and `CometInMemoryTableScanExec`. The goal is to 
land the functional version first. This issue tracks performance optimizations 
identified during review that can follow in later PRs.
   
   Benchmark from #4591 (`CometInMemoryCacheBenchmark`, 5M-row cached table, 
Apple M3 Ultra, JDK 17, Spark 3.5) showed the native path is about 1.5x faster 
on a full repeated scan but only about 1.1x on a selective filter, which 
suggests there is headroom, especially on the read path.
   
   ## Describe the potential solution
   
   ### Read path (runs on every cached scan, highest impact)
   
   1. **Hoist `UnsafeProjection.create` out of the per-batch loop.** In 
`ArrowCachedBatchSerializer.convertCachedBatchToInternalRow`, 
`UnsafeProjection.create(...)` is currently constructed inside the `flatMap` 
over decoded batches, so it recompiles Janino codegen once per cached batch. It 
should be built once per partition (inside `mapPartitions`, outside `flatMap`).
   
   2. **Reduce the per-scan deep copy.** `CometInMemoryTableScanExec` sets 
`arrow_ffi_safe = false`, so `scan.rs` deep-copies every column of every 
surviving batch on each scan. This is correct today because 
`ArrowReaderIterator` owns and closes each decoded batch, but it is the main 
structural cost on the read path and is the likely reason the selective-filter 
benchmark only reached ~1.1x. Removing it requires reworking the decode path to 
transfer ownership to native rather than closing the batch on the JVM side. 
This is a larger, riskier change and should be scoped carefully.
   
   3. **Optional uncompressed cache format.** `Utils.decodeBatches` 
decompresses (LZ4 by default) on every scan. A knob to store the Arrow bytes 
uncompressed would trade memory for faster repeated reads.
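
   The hoist in item 1 can be illustrated without Spark. The sketch below is 
not the code from #4591: `CountingProjection` is a hypothetical stand-in for an 
expensive-to-build projection, and plain `Iterator`s stand in for the partition 
and its decoded batches. It only shows how moving construction out of `flatMap` 
changes the number of builds.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a costly-to-build projection; NOT Spark's UnsafeProjection.
final class CountingProjection(built: AtomicInteger) {
  built.incrementAndGet() // records one (assumed expensive) construction
  def apply(row: Int): String = s"row-$row"
}

object HoistProjectionSketch {
  // Before: the projection is rebuilt inside flatMap, once per batch.
  def perBatch(batches: Iterator[Seq[Int]], built: AtomicInteger): Iterator[String] =
    batches.flatMap { batch =>
      val proj = new CountingProjection(built)
      batch.iterator.map(r => proj(r))
    }

  // After: the projection is built once per partition and reused by every batch.
  def perPartition(batches: Iterator[Seq[Int]], built: AtomicInteger): Iterator[String] = {
    val proj = new CountingProjection(built)
    batches.flatMap(batch => batch.iterator.map(r => proj(r)))
  }

  def main(args: Array[String]): Unit = {
    val partition = Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6)) // three "cached batches"
    val before = new AtomicInteger(0)
    val after = new AtomicInteger(0)
    val rowsBefore = perBatch(partition.iterator, before).toList
    val rowsAfter = perPartition(partition.iterator, after).toList
    assert(rowsBefore == rowsAfter) // same rows either way
    println(s"per-batch builds: ${before.get}, per-partition builds: ${after.get}")
  }
}
```

   With three batches the first variant constructs three projections and the 
second constructs one; the rows produced are identical.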
   
   ### Write path (runs once per `cache()`, but slow for large caches)
   
   4. **Specialize `computeStats` per column and stop boxing.** `readValue` 
returns `Any`, so every primitive read boxes, and both `compare(dt, ...)` and 
`tracksBounds(dt)` re-dispatch on `DataType` per row even though the type is 
invariant per column. Dispatch on the type once per column, then run a 
specialized primitive loop that keeps min/max in primitive locals.
   
   5. **Copy-on-update for string stats.** `readValue` calls 
`col.getUTF8String(rowId).copy()` for every non-null string, but only the min 
and max are kept. Compare the borrowed `UTF8String` against the current bounds 
and copy it only when it becomes the new lower or upper bound.
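
   A minimal sketch of the per-column specialization in item 4, assuming a 
simplified column model. `Col`, `IntCol`, `LongCol`, and `minMax` are 
hypothetical names for illustration and are not the `computeStats` code or 
Spark's `ColumnVector` API. The type is matched once per column, and the loop 
keeps min/max in primitive locals so no per-row boxing or per-row type dispatch 
occurs.

```scala
// Hypothetical, simplified column model; NOT Spark's ColumnVector API.
sealed trait Col
final case class IntCol(values: Array[Int], isNull: Array[Boolean]) extends Col
final case class LongCol(values: Array[Long], isNull: Array[Boolean]) extends Col

object ColumnStatsSketch {
  // Dispatch on the column type once, then run a primitive loop.
  // Returns None when the column has no non-null values.
  def minMax(col: Col): Option[(Long, Long)] = col match {
    case IntCol(values, isNull) =>
      var min = Int.MaxValue
      var max = Int.MinValue
      var seen = false
      var i = 0
      while (i < values.length) {
        if (!isNull(i)) {
          val v = values(i) // primitive Int, never boxed
          if (v < min) min = v
          if (v > max) max = v
          seen = true
        }
        i += 1
      }
      if (seen) Some((min.toLong, max.toLong)) else None

    case LongCol(values, isNull) =>
      var min = Long.MaxValue
      var max = Long.MinValue
      var seen = false
      var i = 0
      while (i < values.length) {
        if (!isNull(i)) {
          val v = values(i) // primitive Long, never boxed
          if (v < min) min = v
          if (v > max) max = v
          seen = true
        }
        i += 1
      }
      if (seen) Some((min, max)) else None
  }
}
```

   The only allocation left is the single result tuple per column, rather than 
one boxed value per row.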
   
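   Copy-on-update (item 5) can be sketched the same way. This is an 
illustration under stated assumptions, not the actual implementation: a reused 
`StringBuilder` stands in for a borrowed `UTF8String` whose contents are only 
valid until the next row, `toString` plays the role of `.copy()`, and the 
comparison is by UTF-16 code unit rather than `UTF8String`'s byte ordering. 
`StringBoundsSketch`, `Bounds`, and the `copies` counter are hypothetical names.

```scala
object StringBoundsSketch {
  final case class Bounds(min: String, max: String, copies: Int)

  // Lexicographic comparison that reads the borrowed view without copying it.
  private def compare(a: CharSequence, b: CharSequence): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val d = a.charAt(i) - b.charAt(i)
      if (d != 0) return d
      i += 1
    }
    a.length - b.length
  }

  // Returns None when there are no non-null rows.
  def bounds(rows: Iterator[String]): Option[Bounds] = {
    val view = new java.lang.StringBuilder // reused buffer: the "borrowed" value
    var min: String = null
    var max: String = null
    var copies = 0
    rows.foreach { row =>
      if (row != null) {
        view.setLength(0)
        view.append(row) // overwrite the shared buffer with the current row
        val newMin = min == null || compare(view, min) < 0
        val newMax = max == null || compare(view, max) > 0
        if (newMin || newMax) {
          val owned = view.toString // the only place a copy is made
          copies += 1
          if (newMin) min = owned
          if (newMax) max = owned
        }
      }
    }
    if (min == null) None else Some(Bounds(min, max, copies))
  }
}
```

   For the rows `"m", "c", "x", "d", null, "p"` this makes three copies instead 
of five, because `"d"` and `"p"` fall inside the current bounds.
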
   ### Suggested priority
   
   1. Hoist `UnsafeProjection.create` (trivial, clear win).
   2. Specialize `computeStats` and add copy-on-update for strings (biggest 
win for cache build time).
   3. Investigate reducing the per-scan deep copy, with a scan-isolating 
benchmark variant to confirm the pruning benefit.
   
   ## Additional context
   
   Follow-on to #4591. See the review discussion there for full detail.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

