This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new a9e6d4be4a feat: Limit the memory used in the file metadata cache (#17031) a9e6d4be4a is described below commit a9e6d4be4a63c2180814a627d3b45cd0adf5b61c Author: Nuno Faria <nunofpfa...@gmail.com> AuthorDate: Thu Aug 7 11:29:12 2025 +0100 feat: Limit the memory used in the file metadata cache (#17031) * feat: Limit the memory used in the file metadata cache * Implement custom LRU queue * Use parking_lot::Mutex * Add is_empty unit test * Rename config to metadata_cache_limit, Set limit to 50M * Remove Option from the metadata memory limit * Add license to lru_queue * Update datafusion/execution/src/cache/cache_unit.rs Removes the previous unreachable!(). Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> * Fix syntax error * Fix clippy error --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion/core/src/execution/context/mod.rs | 4 + datafusion/core/tests/sql/runtime_config.rs | 33 ++ datafusion/datasource-parquet/src/file_format.rs | 19 +- datafusion/datasource-parquet/src/reader.rs | 4 + datafusion/execution/src/cache/cache_manager.rs | 79 +++- datafusion/execution/src/cache/cache_unit.rs | 339 ++++++++++++-- datafusion/execution/src/cache/lru_queue.rs | 537 +++++++++++++++++++++++ datafusion/execution/src/cache/mod.rs | 1 + datafusion/execution/src/runtime_env.rs | 16 +- docs/source/user-guide/configs.md | 11 +- 10 files changed, 974 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9eb1bccc40..bbe33c5179 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1068,6 +1068,10 @@ impl SessionContext { builder.with_max_temp_directory_size(directory_size as u64) } "temp_directory" => builder.with_temp_file_path(value), + "metadata_cache_limit" => { + let limit = Self::parse_memory_limit(value)?; + 
builder.with_metadata_cache_limit(limit) + } _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index b05c36e335..9627d7bccd 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -200,6 +200,39 @@ async fn test_max_temp_directory_size_enforcement() { ); } +#[tokio::test] +async fn test_test_metadata_cache_limit() { + let ctx = SessionContext::new(); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.metadata_cache_limit = '{limit}'").as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> usize { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_file_metadata_cache() + .cache_limit() + }; + + update_limit(&ctx, "100M").await; + assert_eq!(get_limit(&ctx), 100 * 1024 * 1024); + + update_limit(&ctx, "2G").await; + assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024); + + update_limit(&ctx, "123K").await; + assert_eq!(get_limit(&ctx), 123 * 1024); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 7210cc09a0..d86ca630c8 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat { // Use the CachedParquetFileReaderFactory when metadata caching is enabled if self.options.global.cache_metadata { - if let Some(metadata_cache) = - state.runtime_env().cache_manager.get_file_metadata_cache() - { - let store = state - .runtime_env() - .object_store(conf.object_store_url.clone())?; - let cached_parquet_read_factory = - Arc::new(CachedParquetFileReaderFactory::new(store, 
metadata_cache)); - source = - source.with_parquet_file_reader_factory(cached_parquet_read_factory); - } + let metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let store = state + .runtime_env() + .object_store(conf.object_store_url.clone())?; + let cached_parquet_read_factory = + Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); + source = source.with_parquet_file_reader_factory(cached_parquet_read_factory); } if let Some(metadata_size_hint) = metadata_size_hint { diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 6ad9428770..648ed7c0bc 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -288,4 +288,8 @@ impl FileMetadata for CachedParquetMetaData { fn as_any(&self) -> &dyn Any { self } + + fn memory_size(&self) -> usize { + self.0.memory_size() + } } diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 37f1baa17f..a91e4f8458 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync { /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific /// implementation. fn as_any(&self) -> &dyn Any; + + /// Returns the size of the metadata in bytes. + fn memory_size(&self) -> usize; } /// Cache to store file-embedded metadata. pub trait FileMetadataCache: CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta> { + // Returns the cache's memory limit in bytes. + fn cache_limit(&self) -> usize; + + // Updates the cache with a new memory limit in bytes. 
+ fn update_cache_limit(&self, limit: usize); } impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> { @@ -65,30 +73,36 @@ impl Debug for dyn FileMetadataCache { } } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct CacheManager { file_statistic_cache: Option<FileStatisticsCache>, list_files_cache: Option<ListFilesCache>, - file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + file_metadata_cache: Arc<dyn FileMetadataCache>, } impl CacheManager { pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> { - let mut manager = CacheManager::default(); - if let Some(cc) = &config.table_files_statistics_cache { - manager.file_statistic_cache = Some(Arc::clone(cc)) - } - if let Some(lc) = &config.list_files_cache { - manager.list_files_cache = Some(Arc::clone(lc)) - } - if let Some(mc) = &config.file_metadata_cache { - manager.file_metadata_cache = Some(Arc::clone(mc)); - } else { - manager.file_metadata_cache = - Some(Arc::new(DefaultFilesMetadataCache::default())); - } - - Ok(Arc::new(manager)) + let file_statistic_cache = + config.table_files_statistics_cache.as_ref().map(Arc::clone); + + let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone); + + let file_metadata_cache = config + .file_metadata_cache + .as_ref() + .map(Arc::clone) + .unwrap_or_else(|| { + Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit)) + }); + + // the cache memory limit might have changed, ensure the limit is updated + file_metadata_cache.update_cache_limit(config.metadata_cache_limit); + + Ok(Arc::new(CacheManager { + file_statistic_cache, + list_files_cache, + file_metadata_cache, + })) } /// Get the cache of listing files statistics. @@ -102,12 +116,19 @@ impl CacheManager { } /// Get the file embedded metadata cache. 
- pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> { - self.file_metadata_cache.clone() + pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> { + Arc::clone(&self.file_metadata_cache) + } + + /// Get the limit of the file embedded metadata cache. + pub fn get_metadata_cache_limit(&self) -> usize { + self.file_metadata_cache.cache_limit() } } -#[derive(Clone, Default)] +const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M + +#[derive(Clone)] pub struct CacheManagerConfig { /// Enable cache of files statistics when listing files. /// Avoid get same file statistics repeatedly in same datafusion session. @@ -124,6 +145,19 @@ pub struct CacheManagerConfig { /// data file (e.g., Parquet footer and page metadata). /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + /// Limit of the file-embedded metadata cache, in bytes. + pub metadata_cache_limit: usize, +} + +impl Default for CacheManagerConfig { + fn default() -> Self { + Self { + table_files_statistics_cache: Default::default(), + list_files_cache: Default::default(), + file_metadata_cache: Default::default(), + metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT, + } + } } impl CacheManagerConfig { @@ -147,4 +181,9 @@ impl CacheManagerConfig { self.file_metadata_cache = cache; self } + + pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self { + self.metadata_cache_limit = limit; + self + } } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 70d007bf5b..576076ca4e 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use crate::cache::lru_queue::LruQueue; use crate::cache::CacheAccessor; use datafusion_common::Statistics; @@ -158,33 +159,165 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache { } } +/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. +struct DefaultFilesMetadataCacheState { + lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>, + memory_limit: usize, + memory_used: usize, +} + +impl DefaultFilesMetadataCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + + /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified` + /// properties from [`ObjectMeta`] match. + /// If the entry exists, it becomes the most recently used. + fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> { + self.lru_queue + .get(&k.location) + .map(|(object_meta, metadata)| { + if object_meta.size != k.size + || object_meta.last_modified != k.last_modified + { + None + } else { + Some(Arc::clone(metadata)) + } + }) + .unwrap_or(None) + } + + /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified` + /// properties of [`ObjectMeta`] match). + /// The LRU queue is not updated. + fn contains_key(&self, k: &ObjectMeta) -> bool { + self.lru_queue + .peek(&k.location) + .map(|(object_meta, _)| { + object_meta.size == k.size && object_meta.last_modified == k.last_modified + }) + .unwrap_or(false) + } + + /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. + /// If the key is already in the cache, the previous metadata is returned. + /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted. 
+ fn put( + &mut self, + key: ObjectMeta, + value: Arc<dyn FileMetadata>, + ) -> Option<Arc<dyn FileMetadata>> { + let value_size = value.memory_size(); + + // no point in trying to add this value to the cache if it cannot fit entirely + if value_size > self.memory_limit { + return None; + } + + // if the key is already in the cache, the old value is removed + let old_value = self.lru_queue.put(key.location.clone(), (key, value)); + self.memory_used += value_size; + if let Some((_, ref old_metadata)) = old_value { + self.memory_used -= old_metadata.memory_size(); + } + + self.evict_entries(); + + old_value.map(|v| v.1) + } + + /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + if let Some(removed) = self.lru_queue.pop() { + let metadata: Arc<dyn FileMetadata> = removed.1 .1; + self.memory_used -= metadata.memory_size(); + } else { + // cache is empty while memory_used > memory_limit, cannot happen + debug_assert!( + false, + "cache is empty while memory_used > memory_limit, cannot happen" + ); + return; + } + } + } + + /// Removes an entry from the cache and returns it, if it exists. + fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> { + if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) { + self.memory_used -= old_metadata.memory_size(); + Some(old_metadata) + } else { + None + } + } + + /// Returns the number of entries currently cached. + fn len(&self) -> usize { + self.lru_queue.len() + } + + /// Removes all entries from the cache. + fn clear(&mut self) { + self.lru_queue.clear(); + self.memory_used = 0; + } +} + /// Collected file embedded metadata cache. /// The metadata for some file is invalided when the file size or last modification time have been /// changed. 
+/// The `memory_limit` passed in the constructor controls the maximum size of the cache, which uses +/// a Least Recently Used eviction algorithm. /// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods /// simply call `get` and `put`, respectively. -#[derive(Default)] pub struct DefaultFilesMetadataCache { - metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>, + // the state is wrapped in a Mutex to ensure the operations are atomic + state: Mutex<DefaultFilesMetadataCacheState>, +} + +impl DefaultFilesMetadataCache { + /// The `memory_limit` parameter controls the maximum size of the cache, in bytes, using a Least + /// Recently Used eviction algorithm. + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } } -impl FileMetadataCache for DefaultFilesMetadataCache {} +impl FileMetadataCache for DefaultFilesMetadataCache { + fn cache_limit(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } +} impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCache { type Extra = ObjectMeta; fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> { - self.metadata - .get(&k.location) - .map(|s| { - let (extra, metadata) = s.value(); - if extra.size != k.size || extra.last_modified != k.last_modified { - None - } else { - Some(Arc::clone(metadata)) - } - }) - .unwrap_or(None) + let mut state = self.state.lock().unwrap(); + state.get(k) } fn get_with_extra( @@ -200,9 +333,8 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCa key: &ObjectMeta, 
value: Arc<dyn FileMetadata>, ) -> Option<Arc<dyn FileMetadata>> { - self.metadata - .insert(key.location.clone(), (key.clone(), value)) - .map(|x| x.1) + let mut state = self.state.lock().unwrap(); + state.put(key.clone(), value) } fn put_with_extra( @@ -215,25 +347,23 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCa } fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> { - self.metadata.remove(&k.location).map(|x| x.1 .1) + let mut state = self.state.lock().unwrap(); + state.remove(k) } fn contains_key(&self, k: &ObjectMeta) -> bool { - self.metadata - .get(&k.location) - .map(|s| { - let (extra, _) = s.value(); - extra.size == k.size && extra.last_modified == k.last_modified - }) - .unwrap_or(false) + let state = self.state.lock().unwrap(); + state.contains_key(k) } fn len(&self) -> usize { - self.metadata.len() + let state = self.state.lock().unwrap(); + state.len() } fn clear(&self) { - self.metadata.clear(); + let mut state = self.state.lock().unwrap(); + state.clear(); } fn name(&self) -> String { @@ -245,7 +375,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCa mod tests { use std::sync::Arc; - use crate::cache::cache_manager::FileMetadata; + use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; use crate::cache::cache_unit::{ DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache, }; @@ -330,10 +460,14 @@ mod tests { fn as_any(&self) -> &dyn std::any::Any { self } + + fn memory_size(&self) -> usize { + self.metadata.len() + } } #[test] - fn test_file_metadata_cache() { + fn test_default_file_metadata_cache() { let object_meta = ObjectMeta { location: Path::from("test"), last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00") @@ -348,11 +482,11 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let mut cache = DefaultFilesMetadataCache::default(); + let mut cache = DefaultFilesMetadataCache::new(1024 * 
1024); assert!(cache.get(&object_meta).is_none()); // put - cache.put(&object_meta, metadata); + cache.put(&object_meta, Arc::clone(&metadata)); // get and contains of a valid entry assert!(cache.contains_key(&object_meta)); @@ -387,5 +521,146 @@ mod tests { cache.remove(&object_meta); assert!(cache.get(&object_meta).is_none()); assert!(!cache.contains_key(&object_meta)); + + // len and clear + cache.put(&object_meta, Arc::clone(&metadata)); + cache.put(&object_meta2, metadata); + assert_eq!(cache.len(), 2); + cache.clear(); + assert_eq!(cache.len(), 0); + } + + fn generate_test_metadata_with_size( + path: &str, + size: usize, + ) -> (ObjectMeta, Arc<dyn FileMetadata>) { + let object_meta = ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc::now(), + size: size as u64, + e_tag: None, + version: None, + }; + let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata { + metadata: "a".repeat(size), + }); + + (object_meta, metadata) + } + + #[test] + fn test_default_file_metadata_cache_with_limit() { + let mut cache = DefaultFilesMetadataCache::new(1000); + let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); + let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); + let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); + + cache.put(&object_meta1, metadata1); + cache.put(&object_meta2, metadata2); + cache.put(&object_meta3, metadata3); + + // all entries will fit + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 900); + assert!(cache.contains_key(&object_meta1)); + assert!(cache.contains_key(&object_meta2)); + assert!(cache.contains_key(&object_meta3)); + + // add a new entry which will remove the least recently used ("1") + let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200); + cache.put(&object_meta4, metadata4); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 1000); + assert!(!cache.contains_key(&object_meta1)); + 
assert!(cache.contains_key(&object_meta4)); + + // get entry "2", which will move it to the top of the queue, and add a new one which will + // remove the new least recently used ("3") + cache.get(&object_meta2); + let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100); + cache.put(&object_meta5, metadata5); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 800); + assert!(!cache.contains_key(&object_meta3)); + assert!(cache.contains_key(&object_meta5)); + + // new entry which will not be able to fit in the 1000 bytes allocated + let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200); + cache.put(&object_meta6, metadata6); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 800); + assert!(!cache.contains_key(&object_meta6)); + + // new entry which is able to fit without removing any entry + let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200); + cache.put(&object_meta7, metadata7); + assert_eq!(cache.len(), 4); + assert_eq!(cache.memory_used(), 1000); + assert!(cache.contains_key(&object_meta7)); + + // new entry which will remove all other entries + let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999); + cache.put(&object_meta8, metadata8); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 999); + assert!(cache.contains_key(&object_meta8)); + + // when updating an entry, the previous ones are not unnecessarily removed + let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300); + let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200); + let (object_meta11_v1, metadata11_v1) = + generate_test_metadata_with_size("11", 400); + cache.put(&object_meta9, metadata9); + cache.put(&object_meta10, metadata10); + cache.put(&object_meta11_v1, metadata11_v1); + assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.len(), 3); + let (object_meta11_v2, metadata11_v2) = + generate_test_metadata_with_size("11", 
500); + cache.put(&object_meta11_v2, metadata11_v2); + assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.len(), 3); + assert!(cache.contains_key(&object_meta9)); + assert!(cache.contains_key(&object_meta10)); + assert!(cache.contains_key(&object_meta11_v2)); + assert!(!cache.contains_key(&object_meta11_v1)); + + // when updating an entry that now exceeds the limit, the LRU ("9") needs to be removed + let (object_meta11_v3, metadata11_v3) = + generate_test_metadata_with_size("11", 501); + cache.put(&object_meta11_v3, metadata11_v3); + assert_eq!(cache.memory_used(), 701); + assert_eq!(cache.len(), 2); + assert!(cache.contains_key(&object_meta10)); + assert!(cache.contains_key(&object_meta11_v3)); + assert!(!cache.contains_key(&object_meta11_v2)); + + // manually removing an entry that is not the LRU + cache.remove(&object_meta11_v3); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 200); + assert!(cache.contains_key(&object_meta10)); + assert!(!cache.contains_key(&object_meta11_v3)); + + // clear + cache.clear(); + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + + // resizing the cache should clear the extra entries + let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300); + let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200); + let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500); + cache.put(&object_meta12, metadata12); + cache.put(&object_meta13, metadata13); + cache.put(&object_meta14, metadata14); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 1000); + cache.update_cache_limit(600); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 500); + assert!(!cache.contains_key(&object_meta12)); + assert!(!cache.contains_key(&object_meta13)); + assert!(cache.contains_key(&object_meta14)); } } diff --git a/datafusion/execution/src/cache/lru_queue.rs b/datafusion/execution/src/cache/lru_queue.rs new file mode 100644 index 
0000000000..3dc308dc3f --- /dev/null +++ b/datafusion/execution/src/cache/lru_queue.rs @@ -0,0 +1,537 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::HashMap, + hash::Hash, + sync::{Arc, Weak}, +}; + +use parking_lot::Mutex; + +#[derive(Default)] +/// Provides a Least Recently Used queue with unbounded capacity. +/// +/// # Examples +/// +/// ``` +/// use datafusion_execution::cache::lru_queue::LruQueue; +/// +/// let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); +/// lru_queue.put(1, 10); +/// lru_queue.put(2, 20); +/// lru_queue.put(3, 30); +/// assert_eq!(lru_queue.get(&2), Some(&20)); +/// assert_eq!(lru_queue.pop(), Some((1, 10))); +/// assert_eq!(lru_queue.pop(), Some((3, 30))); +/// assert_eq!(lru_queue.pop(), Some((2, 20))); +/// assert_eq!(lru_queue.pop(), None); +/// ``` +pub struct LruQueue<K: Eq + Hash + Clone, V> { + data: LruData<K, V>, + queue: LruList<K>, +} + +/// Maps the key to the [`LruNode`] in queue and the value. +type LruData<K, V> = HashMap<K, (Arc<Mutex<LruNode<K>>>, V)>; + +#[derive(Default)] +/// Doubly-linked list that maintains the LRU order +struct LruList<K> { + head: Link<K>, + tail: Link<K>, +} + +/// Doubly-linked list node. 
+struct LruNode<K> { + key: K, + prev: Link<K>, + next: Link<K>, +} + +/// Weak pointer to a [`LruNode`], used to connect nodes in the doubly-linked list. +/// The strong reference is guaranteed to be stored in the `data` map of the [`LruQueue`]. +type Link<K> = Option<Weak<Mutex<LruNode<K>>>>; + +impl<K: Eq + Hash + Clone, V> LruQueue<K, V> { + pub fn new() -> Self { + Self { + data: HashMap::new(), + queue: LruList { + head: None, + tail: None, + }, + } + } + + /// Returns a reference to value mapped by `key`, if it exists. + /// If the entry exists, it becomes the most recently used. + pub fn get(&mut self, key: &K) -> Option<&V> { + if let Some(value) = self.remove(key) { + self.put(key.clone(), value); + } + self.data.get(key).map(|(_, value)| value) + } + + /// Returns a reference to value mapped by `key`, if it exists. + /// Does not affect the queue order. + pub fn peek(&self, key: &K) -> Option<&V> { + self.data.get(key).map(|(_, value)| value) + } + + /// Checks whether there is an entry with key `key` in the queue. + /// Does not affect the queue order. + pub fn contains_key(&self, key: &K) -> bool { + self.data.contains_key(key) + } + + /// Inserts an entry in the queue, becoming the most recently used. + /// If the entry already exists, returns the previous value. 
+ pub fn put(&mut self, key: K, value: V) -> Option<V> { + let old_value = self.remove(&key); + + let node = Arc::new(Mutex::new(LruNode { + key: key.clone(), + prev: None, + next: None, + })); + + match self.queue.head { + // queue is not empty + Some(ref old_head) => { + old_head + .upgrade() + .expect("value has been unexpectedly dropped") + .lock() + .prev = Some(Arc::downgrade(&node)); + node.lock().next = Some(Weak::clone(old_head)); + self.queue.head = Some(Arc::downgrade(&node)); + } + // queue is empty + _ => { + self.queue.head = Some(Arc::downgrade(&node)); + self.queue.tail = Some(Arc::downgrade(&node)); + } + } + + self.data.insert(key, (node, value)); + + old_value + } + + /// Removes and returns the least recently used value. + /// Returns `None` if the queue is empty. + pub fn pop(&mut self) -> Option<(K, V)> { + let key_to_remove = self.queue.tail.as_ref().map(|n| { + n.upgrade() + .expect("value has been unexpectedly dropped") + .lock() + .key + .clone() + }); + if let Some(k) = key_to_remove { + let value = self.remove(&k).unwrap(); // confirmed above that the entry exists + Some((k, value)) + } else { + None + } + } + + /// Removes a specific entry from the queue, if it exists. 
+ pub fn remove(&mut self, key: &K) -> Option<V> { + if let Some((old_node, old_value)) = self.data.remove(key) { + let LruNode { key: _, prev, next } = &*old_node.lock(); + match (prev, next) { + // single node in the queue + (None, None) => { + self.queue.head = None; + self.queue.tail = None; + } + // removed the head node + (None, Some(n)) => { + let n_strong = + n.upgrade().expect("value has been unexpectedly dropped"); + n_strong.lock().prev = None; + self.queue.head = Some(Weak::clone(n)); + } + // removed the tail node + (Some(p), None) => { + let p_strong = + p.upgrade().expect("value has been unexpectedly dropped"); + p_strong.lock().next = None; + self.queue.tail = Some(Weak::clone(p)); + } + // removed a middle node + (Some(p), Some(n)) => { + let n_strong = + n.upgrade().expect("value has been unexpectedly dropped"); + let p_strong = + p.upgrade().expect("value has been unexpectedly dropped"); + n_strong.lock().prev = Some(Weak::clone(p)); + p_strong.lock().next = Some(Weak::clone(n)); + } + }; + Some(old_value) + } else { + None + } + } + + /// Returns the number of entries in the queue. + pub fn len(&self) -> usize { + self.data.len() + } + + /// Checks whether the queue has no items. + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } + + // Removes all entries from the queue. 
+ pub fn clear(&mut self) { + self.queue.head = None; + self.queue.tail = None; + self.data.clear(); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use rand::seq::IndexedRandom; + + use crate::cache::lru_queue::LruQueue; + + #[test] + fn test_get() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.get(&1), None); + + // value exists + lru_queue.put(1, 10); + assert_eq!(lru_queue.get(&1), Some(&10)); + assert_eq!(lru_queue.get(&1), Some(&10)); + + // value is removed + lru_queue.remove(&1); + assert_eq!(lru_queue.get(&1), None); + } + + #[test] + fn test_peek() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.peek(&1), None); + + // value exists + lru_queue.put(1, 10); + assert_eq!(lru_queue.peek(&1), Some(&10)); + assert_eq!(lru_queue.peek(&1), Some(&10)); + + // value is removed + lru_queue.remove(&1); + assert_eq!(lru_queue.peek(&1), None); + } + + #[test] + fn test_put() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // no previous value + assert_eq!(lru_queue.put(1, 10), None); + + // update, the previous value is returned + assert_eq!(lru_queue.put(1, 11), Some(10)); + assert_eq!(lru_queue.put(1, 12), Some(11)); + assert_eq!(lru_queue.put(1, 13), Some(12)); + } + + #[test] + fn test_remove() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.remove(&1), None); + + // value exists and is returned + lru_queue.put(1, 10); + assert_eq!(lru_queue.remove(&1), Some(10)); + + // value does not exist + assert_eq!(lru_queue.remove(&1), None); + } + + #[test] + fn test_contains_key() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // value does not exist + assert!(!lru_queue.contains_key(&1)); + + // value exists + lru_queue.put(1, 10); + assert!(lru_queue.contains_key(&1)); + + // value is removed + lru_queue.remove(&1); + 
assert!(!lru_queue.contains_key(&1)); + } + + #[test] + fn test_len() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // empty + assert_eq!(lru_queue.len(), 0); + + // puts + lru_queue.put(1, 10); + assert_eq!(lru_queue.len(), 1); + lru_queue.put(2, 20); + assert_eq!(lru_queue.len(), 2); + lru_queue.put(3, 30); + assert_eq!(lru_queue.len(), 3); + lru_queue.put(1, 11); + lru_queue.put(3, 31); + assert_eq!(lru_queue.len(), 3); + + // removes + lru_queue.remove(&1); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&1); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&4); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&3); + assert_eq!(lru_queue.len(), 1); + lru_queue.remove(&2); + assert_eq!(lru_queue.len(), 0); + lru_queue.remove(&2); + assert_eq!(lru_queue.len(), 0); + + // clear + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.len(), 3); + lru_queue.clear(); + assert_eq!(lru_queue.len(), 0); + } + + #[test] + fn test_is_empty() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // empty + assert!(lru_queue.is_empty()); + + // puts + lru_queue.put(1, 10); + assert!(!lru_queue.is_empty()); + lru_queue.put(2, 20); + assert!(!lru_queue.is_empty()); + + // removes + lru_queue.remove(&1); + assert!(!lru_queue.is_empty()); + lru_queue.remove(&1); + assert!(!lru_queue.is_empty()); + lru_queue.remove(&2); + assert!(lru_queue.is_empty()); + + // clear + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert!(!lru_queue.is_empty()); + lru_queue.clear(); + assert!(lru_queue.is_empty()); + } + + #[test] + fn test_clear() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // empty + lru_queue.clear(); + + // filled + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.get(&1), Some(&10)); + assert_eq!(lru_queue.get(&2), Some(&20)); + assert_eq!(lru_queue.get(&3), Some(&30)); + lru_queue.clear(); + 
assert_eq!(lru_queue.get(&1), None); + assert_eq!(lru_queue.get(&2), None); + assert_eq!(lru_queue.get(&3), None); + assert_eq!(lru_queue.len(), 0); + } + + #[test] + fn test_pop() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + + // empty queue + assert_eq!(lru_queue.pop(), None); + + // simplest case + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'get' changes the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.get(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), None); + + // multiple 'gets' + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.get(&2); + lru_queue.get(&3); + lru_queue.get(&1); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), None); + + // 'peek' does not change the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.peek(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'contains' does not change the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.contains_key(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'put' on the same key promotes it + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.put(2, 21); + assert_eq!(lru_queue.pop(), 
Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((2, 21))); + assert_eq!(lru_queue.pop(), None); + + // multiple 'puts' + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.put(2, 21); + lru_queue.put(3, 31); + lru_queue.put(1, 11); + assert_eq!(lru_queue.pop(), Some((2, 21))); + assert_eq!(lru_queue.pop(), Some((3, 31))); + assert_eq!(lru_queue.pop(), Some((1, 11))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' an element in the middle of the queue + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' the LRU + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&1); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' the MRU + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&3); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), None); + } + + #[test] + /// Fuzzy test using a hashmap as the base to check the methods. 
+ fn test_fuzzy() { + let mut lru_queue: LruQueue<i32, i32> = LruQueue::new(); + let mut map: HashMap<i32, i32> = HashMap::new(); + let max_keys = 1_000; + let methods = ["get", "put", "remove", "pop", "contains", "len"]; + let mut rng = rand::rng(); + + for i in 0..1_000_000 { + match *methods.choose(&mut rng).unwrap() { + "get" => { + assert_eq!(lru_queue.get(&(i % max_keys)), map.get(&(i % max_keys))) + } + "put" => assert_eq!( + lru_queue.put(i % max_keys, i), + map.insert(i % max_keys, i) + ), + "remove" => assert_eq!( + lru_queue.remove(&(i % max_keys)), + map.remove(&(i % max_keys)) + ), + "pop" => { + let removed = lru_queue.pop(); + if let Some((k, v)) = removed { + assert_eq!(Some(v), map.remove(&k)) + } + } + "contains" => { + assert_eq!( + lru_queue.contains_key(&(i % max_keys)), + map.contains_key(&(i % max_keys)) + ) + } + "len" => assert_eq!(lru_queue.len(), map.len()), + _ => unreachable!(), + } + } + } +} diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 4271bebd0b..b1857c94fa 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -17,6 +17,7 @@ pub mod cache_manager; pub mod cache_unit; +pub mod lru_queue; /// The cache accessor, users usually working on this interface while manipulating caches. /// This interface does not get `mut` references and thus has to handle its own diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index bb9025391f..fc26f997a2 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -268,6 +268,12 @@ impl RuntimeEnvBuilder { self.with_disk_manager_builder(builder.with_max_temp_directory_size(size)) } + /// Specify the limit of the file-embedded metadata cache, in bytes. 
+ pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self { + self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit); + self + } + /// Build a RuntimeEnv pub fn build(self) -> Result<RuntimeEnv> { let Self { @@ -305,7 +311,10 @@ impl RuntimeEnvBuilder { .cache_manager .get_file_statistic_cache(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), - file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(), + file_metadata_cache: Some( + runtime_env.cache_manager.get_file_metadata_cache(), + ), + metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(), }; Self { @@ -337,6 +346,11 @@ impl RuntimeEnvBuilder { key: "datafusion.runtime.temp_directory".to_string(), value: None, // Default is system-dependent description: "The path to the temporary file directory.", + }, + ConfigEntry { + key: "datafusion.runtime.metadata_cache_limit".to_string(), + value: Some("50M".to_owned()), + description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", } ] } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3fc8e98437..9190452c0a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -187,11 +187,12 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. 
Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | +| key | default | description | +| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | # Tuning Guide --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org