nuno-faria commented on code in PR #22613:
URL: https://github.com/apache/datafusion/pull/22613#discussion_r3329221004


##########
datafusion/execution/src/cache/mod.rs:
##########


Review Comment:
   Does it still make sense to have a `CacheAccessor`? We could move these 
methods to `Cache` and have just one trait.



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -380,23 +333,21 @@ impl CacheManager {
                 }
                 Some(Arc::clone(lfc))
             }
-            None if config.list_files_cache_limit > 0 => {
-                let lfc: Arc<dyn ListFilesCache> = 
Arc::new(DefaultListFilesCache::new(
+            None if config.list_files_cache_limit > 0 => Some(Arc::new(
+                DefaultCache::<TableScopedPath, CachedFileList>::with_ttl(
                     config.list_files_cache_limit,
                     config.list_files_cache_ttl,
-                ));
-                Some(lfc)
-            }
+                )
+                .with_name("DefaultListFilesCache"),
+            )),
             _ => None,
         };

Review Comment:
   Can the creation of these two caches follow the pattern used by 
`file_metadata_cache`? I think it would make it easier to understand.



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -15,31 +15,59 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::cache::CacheAccessor;
-use crate::cache::DefaultListFilesCache;
-use crate::cache::file_statistics_cache::{
-    DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
-    DefaultFilesMetadataCache,
-};
-use crate::cache::list_files_cache::ListFilesEntry;
-use crate::cache::list_files_cache::TableScopedPath;
-use datafusion_common::TableReference;
+use crate::cache::default_cache::DefaultCache;
+use crate::cache::{Cache, Value};
 use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
 use datafusion_common::stats::Precision;
+use datafusion_common::{HashMap, TableReference};
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use object_store::ObjectMeta;
 use object_store::path::Path;
 use std::any::Any;
-use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
 
-pub use super::list_files_cache::{
-    DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
-};
+/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
+pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
+    let mut size = object_meta.location.as_ref().len();
+
+    if let Some(e) = &object_meta.e_tag {
+        size += e.len();
+    }
+    if let Some(v) = &object_meta.version {
+        size += v.len();
+    }
+
+    size
+}
+
+/// Each entry is scoped to its use within a specific table so that the cache
+/// can differentiate between identical paths in different tables, and
+/// table-level cache invalidation.
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub struct TableScopedPath {

Review Comment:
   What do you think about moving `TableScopedPath` to `mod.rs`?



##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -78,3 +82,83 @@ pub trait CacheAccessor<K, V>: Send + Sync {
     /// Return the cache name.
     fn name(&self) -> String;
 }
+
+/// A managed cache with capacity, expiration, and introspection policies.
+///
+/// Keys must implement [`Key`] and values must implement [`Value`] so
+/// the implementation can account for heap usage when enforcing the cache 
limit.
+///
+pub trait Cache<K: Key, V: Value>: CacheAccessor<K, V> {
+    /// Current memory budget, in bytes.
+    fn cache_limit(&self) -> usize;
+
+    /// Change the memory budget in bytes.
+    fn update_cache_limit(&self, limit: usize);
+
+    /// Time-to-live applied to newly inserted entries, or `None` if entries
+    /// never expire on their own.
+    fn cache_ttl(&self) -> Option<Duration>;
+
+    /// Change the TTL applied to subsequent inserts.
+    fn update_cache_ttl(&self, _ttl: Option<Duration>);
+
+    /// Invalidate every entry associated with `table_ref`.
+    fn drop_table_entries(
+        &self,
+        _table_ref: &Option<TableReference>,
+    ) -> datafusion_common::Result<()>;

Review Comment:
   nit: No need for the underscore in the arguments.



##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -78,3 +82,83 @@ pub trait CacheAccessor<K, V>: Send + Sync {
     /// Return the cache name.
     fn name(&self) -> String;
 }
