mkleen commented on code in PR #22613:
URL: https://github.com/apache/datafusion/pull/22613#discussion_r3329859401
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -15,31 +15,59 @@
// specific language governing permissions and limitations
// under the License.
-use crate::cache::CacheAccessor;
-use crate::cache::DefaultListFilesCache;
-use crate::cache::file_statistics_cache::{
- DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
- DefaultFilesMetadataCache,
-};
-use crate::cache::list_files_cache::ListFilesEntry;
-use crate::cache::list_files_cache::TableScopedPath;
-use datafusion_common::TableReference;
+use crate::cache::default_cache::DefaultCache;
+use crate::cache::{Cache, Value};
use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
use datafusion_common::stats::Precision;
+use datafusion_common::{HashMap, TableReference};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use object_store::ObjectMeta;
use object_store::path::Path;
use std::any::Any;
-use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::{Debug, Display, Formatter};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
-pub use super::list_files_cache::{
- DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
-};
+/// Computes the number of heap bytes attributed to an [`ObjectMeta`].
+///
+/// The result is the byte length of the `location` path plus the byte
+/// lengths of `e_tag` and `version` when those optional fields are set.
+pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
+    let location_bytes = object_meta.location.as_ref().len();
+    // Absent optional fields contribute nothing to the total.
+    let e_tag_bytes = object_meta.e_tag.as_ref().map_or(0, String::len);
+    let version_bytes = object_meta.version.as_ref().map_or(0, String::len);
+
+    location_bytes + e_tag_bytes + version_bytes
+}
+
+/// Each entry is scoped to its use within a specific table so that the cache
+/// can differentiate between identical paths in different tables and can
+/// support table-level cache invalidation.
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub struct TableScopedPath {
Review Comment:
OK.
##########
datafusion/execution/src/cache/default_cache.rs:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use datafusion_common::TableReference;
+use datafusion_common::instant::Instant;
+use datafusion_common::{HashMap, Result};
+
+use crate::cache::lru_queue::LruQueue;
+use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value};
+
+/// Source of the current time used by a [`DefaultCache`] when applying TTLs.
+///
+/// [`DefaultCache`] stores its provider as an `Arc<dyn TimeProvider>`;
+/// implementations must be `Send + Sync`.
+pub trait TimeProvider: Send + Sync {
+ /// Return the current instant.
+ fn now(&self) -> Instant;
+}
+
+/// [`TimeProvider`] backed by [`Instant::now`].
+///
+/// This is the default time source used by [`DefaultCache`].
+#[derive(Debug, Default)]
+pub struct SystemTimeProvider;
+
+impl TimeProvider for SystemTimeProvider {
+ /// Returns the value of [`Instant::now`] at the time of the call.
+ fn now(&self) -> Instant {
+ Instant::now()
+ }
+}
+
+/// A cached value together with its optional expiration instant.
+#[derive(Clone)]
+struct ValueEntry<V: Value> {
+ /// The cached value.
+ value: V,
+ /// Instant after which the entry is treated as expired on access;
+ /// `None` when the cache was configured without a TTL.
+ expires: Option<Instant>,
+}
+
+/// Mutable state of a [`DefaultCache`]; the cache keeps it behind a `Mutex`.
+struct DefaultCacheState<K: Key, V: Value> {
+ /// Entries in eviction order, each carrying its optional expiry.
+ lru_queue: LruQueue<K, ValueEntry<V>>,
+ /// Per-key hit counter: reset to 0 on `put`, incremented on every
+ /// successful `get`, and dropped when the entry is removed or evicted.
+ hits: HashMap<K, usize>,
+ /// Maximum number of bytes the cache may account for.
+ memory_limit: usize,
+ /// Bytes currently accounted for: the sum of `key.size() + value.size()`
+ /// over all stored entries.
+ memory_used: usize,
+ /// TTL stamped onto entries at insertion time; `None` disables expiry.
+ ttl: Option<Duration>,
+}
+
+impl<K: Key, V: Value> DefaultCacheState<K, V> {
+    /// Creates an empty state with the given byte budget and optional TTL.
+    fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            hits: HashMap::new(),
+            memory_limit,
+            memory_used: 0,
+            ttl,
+        }
+    }
+
+    /// Returns a clone of the value stored under `key`, or `None` if the key
+    /// is absent or its entry has expired.
+    ///
+    /// An entry whose expiry lies strictly before `now` is removed from the
+    /// cache. A successful lookup increments the key's hit counter.
+    fn get(&mut self, key: &K, now: Instant) -> Option<V> {
+        let entry = self.lru_queue.get(key)?;
+        if let Some(exp) = entry.expires
+            && now > exp
+        {
+            self.remove(key);
+            return None;
+        }
+        let value = entry.value.clone();
+        *self.hits.entry(key.clone()).or_insert(0) += 1;
+        Some(value)
+    }
+
+    /// Returns `true` if `key` is present and not expired at `now`.
+    ///
+    /// An expired entry is removed as a side effect. The hit counter is not
+    /// touched. The lookup uses `LruQueue::peek` rather than `get`.
+    // NOTE(review): `peek` presumably leaves the recency order unchanged;
+    // confirm against `LruQueue`.
+    fn contains_key(&mut self, key: &K, now: Instant) -> bool {
+        let Some(entry) = self.lru_queue.peek(key) else {
+            return false;
+        };
+        match entry.expires {
+            Some(exp) if now > exp => {
+                self.remove(key);
+                false
+            }
+            _ => true,
+        }
+    }
+
+    /// Inserts `value` under `key` and returns the value previously stored
+    /// under that key, if any.
+    ///
+    /// * A value whose `size()` is 0 is ignored and `None` is returned.
+    /// * If the key plus value exceed `memory_limit`, the new value is
+    ///   rejected, any entry already stored under `key` is removed, and that
+    ///   removed value is returned.
+    /// * Otherwise the entry is stored with `expires = now + ttl` (when a TTL
+    ///   is configured), its hit counter is reset to 0, and older entries are
+    ///   evicted until `memory_used` fits within `memory_limit`.
+    fn put(&mut self, key: &K, value: V, now: Instant) -> Option<V> {
+        let value_size = value.size();
+
+        if value_size == 0 {
+            return None;
+        }
+
+        let key_size = key.size();
+        let total_size = key_size + value_size;
+
+        if total_size > self.memory_limit {
+            // The new entry can never fit. Drop any stale entry under this
+            // key so it is not served afterwards. `remove` already subtracts
+            // the key and value sizes from `memory_used`, so no further
+            // adjustment must be made here (doing so would double-count and
+            // underflow the counter).
+            return self.remove(key);
+        }
+
+        let expires = self.ttl.map(|ttl| now + ttl);
+        let entry = ValueEntry { value, expires };
+
+        self.memory_used += total_size;
+        self.hits.insert(key.clone(), 0);
+        let old = self.lru_queue.put(key.clone(), entry);
+        if let Some(old_entry) = &old {
+            // The key was already accounted for together with the old value;
+            // release both so the key is only counted once.
+            self.memory_used -= key_size;
+            self.memory_used -= old_entry.value.size();
+        }
+
+        self.evict_entries();
+
+        old.map(|v| v.value)
+    }
+
+    /// Removes the entry stored under `key`, returning its value.
+    ///
+    /// Releases the key and value sizes from `memory_used` and drops the
+    /// key's hit counter. Returns `None` if the key is not present.
+    fn remove(&mut self, key: &K) -> Option<V> {
+        let entry = self.lru_queue.remove(key)?;
+        self.memory_used -= key.size();
+        self.memory_used -= entry.value.size();
+        self.hits.remove(key);
+        Some(entry.value)
+    }
+
+    /// Pops entries from the queue until `memory_used` no longer exceeds
+    /// `memory_limit`, releasing their sizes and hit counters.
+    ///
+    /// If the queue runs empty while `memory_used` is still above the limit,
+    /// the accounting is inconsistent: the condition is logged, asserted in
+    /// debug builds, and `memory_used` is reset to 0.
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            let Some((evicted_key, evicted)) = self.lru_queue.pop() else {
+                // cache is empty while memory_used > memory_limit, cannot happen
+                log::error!(
+                    "DefaultCache memory accounting bug: memory_used={} but cache is empty",
+                    self.memory_used
+                );
+                debug_assert!(false, "memory_used > limit with empty cache");
+                self.memory_used = 0;
+                return;
+            };
+            self.memory_used -= evicted_key.size();
+            self.memory_used -= evicted.value.size();
+            self.hits.remove(&evicted_key);
+        }
+    }
+
+    /// Removes every entry and hit counter and resets `memory_used` to 0.
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.hits.clear();
+        self.memory_used = 0;
+    }
+}
+
+/// In-memory [`Cache`] with an LRU eviction policy, byte-based memory limit,
+/// and optional per-entry TTL.
+///
+/// Entries are evicted in least-recently-used order whenever an insert would
+/// push `memory_used` above `memory_limit`. Inserts whose own size exceeds the
+/// limit are rejected (and any prior entry under the same key is removed).
+/// When a TTL is configured, the expiration is stamped onto each entry at
+/// insertion time and checked lazily on access.
+pub struct DefaultCache<K: Key, V: Value> {
+ /// Entries, hit counters and memory accounting, guarded by a mutex.
+ state: Mutex<DefaultCacheState<K, V>>,
+ /// Clock consulted for the current instant when applying TTLs.
+ time_provider: Arc<dyn TimeProvider>,
+ /// Name of this cache; the constructors in view set it to `"DefaultCache"`.
+ name: String,
+}
+
+impl<K: Key, V: Value> DefaultCache<K, V> {
+ /// Create a cache with the given memory budget in bytes and no TTL.
+ ///
+ /// Equivalent to calling [`Self::with_ttl`] with `None` as the TTL.
+ pub fn new(memory_limit: usize) -> Self {
+ Self::with_ttl(memory_limit, None)
+ }
+
+ /// Create a cache with the given memory budget in bytes and an optional
+ /// TTL applied to every newly inserted entry.
+ ///
+ /// The cache starts empty, uses [`SystemTimeProvider`] as its time source
+ /// and is named `"DefaultCache"`.
+ pub fn with_ttl(memory_limit: usize, ttl: Option<Duration>) -> Self {
+ Self {
+ state: Mutex::new(DefaultCacheState::new(memory_limit, ttl)),
+ time_provider: Arc::new(SystemTimeProvider),
+ name: "DefaultCache".to_string(),
+ }
+ }
Review Comment:
OK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]