zhuqi-lucas opened a new issue, #22553:
URL: https://github.com/apache/datafusion/issues/22553

   ### Is your feature request related to a problem or challenge?
   
   `FileMetadataCache` (introduced in #16971, default-enabled per the #17000 
epic) does not coordinate between concurrent callers that miss on the same 
`Path`. The `fetch_metadata` path in 
`datafusion/datasource-parquet/src/metadata.rs` is:
   
   1. `cache.get(path)` — return if hit and `is_valid_for`
   2. on miss: drive `ParquetMetaDataPushDecoder` (one or more S3 range reads)
   3. `cache.put(path, entry)`
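A minimal, synchronous sketch of that three-step sequence, using only the standard library. `FileMeta`, `Entry`, `MetaCache`, and `fetch_metadata_sketch` are hypothetical stand-ins for `ObjectMeta`, `CachedFileMetadataEntry`, the cache, and `fetch_metadata`. Treating an entry as valid only when the file's size and last-modified time are unchanged is an assumption about what `is_valid_for` checks. The `load` closure stands in for driving the push decoder.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for `object_store::ObjectMeta`.
#[derive(Clone, Debug, PartialEq)]
pub struct FileMeta {
    pub size: u64,
    pub last_modified: u64,
}

/// Hypothetical stand-in for `CachedFileMetadataEntry`.
#[derive(Clone, Debug)]
pub struct Entry {
    pub meta: FileMeta,
    pub footer: String, // placeholder for decoded parquet metadata
}

impl Entry {
    /// Assumption: an entry is reusable only if the file is unchanged.
    pub fn is_valid_for(&self, current: &FileMeta) -> bool {
        self.meta == *current
    }
}

/// Hypothetical cache with sync get/put, each taking the lock briefly.
#[derive(Default)]
pub struct MetaCache {
    map: Mutex<HashMap<String, Entry>>,
}

impl MetaCache {
    pub fn get(&self, path: &str) -> Option<Entry> {
        self.map.lock().unwrap().get(path).cloned()
    }
    pub fn put(&self, path: &str, entry: Entry) {
        self.map.lock().unwrap().insert(path.to_string(), entry);
    }
}

/// The get / validate / load / put sequence described above.
pub fn fetch_metadata_sketch(
    cache: &MetaCache,
    path: &str,
    current: &FileMeta,
    load: impl FnOnce() -> Entry,
) -> Entry {
    // 1. Return the cached entry if present and still valid for this file.
    if let Some(cached) = cache.get(path) {
        if cached.is_valid_for(current) {
            return cached;
        }
    }
    // 2. Miss or stale entry: decode the metadata (range reads in the real code).
    let entry = load();
    // 3. Store it for later callers.
    cache.put(path, entry.clone());
    entry
}
```

A hit never calls `load`, while a changed file (different size or timestamp) falls through to a fresh load and overwrites the stale entry.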
   
   If N concurrent queries against the same file arrive while the cache is cold for that key, **all N** independently run step 2, issuing at least N S3 metadata round-trips for what should be a single fetch. The first to finish calls `put`; the rest finish their own loads and overwrite that entry. On production workloads with hot pods (many concurrent queries against the same parquet file right after a rolling deploy, or during any other cold-cache window), the net effect is a thundering herd of duplicate S3 metadata fetches.
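The race can be reproduced deterministically with a std-only sketch. `naive_get_or_load` and `simulate_cold_herd` are hypothetical. Each simulated load increments a counter and then waits on a `Barrier` sized to the number of callers, which holds the cold-cache window open: no caller can reach `put` until every caller has already missed. Under that assumption, every caller runs its own load.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical non-coalescing get-or-load over a sync get/put map.
fn naive_get_or_load(
    cache: &Mutex<HashMap<String, u64>>,
    key: &str,
    load: impl FnOnce() -> u64,
) -> u64 {
    if let Some(v) = cache.lock().unwrap().get(key).copied() {
        return v;
    }
    // The lock is released here, so nothing records that a load is in flight.
    let v = load();
    cache.lock().unwrap().insert(key.to_string(), v);
    v
}

/// Runs `n` concurrent lookups of one cold key and returns how many loads ran.
pub fn simulate_cold_herd(n: usize) -> usize {
    let cache: Arc<Mutex<HashMap<String, u64>>> = Arc::new(Mutex::new(HashMap::new()));
    let loads = Arc::new(AtomicUsize::new(0));
    // Every load waits here, so no caller can `put` before all have missed.
    let barrier = Arc::new(Barrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (cache, loads, barrier) = (cache.clone(), loads.clone(), barrier.clone());
            thread::spawn(move || {
                naive_get_or_load(&cache, "hot.parquet", || {
                    loads.fetch_add(1, Ordering::SeqCst); // one simulated metadata fetch
                    barrier.wait();
                    42
                })
            })
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 42);
    }
    loads.load(Ordering::SeqCst)
}
```

Calling `simulate_cold_herd(8)` returns 8: every caller ran its own load and none was shared.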
   
   The current `DefaultFilesMetadataCache` is a `Mutex<HashMap + LRU>` with sync `get`/`put` ([file_metadata_cache.rs](https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/file_metadata_cache.rs)). There is no in-flight tracking, and the sync `get`/`put` signatures of the `CacheAccessor` trait cannot express load-through semantics.
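For reference, the `Mutex<HashMap + LRU>` shape can be sketched as follows. `LruCache` is a hypothetical, simplified stand-in (recency is tracked in a `VecDeque`, so updates are O(n)), not the actual `DefaultFilesMetadataCache` code. The lock is held only for a single `get` or `put`, so between a miss and the caller's later `put` the cache keeps no record that a load is in progress.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Hypothetical `Mutex<HashMap + LRU>` cache with sync get/put.
pub struct LruCache<V> {
    // Values plus recency order (front = least recently used).
    inner: Mutex<(HashMap<String, V>, VecDeque<String>)>,
    capacity: usize,
}

impl<V: Clone> LruCache<V> {
    pub fn new(capacity: usize) -> Self {
        Self { inner: Mutex::new((HashMap::new(), VecDeque::new())), capacity }
    }

    /// Returns a clone of the value and marks the key most recently used.
    pub fn get(&self, key: &str) -> Option<V> {
        let mut guard = self.inner.lock().unwrap();
        let (map, order) = &mut *guard;
        let v = map.get(key)?.clone();
        order.retain(|k| k != key);
        order.push_back(key.to_string());
        Some(v)
    }

    /// Inserts or replaces a value, evicting the least recently used key if full.
    pub fn put(&self, key: &str, value: V) {
        let mut guard = self.inner.lock().unwrap();
        let (map, order) = &mut *guard;
        if map.insert(key.to_string(), value).is_none() && map.len() > self.capacity {
            if let Some(oldest) = order.pop_front() {
                map.remove(&oldest);
            }
        }
        order.retain(|k| k != key);
        order.push_back(key.to_string());
    }
}
```

A caller that misses must leave the cache, load the value itself, and call `put` afterwards; the cache cannot tell a second caller that the first is already loading.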
   
   ### Describe the solution you'd like
   
   Add a default-implemented async method on `FileMetadataCache` (and an analogous one on `FileStatisticsCache`) that lets implementors provide singleflight / load-through behavior:
   
   ```rust
   use std::future::Future;

   pub trait FileMetadataCache: CacheAccessor<Path, CachedFileMetadataEntry> {
       // ... existing methods unchanged ...
   
       /// Load-through with coalescing semantics: if multiple concurrent callers
       /// miss the same key, an implementor may run `load` only once and share
       /// the result with the other waiters.
       ///
       /// The default implementation does NOT coalesce — it preserves current
       /// behavior so existing implementors do not need to change. Implementors
       /// backed by `moka::future::Cache::try_get_with`, a `dashmap` of
       /// `tokio::sync::broadcast` senders, etc. should override this.
       async fn get_or_try_load<F, Fut>(
           &self,
           key: &Path,
           current_meta: &ObjectMeta,
           load: F,
       ) -> Result<CachedFileMetadataEntry>
       where
           F: FnOnce() -> Fut + Send,
           Fut: Future<Output = Result<CachedFileMetadataEntry>> + Send,
       {
           if let Some(cached) = self.get(key) {
               if cached.is_valid_for(current_meta) {
                   return Ok(cached);
               }
           }
           let entry = load().await?;
           self.put(key, entry.clone());
           Ok(entry)
       }
   }
   ```
   
   Then `fetch_metadata` in `datafusion/datasource-parquet/src/metadata.rs` would replace the explicit `get` / `is_valid_for` / `put` sequence with a single `get_or_try_load` call. Behavior is unchanged with the default implementation, while deployments with a custom cache (e.g. moka-backed) get singleflight simply by overriding `get_or_try_load`, with no further changes to `fetch_metadata`.
   
   ### Describe alternatives I've considered
   
   1. **Implementor-side workaround.** Wrap `DefaultFilesMetadataCache` in a custom struct that adds its own pending-load map. This works, but every deployment that cares would re-implement the same dedup logic.
   2. **Sync-only with `block_on`.** Expressing singleflight on a sync trait by `block_on`-ing inside `get`/`put` blocks a tokio worker thread per cache miss, which can stall or deadlock small runtimes.
   3. **Wait for #18405 (Decoupling Cache and Eviction Strategies).** That 
issue addresses storage / eviction pluggability, not the load-through gap. The 
two are complementary; this proposal is a smaller and more focused addition.
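The pending-load map from alternative 1, which is also what an override of `get_or_try_load` would need, can be sketched with std threads. `Singleflight` is a hypothetical, thread-based analogue of the coalescing that the issue attributes to moka's `try_get_with`; an async version would swap `Condvar` for an async primitive such as `tokio::sync::Notify`. Ready values and in-flight loads share one lock, so a key can never look both uncached and not loading while a load is running. Only successful loads are cached, and waiters receive a clone of the leader's error. As a sketch, it does not handle a panicking loader, which would leave waiters blocked.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

/// One in-flight load; waiters block on `done` until `result` is filled in.
struct InFlight<V> {
    result: Mutex<Option<Result<V, String>>>,
    done: Condvar,
}

/// Ready values and in-flight loads, guarded together by one lock.
struct State<V> {
    ready: HashMap<String, V>,
    pending: HashMap<String, Arc<InFlight<V>>>,
}

/// Hypothetical singleflight cache: concurrent misses on one key share one load.
pub struct Singleflight<V> {
    state: Mutex<State<V>>,
}

impl<V: Clone> Singleflight<V> {
    pub fn new() -> Self {
        let state = State { ready: HashMap::new(), pending: HashMap::new() };
        Self { state: Mutex::new(state) }
    }

    pub fn get_or_try_load(
        &self,
        key: &str,
        load: impl FnOnce() -> Result<V, String>,
    ) -> Result<V, String> {
        let mut st = self.state.lock().unwrap();
        if let Some(v) = st.ready.get(key) {
            return Ok(v.clone()); // hit
        }
        let in_flight = st.pending.get(key).cloned();
        if let Some(flight) = in_flight {
            // Another caller is already loading this key: wait for its result.
            drop(st);
            let mut res = flight.result.lock().unwrap();
            while res.is_none() {
                res = flight.done.wait(res).unwrap();
            }
            return (*res).clone().expect("result is set before notify");
        }
        // Leader: register the load, then run it without holding the state lock.
        let flight = Arc::new(InFlight { result: Mutex::new(None), done: Condvar::new() });
        st.pending.insert(key.to_string(), Arc::clone(&flight));
        drop(st);
        let result = load();
        let mut st = self.state.lock().unwrap();
        st.pending.remove(key);
        if let Ok(v) = &result {
            st.ready.insert(key.to_string(), v.clone()); // cache successes only
        }
        drop(st);
        // Publish under the result lock so a waiter cannot miss the wakeup.
        *flight.result.lock().unwrap() = Some(result.clone());
        flight.done.notify_all();
        result
    }
}
```

With eight threads requesting `hot.parquet` concurrently, the loader runs once and every caller receives the same value; a failed load is shared with its waiters but not cached, so the next caller retries.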
   
   ### Additional context
   
   I'm working on adopting this cache in a deployment that today uses a 
moka-backed metadata cache with `try_get_with` (singleflight). For our workload 
(many concurrent queries against the same hot parquet file from a small pool of 
pods), losing the dedup is a real regression — a single cold-cache file becomes 
N concurrent S3 metadata reads for N concurrent queries.
   
   Happy to send the PR if the API shape looks reasonable. Tagging @nuno-faria 
and @alamb since they're closest to this code path.