+
+/// A managed cache with capacity, expiration, and introspection policies.
+///
+/// Keys must implement [`Key`] and values must implement [`Value`] so
+/// the implementation can account for heap usage when enforcing the cache 
limit.
+///
+pub trait Cache<K: Key, V: Value>: CacheAccessor<K, V> {
+    /// Current memory budget, in bytes.
+    fn cache_limit(&self) -> usize;
+
+    /// Change the memory budget in bytes.
+    fn update_cache_limit(&self, limit: usize);
+
+    /// Time-to-live applied to newly inserted entries, or `None` if entries
+    /// never expire on their own.
+    fn cache_ttl(&self) -> Option<Duration>;
+
+    /// Change the TTL applied to subsequent inserts.
+    fn update_cache_ttl(&self, _ttl: Option<Duration>);
+
+    /// Invalidate every entry associated with `table_ref`.
+    fn drop_table_entries(
+        &self,
+        _table_ref: &Option<TableReference>,
+    ) -> datafusion_common::Result<()>;
+
+    /// Snapshot of all current entries with per-entry metadata (size, hits,
+    /// expiration) for diagnostics and observability.
+    fn list_entries(&self) -> HashMap<K, CacheEntryInfo<V>>;
+}
+
+/// Key type for entries stored in a [`Cache`].
+pub trait Key: Clone + Eq + Hash + Send + Sync + Debug {
+    /// Size of the key in bytes, used for cache memory accounting.
+    fn size(&self) -> usize;
+
+    /// Table this key is associated with, or `None` if the key is not
+    /// table-scoped.
+    fn table_ref(&self) -> Option<&TableReference>;
+}
+
+/// Value type for entries stored in a [`Cache`].
+pub trait Value: Clone + Send + Sync {

Review Comment:
   What about renaming them to `CacheKey` and `CacheValue`? That would make the 
names less ambiguous.



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -380,23 +333,21 @@ impl CacheManager {
                 }
                 Some(Arc::clone(lfc))
             }
-            None if config.list_files_cache_limit > 0 => {
-                let lfc: Arc<dyn ListFilesCache> = 
Arc::new(DefaultListFilesCache::new(
+            None if config.list_files_cache_limit > 0 => Some(Arc::new(
+                DefaultCache::<TableScopedPath, CachedFileList>::with_ttl(
                     config.list_files_cache_limit,
                     config.list_files_cache_ttl,
-                ));
-                Some(lfc)
-            }
+                )
+                .with_name("DefaultListFilesCache"),
+            )),
             _ => None,
         };
 
         let file_metadata_cache = config
             .file_metadata_cache
             .as_ref()
             .map(Arc::clone)
-            .unwrap_or_else(|| {
-                
Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
-            });
+            .unwrap_or_else(|| 
Arc::new(DefaultCache::new(config.metadata_cache_limit)));

Review Comment:
   Should it be named with `with_name` like the other two?



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -78,37 +106,23 @@ impl CachedFileMetadata {
     }
 }
 
-/// A cache for file statistics and orderings.
-///
-/// This cache stores [`CachedFileMetadata`] which includes:
-/// - File metadata for validation (size, last_modified)
-/// - Statistics for the file
-/// - Ordering information for the file
-///
-/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this
-/// cache avoids inferring the same file statistics repeatedly during the
-/// session lifetime.
-///
-/// The typical usage pattern is:
-/// 1. Call `get(path)` to check for cached value
-/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
-/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
-///
-/// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub trait FileStatisticsCache:
-    CacheAccessor<TableScopedPath, CachedFileMetadata>
-{
-    /// Cache memory limit in bytes.
-    fn cache_limit(&self) -> usize;
+impl Value for CachedFileMetadata {
+    fn size(&self) -> usize {
+        DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default())
+    }
+}
 
-    /// Updates the cache with a new memory limit in bytes.
-    fn update_cache_limit(&self, limit: usize);
+pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
 
-    /// Retrieves the information about the entries currently cached.
-    fn list_entries(&self) -> HashMap<TableScopedPath, 
FileStatisticsCacheEntry>;
+pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite
 
-    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> 
Result<()>;
-}
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 
20MiB
+
+pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M

Review Comment:
   nit: I find it a bit confusing to read the `cache_manager.rs` file. Could we 
move the consts to the top, then the pub types, and the structs+impls that are 
used in the types?



