schenksj commented on issue #4695:
URL: 
https://github.com/apache/datafusion-comet/issues/4695#issuecomment-4883378862

   ### Update: initial implementation + benchmarks
   
   We've built an initial implementation of two of the three pieces in this 
proposal:
   
   - **Node-local fragment cache**: block-aligned, sitting behind the `object_store` API, with an
     in-memory tier (SIEVE eviction, single-flight miss dedup, coalesced fetches) and an on-disk SSD
     tier (region files, crc32c, wholesale region eviction, an admission gate that filters single-pass
     scan traffic). It relies on data-file immutability (ETag/version validation, no invalidation
     protocol). The cache core is storage-API-neutral (no `object_store`/`opendal`/engine deps) with a
     thin `object_store` adapter, so the same core can back the Delta/Iceberg paths later.
   - **Cache-affinity scheduling**: driver-side sticky file→host assignment exposed through
     `RDD.getPreferredLocations`, so repeat reads of a file route back to the executor that already
     cached it (turning per-host private caches into a coherent cluster cache).
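
To make "block-aligned" and "coalesced fetches" concrete, here is a minimal Python sketch of how a read request might be mapped to cache blocks and how the missing blocks might be merged into a few larger range requests. The function names, the tiny block size used in the example, and the set-based `cached` lookup are all illustrative assumptions, not the actual implementation.

```python
def block_indices(offset, length, block_size):
    """Indices of the fixed-size blocks covering bytes [offset, offset + length)."""
    if length <= 0:
        return []
    first = offset // block_size
    last = (offset + length - 1) // block_size
    return list(range(first, last + 1))


def coalesce(blocks):
    """Merge block indices into contiguous [start, end) runs of blocks."""
    runs = []
    for b in sorted(set(blocks)):
        if runs and runs[-1][1] == b:
            # Extends the previous run by one block.
            runs[-1] = (runs[-1][0], b + 1)
        else:
            runs.append((b, b + 1))
    return runs


def plan_fetches(offset, length, cached, block_size):
    """Byte ranges to request from the object store for one read.

    `cached` is a hypothetical set of block indices already held locally.
    """
    missing = [b for b in block_indices(offset, length, block_size) if b not in cached]
    return [(start * block_size, end * block_size) for start, end in coalesce(missing)]
```

For example, with 8-byte blocks and block 1 already cached, `plan_fetches(5, 20, {1}, 8)` yields two range requests, `[(0, 8), (16, 32)]`, instead of three single-block requests. A real implementation would also need to clamp the final range to the object's length.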
   
   Both are opt-in / default-off, and correctness never depends on the cache or 
on locality, exactly
   as framed above. **Asynchronous prefetch (the third piece) is not 
implemented yet.**
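
The sticky file→host assignment behind cache-affinity scheduling can be pictured roughly as follows. This is only a sketch in Python with a hypothetical `StickyAssigner` class and a simple least-loaded policy for first-time assignments; the real driver-side logic and its integration with `RDD.getPreferredLocations` may differ.

```python
class StickyAssigner:
    """Remembers which host each file was first assigned to (illustrative only)."""

    def __init__(self, hosts):
        self._hosts = list(hosts)
        self._assignment = {}                      # file path -> host
        self._load = {h: 0 for h in self._hosts}   # files assigned per host

    def preferred_host(self, path):
        host = self._assignment.get(path)
        if host is None:
            # First sighting: pick the least-loaded host (ties go to list order),
            # then keep that choice for all later reads of the same file.
            host = min(self._hosts, key=lambda h: self._load[h])
            self._assignment[path] = host
            self._load[host] += 1
        return host
```

Because the returned host is only a scheduling preference, a task that lands elsewhere still reads correctly; it just misses the warm cache, which matches the "correctness never depends on locality" framing.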
   
   Below are cold-vs-warm numbers across scan / aggregation / join shapes, to 
gauge the effect.
   
   **Important setup caveat:** these were run on a small **MacBook Air** (`local[4]`) reading from
   **S3 over the public internet on commercial broadband**, so cold times include WAN round-trips and
   are *slower* than a co-located, in-region executor would see. The absolute numbers and speedups are
   therefore **illustrative, not datacenter-representative**; a real cluster with S3 in-region would
   likely show smaller (but still meaningful) ratios. Each scenario first clears the cache to force a
   genuine cold read, then times warm runs. `ColdFetch` and `WarmFetch` are the bytes read from S3 per
   run; a `WarmFetch` of roughly 0 means the run was served entirely from the cache.
   
   **Incompressible data (random doubles, ~0.5 GB on disk):**
   
   ```
   Category Scenario                  Cold(ms)  Warm(ms)  Speedup  ColdFetch  WarmFetch
   ------------------------------------------------------------------------------------
   scan     full scan (16 cols)          30519       503    60.7x    514.8MB      0.0MB
   scan     projection (1 col)            3594       147    24.4x     66.8MB      0.0MB
   scan     projection (4 cols)           9647       115    83.9x    146.8MB      0.0MB
   scan     filter (v2 < 0.25)            5410       119    45.5x     82.8MB      0.0MB
   agg      group-by aggregation          5472       158    34.6x     98.8MB      0.0MB
   agg      count distinct key            2885        77    37.5x     50.8MB      0.0MB
   join     join fact-dim                 5150       149    34.6x     82.8MB      0.0MB
   join     join + group-by               6319       176    35.9x     82.8MB      0.0MB
   ------------------------------------------------------------------------------------
   mean warm speedup: 44.6x across 8 scenarios
   ```
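
For readers who want to check the arithmetic, the `Speedup` column appears to be the plain ratio of cold to warm wall time, and the summary line the arithmetic mean of those ratios. The short Python sketch below recomputes both from the incompressible-data timings; the variable names are illustrative.

```python
# (scenario, cold_ms, warm_ms) copied from the incompressible-data table.
runs = [
    ("full scan (16 cols)", 30519, 503),
    ("projection (1 col)", 3594, 147),
    ("projection (4 cols)", 9647, 115),
    ("filter (v2 < 0.25)", 5410, 119),
    ("group-by aggregation", 5472, 158),
    ("count distinct key", 2885, 77),
    ("join fact-dim", 5150, 149),
    ("join + group-by", 6319, 176),
]

# Speedup is cold time divided by warm time for each scenario.
speedups = [cold_ms / warm_ms for _, cold_ms, warm_ms in runs]

# The summary line is the arithmetic mean of the per-scenario ratios.
mean_speedup = sum(speedups) / len(speedups)

for (name, _, _), s in zip(runs, speedups):
    print(f"{name:<24}{s:6.1f}x")
print(f"mean warm speedup: {mean_speedup:.1f}x")
```

Note that an arithmetic mean of ratios is pulled upward by the largest entries (here the 4-column projection), so it should be read alongside the per-scenario values.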
   
   **Compressible data (id-derived; Parquet dictionary/RLE crushes it to ~8 
MB):**
   
   ```
   Category Scenario                  Cold(ms)  Warm(ms)  Speedup  ColdFetch  WarmFetch
   ------------------------------------------------------------------------------------
   scan     full scan (16 cols)           1991        81    24.6x      7.8MB      0.0MB
   scan     projection (1 col)             677        38    17.8x      7.8MB      0.0MB
   scan     projection (4 cols)           1264        48    26.3x      7.8MB      0.0MB
   scan     filter (v2 < 0.25)             730        38    19.2x      7.8MB      0.0MB
   agg      group-by aggregation           852       124     6.9x      7.8MB      0.0MB
   agg      count distinct key             599        68     8.8x      7.8MB      0.0MB
   join     join fact-dim                  736        66    11.2x      7.8MB      0.0MB
   join     join + group-by                771       101     7.6x      7.8MB      0.0MB
   ------------------------------------------------------------------------------------
   mean warm speedup: 15.3x across 8 scenarios
   ```
   
   A few observations:
   
   - The cache serves real data volume (full scan ~515 MB cold → ~0 warm) and 
only fetches the blocks
     a query touches (a 1-column projection reads ~67 MB vs ~515 MB for the 
full 16-column scan).
   - Even the tiny compressed file still shows a large speedup: the cold cost over object storage is
     dominated by request/round-trip latency rather than bytes transferred, so the cache also helps
     with the small, well-compressed files that real tables typically contain, not just large files.
   - Aggregation/join gains are lower than pure scans because shuffle/hash 
compute is a fixed cost the
     cache doesn't touch — a useful signal about where scan-side caching pays 
off most.
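
One rough way to see the latency-dominated behaviour in the second observation is to compare effective cold throughput (bytes fetched divided by cold wall time) for the two full-scan rows. The Python sketch below does that with figures taken from the tables above; since cold time also includes decode and query compute, treat it as an indicator only, not a measurement of network throughput.

```python
def effective_throughput_mb_s(fetched_mb, cold_ms):
    """Bytes fetched per second of cold wall time, in MB/s."""
    return fetched_mb / (cold_ms / 1000.0)


# Full scan, incompressible data: 514.8 MB fetched in 30519 ms.
large = effective_throughput_mb_s(514.8, 30519)

# Full scan, compressible data: 7.8 MB fetched in 1991 ms.
small = effective_throughput_mb_s(7.8, 1991)

print(f"large file: {large:.1f} MB/s, small file: {small:.1f} MB/s")
```

The small file moves far fewer bytes per second of cold time, which is consistent with per-request round-trips, rather than transfer volume, making up most of its cold cost.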
   
   Happy to share the detailed design document (architecture, injection points, 
config surface,
   metrics, test plan) and to break the work into per-phase child issues as 
suggested above.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

