schenksj opened a new issue, #4695:
URL: https://github.com/apache/datafusion-comet/issues/4695

   > _Disclosure: this proposal was drafted with the help of an AI assistant. I 
have reviewed it, understand the proposed approach end-to-end, and stand behind 
it; a more detailed design document and a prototype direction exist to back it 
up. Sharing as a high-level proposal first, per the project's preference for a 
clear problem statement over an AI-generated code dump._
   
   ## What is the problem the feature request solves?
   
   When Comet reads Parquet (including via Delta and Iceberg native scans) from 
cloud object storage (S3/ADLS/GCS), I/O is the dominant cost for many 
workloads, and Comet currently leaves two well-known wins on the table:
   
   1. **No data caching.** Comet does not cache file data across queries. Every 
scan re-fetches the same bytes from object storage, even for repeated/iterative 
workloads (BI dashboards, repeated `MERGE`/`UPDATE` source reads, re-scans 
within a session). The only caching today is per-scan Parquet footer metadata, 
which is in-memory and not reused across queries.
   
   2. **No read-ahead.** Reads are strictly demand-driven — the decoder blocks 
on object-store latency rather than overlapping I/O with compute. (Notably, 
vanilla Spark does not prefetch Parquet either, so this is a net-new capability 
rather than parity work.)
   
   Commercial engines pull the same two levers to speed up scans over cloud 
storage (e.g. Databricks markets analogous capabilities as **Predictive I/O** 
and the **disk cache**). Because Comet owns its native scan path, it can add 
both in a clean, format-agnostic way.
   
   ## Describe the potential solution
   
   Add an **opt-in "Scan I/O Acceleration"** capability to Comet's native 
scans, composed of three cooperating pieces:
   
   1. **Node-local fragment cache** — a bounded, persistent cache of raw file 
fragments on local disk/SSD, shared across queries on an executor. Because data 
files in Parquet/Delta/Iceberg are immutable, caching is coherence-free (no 
invalidation protocol needed).
   
   2. **Asynchronous prefetch** — read-ahead that overlaps object-store I/O 
with decode. Where the query's predicate/projection/limit are known, prefetch 
is **filter-aware** (only fetch what the scan will actually read); otherwise it 
falls back to simple sequential read-ahead. Prefetch is budget-bounded and 
provides natural backpressure (e.g. a satisfied `LIMIT` stops it).
   
   3. **Cache-affinity scheduling** — to make a node-local cache effective on 
cloud storage (where there are no block locations), tasks must prefer the 
executor/node that already holds the relevant data. Comet's native scans 
already use a Comet-owned RDD, so this can be implemented once and apply 
uniformly across formats.
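
To make piece 1 concrete, here is a minimal sketch (in Python for brevity, not Comet code) of a byte-bounded LRU fragment cache. It illustrates why immutable files make the cache coherence-free: the key fully identifies the bytes, so an entry is never stale and is only ever evicted for space. `FragmentKey`, `FragmentCache`, the `version` field, and the in-memory dictionary standing in for local disk/SSD are all hypothetical simplifications, not names from the design doc.

```python
from collections import OrderedDict
from typing import Callable, NamedTuple


class FragmentKey(NamedTuple):
    """Identifies an immutable byte range of a data file."""
    path: str      # object-store URI of the file
    version: str   # assumed file identity, e.g. an ETag (hypothetical field)
    offset: int
    length: int


class FragmentCache:
    """Byte-bounded LRU cache of raw file fragments.

    An in-memory stand-in for an on-disk cache; no invalidation logic is
    needed because a given key always maps to the same bytes.
    """

    def __init__(self, capacity_bytes: int) -> None:
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0
        self.hits = 0
        self.misses = 0
        self._entries: "OrderedDict[FragmentKey, bytes]" = OrderedDict()

    def __contains__(self, key: FragmentKey) -> bool:
        return key in self._entries

    def get_or_fetch(
        self, key: FragmentKey, fetch: Callable[[FragmentKey], bytes]
    ) -> bytes:
        data = self._entries.get(key)
        if data is not None:
            self._entries.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return data
        # A miss only costs latency: fall through to the object store.
        self.misses += 1
        data = fetch(key)
        self._insert(key, data)
        return data

    def _insert(self, key: FragmentKey, data: bytes) -> None:
        if len(data) > self.capacity_bytes:
            return  # too large to cache; the caller still gets the bytes
        self._entries[key] = data
        self.used_bytes += len(data)
        # Evict least recently used fragments until back under budget.
        while self.used_bytes > self.capacity_bytes:
            _, evicted = self._entries.popitem(last=False)
            self.used_bytes -= len(evicted)
```

A caller would wrap each ranged read in `get_or_fetch`, passing the real object-store read as `fetch`. Results are identical whether or not the fragment was cached.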
   
   Design intent:
   - **Format coverage:** Parquet, Delta (native/kernel-read), and Iceberg 
(native). The bulk of the mechanism lives at the shared I/O layer, so most of 
it is implemented once and reused.
   - **Opt-in and safe:** disabled by default, enabled per-scan via config. 
**Correctness never depends on the cache or on locality** — a cache miss or a 
busy/dead preferred host only affects latency, never results.
   - **Incremental:** the three pieces deliver value independently and can land 
as a sequence of small, reviewable PRs. A reasonable order is: fragment cache → 
sequential prefetch → filter-aware prefetch → cache-affinity scheduling, with 
optional cross-cluster pre-warming later. Iceberg's *filter-aware* prefetch may 
depend on upstream `iceberg-rust` work and can come last.
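
The prefetch phases can be pictured with a small sketch (Python, illustrative only). It keeps at most `max_in_flight` ranged reads outstanding, yields fragments in order, and submits new reads only as the consumer pulls, so a consumer that stops early (for example after a satisfied `LIMIT`) stops the read-ahead. For the filter-aware variant, the caller would pass only the ranges that survive pruning. The function name, the `ByteRange` alias, and the use of an in-flight count as the budget are assumptions made for this example.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Tuple

ByteRange = Tuple[int, int]  # (offset, length); hypothetical alias


def prefetching_reader(
    ranges: Iterable[ByteRange],
    fetch: Callable[[ByteRange], bytes],
    max_in_flight: int = 4,
) -> Iterator[bytes]:
    """Yield fragments in order while reading ahead within a fixed budget."""
    pending = deque()  # futures for submitted reads, in request order
    remaining = iter(ranges)
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        try:
            while True:
                # Top up the read-ahead window, never exceeding the budget.
                while len(pending) < max_in_flight:
                    nxt = next(remaining, None)
                    if nxt is None:
                        break
                    pending.append(pool.submit(fetch, nxt))
                if not pending:
                    return
                # Blocks only if the oldest read has not finished yet.
                yield pending.popleft().result()
        finally:
            # Consumer stopped early or finished: drop reads not yet started.
            for future in pending:
                future.cancel()
```

Because new reads are submitted only when the consumer asks for the next fragment, the window size doubles as backpressure: a slow decoder never lets more than `max_in_flight` fragments pile up.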
   
   This issue is intended as a **high-level proposal** to gauge interest and 
direction. A detailed design (architecture, injection points, scheduling model, 
configuration surface, metrics, and a full test/benchmark plan) is available 
and can be shared as a follow-up design doc or GitHub Discussion if the 
community is interested.
   
   ## Additional context
   
   - **Why Comet is well-positioned:** Comet already owns the native scan path 
and its scan RDD for Parquet, Delta, and Iceberg, and reads (except Iceberg) 
already converge on a single object-store I/O layer — so a shared cache + 
prefetch layer can be injected at very few points and cover all three formats. 
Cache-affinity scheduling fits Comet's existing plan-rewrite model (a 
driver-side scheduling hint), with the actual caching/prefetch living below the 
scan at the I/O layer.
   - **Prior art / inspiration:** the asynchronous-prefetch scheduling model 
can follow the well-tested bounded/credit-based design of Spark's shuffle block 
fetcher; node-local cache-affinity via `preferredLocations` is a proven pattern 
for cloud-backed scans.
   - **Out of scope (initially):** caching decoded/Arrow data (we cache raw 
file bytes), any cross-node distributed cache or coherency protocol 
(unnecessary given immutable files), and the non-native fallback scan path.
   - **Open questions for discussion:** appetite for an in-tree on-disk cache 
and any new dependency it implies; default block/cache sizing; interaction with 
dynamic allocation; and whether to start with a "buy" (existing caching 
library) vs. "build" cache backend.
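
One possible way to derive the scheduling hint mentioned above is rendezvous (highest-random-weight) hashing: every file deterministically ranks the available hosts, so repeated scans of the same file prefer the same executors, and losing a host only remaps the files that preferred it. This is an assumption for illustration, not the scheduling model from the design doc. `preferred_hosts` and its parameters are hypothetical; in Spark the result would be what a scan RDD returns from `getPreferredLocations`.

```python
import hashlib
from typing import List, Sequence


def preferred_hosts(
    file_path: str, hosts: Sequence[str], replicas: int = 2
) -> List[str]:
    """Rank hosts for a file by rendezvous hashing; return the top `replicas`."""

    def score(host: str) -> int:
        # Hash the (host, file) pair; the highest score wins for this file.
        digest = hashlib.sha256(f"{host}|{file_path}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big")

    return sorted(hosts, key=score, reverse=True)[:replicas]
```

Since the result is only a preference, the scheduler remains free to run the task elsewhere when the preferred host is busy or gone. The task then takes a cache miss, which affects latency but not results.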
   
   ### Related issues
   
   - #3736 ([EPIC] native_datafusion scan improvements) — this proposal could 
live under, or alongside, that umbrella.
   - #3817 (`native_datafusion` doesn't use all available parallelism for scan) 
— related I/O-throughput concern; asynchronous/parallel prefetch is in the same 
area.
   - #1204 (Only create one native plan for a query on an executor) — related, 
since a node-local cache benefits from executor-scoped (rather than per-task) 
state.
   
   Note on terminology: a previous "prefetch" toggle in Comet (see #918) 
referred to the legacy `native_comet` reader's parallel ranged-read behavior. 
The **asynchronous prefetch** proposed here is a distinct, new capability 
(read-ahead of file fragments ahead of demand, optionally filter-aware) and 
should not be confused with that flag.
   
   I'm happy to break this into child issues per phase and to contribute the 
implementation. Feedback on scope, naming, and whether a design doc or a GitHub 
Discussion is the preferred next step would be very welcome.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