##########
datafusion/execution/src/cache/default_cache.rs:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use datafusion_common::TableReference;
+use datafusion_common::instant::Instant;
+use datafusion_common::{HashMap, Result};
+
+use crate::cache::lru_queue::LruQueue;
+use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value};
+
+/// Source of the current time used by a [`DefaultCache`] when applying TTLs.
+pub trait TimeProvider: Send + Sync {
+    /// Return the current instant.
+    fn now(&self) -> Instant;
+}
+
+/// [`TimeProvider`] backed by [`Instant::now`].
+///
+/// This is the default time source used by [`DefaultCache`]
+#[derive(Debug, Default)]
+pub struct SystemTimeProvider;
+
+impl TimeProvider for SystemTimeProvider {
+    fn now(&self) -> Instant {
+        Instant::now()
+    }
+}
+
+#[derive(Clone)]
+struct ValueEntry<V: Value> {
+    value: V,
+    expires: Option<Instant>,
+}
+
+struct DefaultCacheState<K: Key, V: Value> {
+    lru_queue: LruQueue<K, ValueEntry<V>>,
+    hits: HashMap<K, usize>,
+    memory_limit: usize,
+    memory_used: usize,
+    ttl: Option<Duration>,
+}
+
+impl<K: Key, V: Value> DefaultCacheState<K, V> {
+    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            hits: HashMap::new(),
+            memory_limit,
+            memory_used: 0,
+            ttl,
+        }
+    }
+
+    fn get(&mut self, key: &K, now: Instant) -> Option<V> {
+        let entry = self.lru_queue.get(key)?;
+        if let Some(exp) = entry.expires
+            && now > exp
+        {
+            self.remove(key);
+            return None;
+        }
+        let value = entry.value.clone();
+        *self.hits.entry(key.clone()).or_insert(0) += 1;
+        Some(value)
+    }
+
+    fn contains_key(&mut self, key: &K, now: Instant) -> bool {
+        let Some(entry) = self.lru_queue.peek(key) else {
+            return false;
+        };
+        match entry.expires {
+            Some(exp) if now > exp => {
+                self.remove(key);
+                false
+            }
+            _ => true,
+        }
+    }
+
+    fn put(&mut self, key: &K, value: V, now: Instant) -> Option<V> {
+        let value_size = value.size();
+
+        if value_size == 0 {
+            return None;
+        }
+
+        let key_size = key.size();
+        let total_size = key_size + value_size;
+
+        if total_size > self.memory_limit {
+            // Remove potential stale entry
+            let result = self.remove(key);
+            if let Some(stale_entry) = &result {
+                self.memory_used -= key_size;
+                self.memory_used -= stale_entry.size();
+            }
+            return result;
+        }
+
+        let expires = self.ttl.map(|ttl| now + ttl);
+        let entry = ValueEntry { value, expires };
+
+        self.memory_used += total_size;
+        self.hits.insert(key.clone(), 0);
+        let old = self.lru_queue.put(key.clone(), entry);
+        if let Some(old_entry) = &old {
+            self.memory_used -= key_size;
+            self.memory_used -= old_entry.value.size();
+        }
+
+        self.evict_entries();
+
+        old.map(|v| v.value)
+    }
+
+    fn remove(&mut self, key: &K) -> Option<V> {
+        let entry = self.lru_queue.remove(key)?;
+        self.memory_used -= key.size();
+        self.memory_used -= entry.value.size();
+        self.hits.remove(key);
+        Some(entry.value)
+    }
+
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            let Some((evicted_key, evicted)) = self.lru_queue.pop() else {
+                // cache is empty while memory_used > memory_limit, cannot 
happen
+                log::error!(
+                    "DefaultCache memory accounting bug: memory_used={} but 
cache is empty",
+                    self.memory_used
+                );
+                debug_assert!(false, "memory_used > limit with empty cache");
+                self.memory_used = 0;
+                return;
+            };
+            self.memory_used -= evicted_key.size();
+            self.memory_used -= evicted.value.size();
+            self.hits.remove(&evicted_key);
+        }
+    }
+
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.hits.clear();
+        self.memory_used = 0;
+    }
+}
+
+/// In-memory [`Cache`] with an LRU eviction policy, byte-based memory limit,
+/// and optional per-entry TTL.
+///
+/// Entries are evicted in least-recently-used order whenever an insert would
+/// push `memory_used` above `memory_limit`. Inserts whose own size exceeds the
+/// limit are rejected (and any prior entry under the same key is removed).
+/// When a TTL is configured, the expiration is stamped onto each entry at
+/// insertion time and checked lazily on access.
+pub struct DefaultCache<K: Key, V: Value> {
+    state: Mutex<DefaultCacheState<K, V>>,
+    time_provider: Arc<dyn TimeProvider>,
+    name: String,
+}
+
+impl<K: Key, V: Value> DefaultCache<K, V> {
+    /// Create a cache with the given memory budget in bytes and no TTL.
+    pub fn new(memory_limit: usize) -> Self {
+        Self::with_ttl(memory_limit, None)
+    }
+
+    /// Create a cache with the given memory budget in bytes and an optional
+    /// TTL applied to every newly inserted entry.
+    pub fn with_ttl(memory_limit: usize, ttl: Option<Duration>) -> Self {
+        Self {
+            state: Mutex::new(DefaultCacheState::new(memory_limit, ttl)),
+            time_provider: Arc::new(SystemTimeProvider),
+            name: "DefaultCache".to_string(),
+        }
+    }

Review Comment:
   I think it's better to either make `with_ttl` a setter like `with_name` or 
change the name to `new_with_ttl` since it creates a new value.



##########
datafusion/execution/src/cache/list_files_cache.rs:
##########
@@ -927,14 +479,10 @@ mod tests {
     #[test]
     fn test_ttl_expiration_in_get() {
         let ttl = Duration::from_millis(100);
-        let cache = DefaultListFilesCache::new(10000, Some(ttl));
+        let cache = DefaultCache::with_ttl(1000, Some(ttl));

Review Comment:
   Any reason to change the size in this test?



##########
datafusion/execution/src/cache/cache_manager.rs:
##########


Review Comment:
   This doesn't appear to be used anymore. Even though it's public, I think it 
would be ok to delete it.



##########
datafusion/execution/src/cache/default_cache.rs:
##########
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use datafusion_common::TableReference;
+use datafusion_common::instant::Instant;
+use datafusion_common::{HashMap, Result};
+
+use crate::cache::lru_queue::LruQueue;
+use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value};
+
+/// Source of the current time used by a [`DefaultCache`] when applying TTLs.
+pub trait TimeProvider: Send + Sync {
+    /// Return the current instant.
+    fn now(&self) -> Instant;
+}
+
+/// [`TimeProvider`] backed by [`Instant::now`].
+///
+/// This is the default time source used by [`DefaultCache`]
+#[derive(Debug, Default)]
+pub struct SystemTimeProvider;
+
+impl TimeProvider for SystemTimeProvider {
+    fn now(&self) -> Instant {
+        Instant::now()
+    }
+}
+
+#[derive(Clone)]
+struct ValueEntry<V: Value> {
+    value: V,
+    expires: Option<Instant>,
+}
+
+struct DefaultCacheState<K: Key, V: Value> {
+    lru_queue: LruQueue<K, ValueEntry<V>>,
+    hits: HashMap<K, usize>,
+    memory_limit: usize,
+    memory_used: usize,
+    ttl: Option<Duration>,
+}
+
+impl<K: Key, V: Value> DefaultCacheState<K, V> {
+    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            hits: HashMap::new(),
+            memory_limit,
+            memory_used: 0,
+            ttl,
+        }
+    }
+
+    fn get(&mut self, key: &K, now: Instant) -> Option<V> {
+        let entry = self.lru_queue.get(key)?;
+        if let Some(exp) = entry.expires
+            && now > exp
+        {
+            self.remove(key);
+            return None;
+        }
+        let value = entry.value.clone();
+        *self.hits.entry(key.clone()).or_insert(0) += 1;
+        Some(value)
+    }
+
+    fn contains_key(&mut self, key: &K, now: Instant) -> bool {
+        let Some(entry) = self.lru_queue.peek(key) else {
+            return false;
+        };
+        match entry.expires {
+            Some(exp) if now > exp => {
+                self.remove(key);
+                false
+            }
+            _ => true,
+        }
+    }
+
+    fn put(&mut self, key: &K, value: V, now: Instant) -> Option<V> {
+        let value_size = value.size();
+
+        if value_size == 0 {

Review Comment:
   Better to add it to the doc in `DefaultCache`.



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -15,31 +15,61 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::cache::CacheAccessor;
-use crate::cache::DefaultListFilesCache;
-use crate::cache::file_statistics_cache::{
-    DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
-    DefaultFilesMetadataCache,
-};
-use crate::cache::list_files_cache::ListFilesEntry;
-use crate::cache::list_files_cache::TableScopedPath;
-use datafusion_common::TableReference;
+use crate::cache::cache::DefaultCache;
+use crate::cache::{Cache, Value};
 use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
 use datafusion_common::stats::Precision;
+use datafusion_common::{HashMap, TableReference};
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use object_store::ObjectMeta;
 use object_store::path::Path;
 use std::any::Any;
-use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
 
-pub use super::list_files_cache::{
-    DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
-};
+/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap.
+pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {

Review Comment:
   I'm also not a fan of this random function here. Could we at least move it 
near `impl Value for CachedFileList`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to