martin-g commented on code in PR #20047:
URL: https://github.com/apache/datafusion/pull/20047#discussion_r2799001039
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -36,37 +36,160 @@ pub use crate::cache::DefaultFilesMetadataCache;
/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
-/// Uses DashMap for lock-free concurrent access.
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the total
+/// size of the cached entries exceeds `memory_limit`, the least recently used entries
+/// are evicted until the total size is lower than `memory_limit`.
+///
///
/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
- cache: DashMap<Path, CachedFileMetadata>,
+ state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+ pub fn new(memory_limit: usize) -> Self {
+ Self {
+            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+ }
+ }
+
+ /// Returns the size of the cached memory, in bytes.
+ pub fn memory_used(&self) -> usize {
+ let state = self.state.lock().unwrap();
+ state.memory_used
+ }
+}
+
+pub struct DefaultFileStatisticsCacheState {
+ lru_queue: LruQueue<Path, CachedFileMetadata>,
+ memory_limit: usize,
+ memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+ fn default() -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+ memory_used: 0,
+ }
+ }
}
+impl DefaultFileStatisticsCacheState {
+ fn new(memory_limit: usize) -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ memory_limit,
+ memory_used: 0,
+ }
+ }
+ fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> {
+ self.lru_queue.get(key).cloned()
+ }
+
+ fn put(
+ &mut self,
+ key: &Path,
+ value: CachedFileMetadata,
+ ) -> Option<CachedFileMetadata> {
+ let key_size = key.heap_size();
+ let entry_size = value.heap_size();
+
+ if entry_size + key_size > self.memory_limit {
+ // Remove potential stale entry
+ self.remove(key);
+ return None;
+ }
+
+ let old_value = self.lru_queue.put(key.clone(), value);
+ self.memory_used += entry_size;
+
+ if let Some(old_entry) = &old_value {
+ self.memory_used -= old_entry.heap_size();
+ } else {
+ self.memory_used += key.heap_size();
+ }
+
+ self.evict_entries();
+
+ old_value
+ }
+
+ fn remove(&mut self, k: &Path) -> Option<CachedFileMetadata> {
+ if let Some(old_entry) = self.lru_queue.remove(k) {
+ self.memory_used -= k.heap_size();
+ self.memory_used -= old_entry.heap_size();
+ Some(old_entry)
+ } else {
+ None
+ }
+ }
+
+ fn contains_key(&self, k: &Path) -> bool {
+ self.lru_queue.contains_key(k)
+ }
+
+ fn len(&self) -> usize {
+ self.lru_queue.len()
+ }
+
+ fn clear(&mut self) {
+ self.lru_queue.clear();
+ self.memory_used = 0;
+ }
+
+ fn evict_entries(&mut self) {
+ while self.memory_used > self.memory_limit {
+ if let Some(removed) = self.lru_queue.pop() {
+ self.memory_used -= removed.0.heap_size();
+ self.memory_used -= removed.1.heap_size();
+ } else {
+                // cache is empty while memory_used > memory_limit, cannot happen
+                debug_assert!(
+                    false,
+                    "This is a bug! Please report it to the Apache DataFusion developers"
+                );
Review Comment:
   ```suggestion
                log::error!(
                    "File statistics cache memory accounting bug: memory_used={} but cache is empty. \
                     Please report this to the Apache DataFusion developers.",
                    self.memory_used
                );
                debug_assert!(
                    false,
                    "memory_used={} but cache is empty",
                    self.memory_used
                );
                self.memory_used = 0;
   ```
1) log an error, so that it is also noticeable in production
2) reset `memory_used`, so that the cache might be used again
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]