abhita commented on code in PR #17031:
URL: https://github.com/apache/datafusion/pull/17031#discussion_r2356725320
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,165 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
}
}
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+ lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+ memory_limit: usize,
+ memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+ fn new(memory_limit: usize) -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ memory_limit,
+ memory_used: 0,
+ }
+ }
+
+ /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified`
+ /// properties from [`ObjectMeta`] match.
+ /// If the entry exists, it becomes the most recently used.
+ fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+ self.lru_queue
+ .get(&k.location)
+ .map(|(object_meta, metadata)| {
+ if object_meta.size != k.size
+ || object_meta.last_modified != k.last_modified
+ {
+ None
+ } else {
+ Some(Arc::clone(metadata))
+ }
+ })
+ .unwrap_or(None)
+ }
+
+ /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified`
+ /// properties of [`ObjectMeta`] match).
+ /// The LRU queue is not updated.
+ fn contains_key(&self, k: &ObjectMeta) -> bool {
+ self.lru_queue
+ .peek(&k.location)
+ .map(|(object_meta, _)| {
+ object_meta.size == k.size && object_meta.last_modified == k.last_modified
+ })
+ .unwrap_or(false)
+ }
+
+ /// Adds a new key-value pair to the cache, meaning LRU entries might be evicted if required.
+ /// If the key is already in the cache, the previous metadata is returned.
+ /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted.
+ fn put(
+ &mut self,
+ key: ObjectMeta,
+ value: Arc<dyn FileMetadata>,
+ ) -> Option<Arc<dyn FileMetadata>> {
+ let value_size = value.memory_size();
+
+ // no point in trying to add this value to the cache if it cannot fit entirely
+ if value_size > self.memory_limit {
+ return None;
+ }
+
+ // if the key is already in the cache, the old value is removed
+ let old_value = self.lru_queue.put(key.location.clone(), (key, value));
+ self.memory_used += value_size;
+ if let Some((_, ref old_metadata)) = old_value {
+ self.memory_used -= old_metadata.memory_size();
+ }
+
+ self.evict_entries();
+
+ old_value.map(|v| v.1)
+ }
+
+ /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
+ fn evict_entries(&mut self) {
+ while self.memory_used > self.memory_limit {
+ if let Some(removed) = self.lru_queue.pop() {
+ let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+ self.memory_used -= metadata.memory_size();
+ } else {
+ // cache is empty while memory_used > memory_limit, cannot happen
+ debug_assert!(
+ false,
+ "cache is empty while memory_used > memory_limit, cannot
happen"
+ );
+ return;
+ }
+ }
+ }
+
+ /// Removes an entry from the cache and returns it, if it exists.
+ fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+ if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
+ self.memory_used -= old_metadata.memory_size();
+ Some(old_metadata)
+ } else {
+ None
+ }
+ }
+
+ /// Returns the number of entries currently cached.
+ fn len(&self) -> usize {
+ self.lru_queue.len()
+ }
+
+ /// Removes all entries from the cache.
+ fn clear(&mut self) {
+ self.lru_queue.clear();
+ self.memory_used = 0;
+ }
+}
+
/// Collected file embedded metadata cache.
/// The metadata for some file is invalidated when the file size or last modification time has changed.
+/// The `memory_limit` passed in the constructor controls the maximum size of the cache, which uses
+/// a Least Recently Used eviction algorithm.
/// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods
/// simply call `get` and `put`, respectively.
-#[derive(Default)]
pub struct DefaultFilesMetadataCache {
- metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+ // the state is wrapped in a Mutex to ensure the operations are atomic
+ state: Mutex<DefaultFilesMetadataCacheState>,
+}
+
+impl DefaultFilesMetadataCache {
+ /// The `memory_limit` parameter controls the maximum size of the cache, in bytes, using a Least
+ /// Recently Used eviction algorithm.
+ pub fn new(memory_limit: usize) -> Self {
+ Self {
+ state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
+ }
+ }
+
+ /// Returns the size of the cached memory, in bytes.
+ pub fn memory_used(&self) -> usize {
+ let state = self.state.lock().unwrap();
+ state.memory_used
+ }
}
-impl FileMetadataCache for DefaultFilesMetadataCache {}
+impl FileMetadataCache for DefaultFilesMetadataCache {
+ fn cache_limit(&self) -> usize {
+ let state = self.state.lock().unwrap();
+ state.memory_limit
+ }
+
+ fn update_cache_limit(&self, limit: usize) {
+ let mut state = self.state.lock().unwrap();
+ state.memory_limit = limit;
+ state.evict_entries();
+ }
+}
impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCache {
type Extra = ObjectMeta;
fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
- self.metadata
- .get(&k.location)
- .map(|s| {
- let (extra, metadata) = s.value();
- if extra.size != k.size || extra.last_modified != k.last_modified {
- None
- } else {
- Some(Arc::clone(metadata))
- }
- })
- .unwrap_or(None)
+ let mut state = self.state.lock().unwrap();
+ state.get(k)
Review Comment:
If maintaining the data structure for eviction was the key factor here, could we instead use a **standard/common data structure such as `DashMap` for the cache entries, with a separate tracking structure for the keys** (i.e. a cache structure that is agnostic of the eviction policy)?
This way, the code becomes much more extensible for plugging in custom eviction policies in the future.
It would also give us the opportunity to explore per-key locks rather than locking the whole cache.
Any thoughts?
@nuno-faria @alamb
For reference, see the `liquid-cache` implementation:
https://github.com/XiangpengHao/liquid-cache/blob/d232270cfaf495ba257d748a51123673409f7c72/src/storage/src/cache/policies.rs#L85
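
To make the suggestion a bit more concrete, here is a rough, purely illustrative sketch (not part of this PR). It assumes the `dashmap` crate and hypothetical names such as `EvictionPolicy`, `PolicyAgnosticCache`, and `next_victim`, none of which exist in DataFusion today. The entries live in a `DashMap`, while the eviction bookkeeping sits behind its own, much smaller lock, so a different policy can be plugged in without touching the entry storage:

```rust
use std::hash::Hash;
use std::sync::Mutex;

use dashmap::DashMap;

/// Hypothetical trait: tracks key usage and decides what to evict next.
/// LRU would be one implementation; LFU, FIFO, etc. could be plugged in
/// without touching the entry storage.
trait EvictionPolicy<K>: Send + Sync {
    /// Called when a key is read.
    fn on_access(&mut self, key: &K);
    /// Called when a key is inserted, together with its size in bytes.
    fn on_insert(&mut self, key: K, size: usize);
    /// Returns the next key to evict (and its size), if any.
    fn next_victim(&mut self) -> Option<(K, usize)>;
}

/// Sketch of a policy-agnostic cache: entries live in a `DashMap`, while the
/// eviction bookkeeping (ordering, sizes) is kept in a separate structure
/// guarded by its own lock.
struct PolicyAgnosticCache<K: Eq + Hash + Clone, V: Clone> {
    entries: DashMap<K, V>,
    policy: Mutex<Box<dyn EvictionPolicy<K>>>,
    memory_limit: usize,
    memory_used: Mutex<usize>,
}

impl<K: Eq + Hash + Clone, V: Clone> PolicyAgnosticCache<K, V> {
    fn get(&self, key: &K) -> Option<V> {
        // Sharded read of the entry itself, no global cache lock.
        let value = self.entries.get(key).map(|e| e.value().clone());
        if value.is_some() {
            // Only the small policy structure is locked.
            self.policy.lock().unwrap().on_access(key);
        }
        value
    }

    fn put(&self, key: K, value: V, size: usize) {
        // Accounting for overwriting an existing key is omitted for brevity.
        self.entries.insert(key.clone(), value);
        let mut policy = self.policy.lock().unwrap();
        policy.on_insert(key, size);
        let mut used = self.memory_used.lock().unwrap();
        *used += size;
        // Ask the policy for victims until we are back under the limit.
        while *used > self.memory_limit {
            match policy.next_victim() {
                Some((victim, victim_size)) => {
                    self.entries.remove(&victim);
                    *used -= victim_size;
                }
                None => break,
            }
        }
    }
}
```

In this layout `get` only takes the small policy lock after the lookup in the `DashMap`, and swapping LRU for another strategy only means providing a different `EvictionPolicy` implementation.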