This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a9e6d4be4a feat: Limit the memory used in the file metadata cache 
(#17031)
a9e6d4be4a is described below

commit a9e6d4be4a63c2180814a627d3b45cd0adf5b61c
Author: Nuno Faria <nunofpfa...@gmail.com>
AuthorDate: Thu Aug 7 11:29:12 2025 +0100

    feat: Limit the memory used in the file metadata cache (#17031)
    
    * feat: Limit the memory used in the file metadata cache
    
    * Implement custom LRU queue
    
    * Use parking_lot::Mutex
    
    * Add is_empty unit test
    
    * Rename config to metadata_cache_limit, Set limit to 50M
    
    * Remove Option from the metadata memory limit
    
    * Add license to lru_queue
    
    * Update datafusion/execution/src/cache/cache_unit.rs
    
    Removes the previous unreachable!().
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * Fix syntax error
    
    * Fix clippy error
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion/core/src/execution/context/mod.rs     |   4 +
 datafusion/core/tests/sql/runtime_config.rs      |  33 ++
 datafusion/datasource-parquet/src/file_format.rs |  19 +-
 datafusion/datasource-parquet/src/reader.rs      |   4 +
 datafusion/execution/src/cache/cache_manager.rs  |  79 +++-
 datafusion/execution/src/cache/cache_unit.rs     | 339 ++++++++++++--
 datafusion/execution/src/cache/lru_queue.rs      | 537 +++++++++++++++++++++++
 datafusion/execution/src/cache/mod.rs            |   1 +
 datafusion/execution/src/runtime_env.rs          |  16 +-
 docs/source/user-guide/configs.md                |  11 +-
 10 files changed, 974 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs
index 9eb1bccc40..bbe33c5179 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1068,6 +1068,10 @@ impl SessionContext {
                 builder.with_max_temp_directory_size(directory_size as u64)
             }
             "temp_directory" => builder.with_temp_file_path(value),
+            "metadata_cache_limit" => {
+                let limit = Self::parse_memory_limit(value)?;
+                builder.with_metadata_cache_limit(limit)
+            }
             _ => {
                 return Err(DataFusionError::Plan(format!(
                     "Unknown runtime configuration: {variable}"
diff --git a/datafusion/core/tests/sql/runtime_config.rs 
b/datafusion/core/tests/sql/runtime_config.rs
index b05c36e335..9627d7bccd 100644
--- a/datafusion/core/tests/sql/runtime_config.rs
+++ b/datafusion/core/tests/sql/runtime_config.rs
@@ -200,6 +200,39 @@ async fn test_max_temp_directory_size_enforcement() {
     );
 }
 
+#[tokio::test]
+async fn test_metadata_cache_limit() {
+    let ctx = SessionContext::new();
+
+    let update_limit = async |ctx: &SessionContext, limit: &str| {
+        ctx.sql(
+            format!("SET datafusion.runtime.metadata_cache_limit = 
'{limit}'").as_str(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    };
+
+    let get_limit = |ctx: &SessionContext| -> usize {
+        ctx.task_ctx()
+            .runtime_env()
+            .cache_manager
+            .get_file_metadata_cache()
+            .cache_limit()
+    };
+
+    update_limit(&ctx, "100M").await;
+    assert_eq!(get_limit(&ctx), 100 * 1024 * 1024);
+
+    update_limit(&ctx, "2G").await;
+    assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024);
+
+    update_limit(&ctx, "123K").await;
+    assert_eq!(get_limit(&ctx), 123 * 1024);
+}
+
 #[tokio::test]
 async fn test_unknown_runtime_config() {
     let ctx = SessionContext::new();
diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs
index 7210cc09a0..d86ca630c8 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat {
 
         // Use the CachedParquetFileReaderFactory when metadata caching is 
enabled
         if self.options.global.cache_metadata {
-            if let Some(metadata_cache) =
-                state.runtime_env().cache_manager.get_file_metadata_cache()
-            {
-                let store = state
-                    .runtime_env()
-                    .object_store(conf.object_store_url.clone())?;
-                let cached_parquet_read_factory =
-                    Arc::new(CachedParquetFileReaderFactory::new(store, 
metadata_cache));
-                source =
-                    
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
-            }
+            let metadata_cache =
+                state.runtime_env().cache_manager.get_file_metadata_cache();
+            let store = state
+                .runtime_env()
+                .object_store(conf.object_store_url.clone())?;
+            let cached_parquet_read_factory =
+                Arc::new(CachedParquetFileReaderFactory::new(store, 
metadata_cache));
+            source = 
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
         }
 
         if let Some(metadata_size_hint) = metadata_size_hint {
diff --git a/datafusion/datasource-parquet/src/reader.rs 
b/datafusion/datasource-parquet/src/reader.rs
index 6ad9428770..648ed7c0bc 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -288,4 +288,8 @@ impl FileMetadata for CachedParquetMetaData {
     fn as_any(&self) -> &dyn Any {
         self
     }
+
+    fn memory_size(&self) -> usize {
+        self.0.memory_size()
+    }
 }
diff --git a/datafusion/execution/src/cache/cache_manager.rs 
b/datafusion/execution/src/cache/cache_manager.rs
index 37f1baa17f..a91e4f8458 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync {
     /// Returns the file metadata as [`Any`] so that it can be downcasted to a 
specific
     /// implementation.
     fn as_any(&self) -> &dyn Any;
+
+    /// Returns the size of the metadata in bytes.
+    fn memory_size(&self) -> usize;
 }
 
 /// Cache to store file-embedded metadata.
 pub trait FileMetadataCache:
     CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
 {
+    /// Returns the cache's memory limit in bytes.
+    fn cache_limit(&self) -> usize;
+
+    /// Updates the cache with a new memory limit in bytes.
+    fn update_cache_limit(&self, limit: usize);
 }
 
 impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
@@ -65,30 +73,36 @@ impl Debug for dyn FileMetadataCache {
     }
 }
 
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
     list_files_cache: Option<ListFilesCache>,
-    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    file_metadata_cache: Arc<dyn FileMetadataCache>,
 }
 
 impl CacheManager {
     pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
-        let mut manager = CacheManager::default();
-        if let Some(cc) = &config.table_files_statistics_cache {
-            manager.file_statistic_cache = Some(Arc::clone(cc))
-        }
-        if let Some(lc) = &config.list_files_cache {
-            manager.list_files_cache = Some(Arc::clone(lc))
-        }
-        if let Some(mc) = &config.file_metadata_cache {
-            manager.file_metadata_cache = Some(Arc::clone(mc));
-        } else {
-            manager.file_metadata_cache =
-                Some(Arc::new(DefaultFilesMetadataCache::default()));
-        }
-
-        Ok(Arc::new(manager))
+        let file_statistic_cache =
+            config.table_files_statistics_cache.as_ref().map(Arc::clone);
+
+        let list_files_cache = 
config.list_files_cache.as_ref().map(Arc::clone);
+
+        let file_metadata_cache = config
+            .file_metadata_cache
+            .as_ref()
+            .map(Arc::clone)
+            .unwrap_or_else(|| {
+                
Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
+            });
+
+        // the cache memory limit might have changed, ensure the limit is 
updated
+        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
+
+        Ok(Arc::new(CacheManager {
+            file_statistic_cache,
+            list_files_cache,
+            file_metadata_cache,
+        }))
     }
 
     /// Get the cache of listing files statistics.
@@ -102,12 +116,19 @@ impl CacheManager {
     }
 
     /// Get the file embedded metadata cache.
-    pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn 
FileMetadataCache>> {
-        self.file_metadata_cache.clone()
+    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
+        Arc::clone(&self.file_metadata_cache)
+    }
+
+    /// Get the limit of the file embedded metadata cache.
+    pub fn get_metadata_cache_limit(&self) -> usize {
+        self.file_metadata_cache.cache_limit()
     }
 }
 
-#[derive(Clone, Default)]
+const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
+
+#[derive(Clone)]
 pub struct CacheManagerConfig {
     /// Enable cache of files statistics when listing files.
     /// Avoid get same file statistics repeatedly in same datafusion session.
@@ -124,6 +145,19 @@ pub struct CacheManagerConfig {
     /// data file (e.g., Parquet footer and page metadata).
     /// If not provided, the [`CacheManager`] will create a 
[`DefaultFilesMetadataCache`].
     pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    /// Limit of the file-embedded metadata cache, in bytes.
+    pub metadata_cache_limit: usize,
+}
+
+impl Default for CacheManagerConfig {
+    fn default() -> Self {
+        Self {
+            table_files_statistics_cache: Default::default(),
+            list_files_cache: Default::default(),
+            file_metadata_cache: Default::default(),
+            metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
+        }
+    }
 }
 
 impl CacheManagerConfig {
@@ -147,4 +181,9 @@ impl CacheManagerConfig {
         self.file_metadata_cache = cache;
         self
     }
+
+    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
+        self.metadata_cache_limit = limit;
+        self
+    }
 }
diff --git a/datafusion/execution/src/cache/cache_unit.rs 
b/datafusion/execution/src/cache/cache_unit.rs
index 70d007bf5b..576076ca4e 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use crate::cache::cache_manager::{FileMetadata, FileMetadataCache};
+use crate::cache::lru_queue::LruQueue;
 use crate::cache::CacheAccessor;
 
 use datafusion_common::Statistics;
@@ -158,33 +159,165 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for 
DefaultListFilesCache {
     }
 }
 
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+    lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    /// Returns the respective entry from the cache, if it exists and the 
`size` and `last_modified`
+    /// properties from [`ObjectMeta`] match.
+    /// If the entry exists, it becomes the most recently used.
+    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.lru_queue
+            .get(&k.location)
+            .map(|(object_meta, metadata)| {
+                if object_meta.size != k.size
+                    || object_meta.last_modified != k.last_modified
+                {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Checks if the metadata is currently cached (entry exists and the 
`size` and `last_modified`
+    /// properties of [`ObjectMeta`] match).
+    /// The LRU queue is not updated.
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.lru_queue
+            .peek(&k.location)
+            .map(|(object_meta, _)| {
+                object_meta.size == k.size && object_meta.last_modified == 
k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    /// Adds a new key-value pair to cache, meaning LRU entries might be 
evicted if required.
+    /// If the key is already in the cache, the previous metadata is returned.
+    /// If the size of the metadata is greater than the `memory_limit`, the 
value is not inserted.
+    fn put(
+        &mut self,
+        key: ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        let value_size = value.memory_size();
+
+        // no point in trying to add this value to the cache if it cannot fit 
entirely
+        if value_size > self.memory_limit {
+            return None;
+        }
+
+        // if the key is already in the cache, the old value is removed
+        let old_value = self.lru_queue.put(key.location.clone(), (key, value));
+        self.memory_used += value_size;
+        if let Some((_, ref old_metadata)) = old_value {
+            self.memory_used -= old_metadata.memory_size();
+        }
+
+        self.evict_entries();
+
+        old_value.map(|v| v.1)
+    }
+
+    /// Evicts entries from the LRU cache until `memory_used` is lower than 
`memory_limit`.
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            if let Some(removed) = self.lru_queue.pop() {
+                let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+                self.memory_used -= metadata.memory_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot 
happen
+                debug_assert!(
+                    false,
+                    "cache is empty while memory_used > memory_limit, cannot 
happen"
+                );
+                return;
+            }
+        }
+    }
+
+    /// Removes an entry from the cache and returns it, if it exists.
+    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
+            self.memory_used -= old_metadata.memory_size();
+            Some(old_metadata)
+        } else {
+            None
+        }
+    }
+
+    /// Returns the number of entries currently cached.
+    fn len(&self) -> usize {
+        self.lru_queue.len()
+    }
+
+    /// Removes all entries from the cache.
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.memory_used = 0;
+    }
+}
+
 /// Collected file embedded metadata cache.
/// The metadata for some file is invalidated when the file size or last 
modification time has been
changed.
+/// The `memory_limit` passed in the constructor controls the maximum size of 
the cache, which uses
+/// a Least Recently Used eviction algorithm.
 /// Users should use the `get` and `put` methods. The `get_with_extra` and 
`put_with_extra` methods
 /// simply call `get` and `put`, respectively.
-#[derive(Default)]
 pub struct DefaultFilesMetadataCache {
-    metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    // the state is wrapped in a Mutex to ensure the operations are atomic
+    state: Mutex<DefaultFilesMetadataCacheState>,
+}
+
+impl DefaultFilesMetadataCache {
+    /// The `memory_limit` parameter controls the maximum size of the cache, 
in bytes, using a Least
+    /// Recently Used eviction algorithm.
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: 
Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
 }
 
-impl FileMetadataCache for DefaultFilesMetadataCache {}
+impl FileMetadataCache for DefaultFilesMetadataCache {
+    fn cache_limit(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_limit
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        let mut state = self.state.lock().unwrap();
+        state.memory_limit = limit;
+        state.evict_entries();
+    }
+}
 
 impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCache {
     type Extra = ObjectMeta;
 
     fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
-        self.metadata
-            .get(&k.location)
-            .map(|s| {
-                let (extra, metadata) = s.value();
-                if extra.size != k.size || extra.last_modified != 
k.last_modified {
-                    None
-                } else {
-                    Some(Arc::clone(metadata))
-                }
-            })
-            .unwrap_or(None)
+        let mut state = self.state.lock().unwrap();
+        state.get(k)
     }
 
     fn get_with_extra(
@@ -200,9 +333,8 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCa
         key: &ObjectMeta,
         value: Arc<dyn FileMetadata>,
     ) -> Option<Arc<dyn FileMetadata>> {
-        self.metadata
-            .insert(key.location.clone(), (key.clone(), value))
-            .map(|x| x.1)
+        let mut state = self.state.lock().unwrap();
+        state.put(key.clone(), value)
     }
 
     fn put_with_extra(
@@ -215,25 +347,23 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCa
     }
 
     fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
-        self.metadata.remove(&k.location).map(|x| x.1 .1)
+        let mut state = self.state.lock().unwrap();
+        state.remove(k)
     }
 
     fn contains_key(&self, k: &ObjectMeta) -> bool {
-        self.metadata
-            .get(&k.location)
-            .map(|s| {
-                let (extra, _) = s.value();
-                extra.size == k.size && extra.last_modified == k.last_modified
-            })
-            .unwrap_or(false)
+        let state = self.state.lock().unwrap();
+        state.contains_key(k)
     }
 
     fn len(&self) -> usize {
-        self.metadata.len()
+        let state = self.state.lock().unwrap();
+        state.len()
     }
 
     fn clear(&self) {
-        self.metadata.clear();
+        let mut state = self.state.lock().unwrap();
+        state.clear();
     }
 
     fn name(&self) -> String {
@@ -245,7 +375,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCa
 mod tests {
     use std::sync::Arc;
 
-    use crate::cache::cache_manager::FileMetadata;
+    use crate::cache::cache_manager::{FileMetadata, FileMetadataCache};
     use crate::cache::cache_unit::{
         DefaultFileStatisticsCache, DefaultFilesMetadataCache, 
DefaultListFilesCache,
     };
@@ -330,10 +460,14 @@ mod tests {
         fn as_any(&self) -> &dyn std::any::Any {
             self
         }
+
+        fn memory_size(&self) -> usize {
+            self.metadata.len()
+        }
     }
 
     #[test]
-    fn test_file_metadata_cache() {
+    fn test_default_file_metadata_cache() {
         let object_meta = ObjectMeta {
             location: Path::from("test"),
             last_modified: 
DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00")
@@ -348,11 +482,11 @@ mod tests {
             metadata: "retrieved_metadata".to_owned(),
         });
 
-        let mut cache = DefaultFilesMetadataCache::default();
+        let mut cache = DefaultFilesMetadataCache::new(1024 * 1024);
         assert!(cache.get(&object_meta).is_none());
 
         // put
-        cache.put(&object_meta, metadata);
+        cache.put(&object_meta, Arc::clone(&metadata));
 
         // get and contains of a valid entry
         assert!(cache.contains_key(&object_meta));
@@ -387,5 +521,146 @@ mod tests {
         cache.remove(&object_meta);
         assert!(cache.get(&object_meta).is_none());
         assert!(!cache.contains_key(&object_meta));
+
+        // len and clear
+        cache.put(&object_meta, Arc::clone(&metadata));
+        cache.put(&object_meta2, metadata);
+        assert_eq!(cache.len(), 2);
+        cache.clear();
+        assert_eq!(cache.len(), 0);
+    }
+
+    fn generate_test_metadata_with_size(
+        path: &str,
+        size: usize,
+    ) -> (ObjectMeta, Arc<dyn FileMetadata>) {
+        let object_meta = ObjectMeta {
+            location: Path::from(path),
+            last_modified: chrono::Utc::now(),
+            size: size as u64,
+            e_tag: None,
+            version: None,
+        };
+        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
+            metadata: "a".repeat(size),
+        });
+
+        (object_meta, metadata)
+    }
+
+    #[test]
+    fn test_default_file_metadata_cache_with_limit() {
+        let mut cache = DefaultFilesMetadataCache::new(1000);
+        let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 
100);
+        let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 
500);
+        let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 
300);
+
+        cache.put(&object_meta1, metadata1);
+        cache.put(&object_meta2, metadata2);
+        cache.put(&object_meta3, metadata3);
+
+        // all entries will fit
+        assert_eq!(cache.len(), 3);
+        assert_eq!(cache.memory_used(), 900);
+        assert!(cache.contains_key(&object_meta1));
+        assert!(cache.contains_key(&object_meta2));
+        assert!(cache.contains_key(&object_meta3));
+
+        // add a new entry which will remove the least recently used ("1")
+        let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 
200);
+        cache.put(&object_meta4, metadata4);
+        assert_eq!(cache.len(), 3);
+        assert_eq!(cache.memory_used(), 1000);
+        assert!(!cache.contains_key(&object_meta1));
+        assert!(cache.contains_key(&object_meta4));
+
+        // get entry "2", which will move it to the top of the queue, and add 
a new one which will
+        // remove the new least recently used ("3")
+        cache.get(&object_meta2);
+        let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 
100);
+        cache.put(&object_meta5, metadata5);
+        assert_eq!(cache.len(), 3);
+        assert_eq!(cache.memory_used(), 800);
+        assert!(!cache.contains_key(&object_meta3));
+        assert!(cache.contains_key(&object_meta5));
+
+        // new entry which will not be able to fit in the 1000 bytes allocated
+        let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 
1200);
+        cache.put(&object_meta6, metadata6);
+        assert_eq!(cache.len(), 3);
+        assert_eq!(cache.memory_used(), 800);
+        assert!(!cache.contains_key(&object_meta6));
+
+        // new entry which is able to fit without removing any entry
+        let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 
200);
+        cache.put(&object_meta7, metadata7);
+        assert_eq!(cache.len(), 4);
+        assert_eq!(cache.memory_used(), 1000);
+        assert!(cache.contains_key(&object_meta7));
+
+        // new entry which will remove all other entries
+        let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 
999);
+        cache.put(&object_meta8, metadata8);
+        assert_eq!(cache.len(), 1);
+        assert_eq!(cache.memory_used(), 999);
+        assert!(cache.contains_key(&object_meta8));
+
+        // when updating an entry, the previous ones are not unnecessarily 
removed
+        let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 
300);
+        let (object_meta10, metadata10) = 
generate_test_metadata_with_size("10", 200);
+        let (object_meta11_v1, metadata11_v1) =
+            generate_test_metadata_with_size("11", 400);
+        cache.put(&object_meta9, metadata9);
+        cache.put(&object_meta10, metadata10);
+        cache.put(&object_meta11_v1, metadata11_v1);
+        assert_eq!(cache.memory_used(), 900);
+        assert_eq!(cache.len(), 3);
+        let (object_meta11_v2, metadata11_v2) =
+            generate_test_metadata_with_size("11", 500);
+        cache.put(&object_meta11_v2, metadata11_v2);
+        assert_eq!(cache.memory_used(), 1000);
+        assert_eq!(cache.len(), 3);
+        assert!(cache.contains_key(&object_meta9));
+        assert!(cache.contains_key(&object_meta10));
+        assert!(cache.contains_key(&object_meta11_v2));
+        assert!(!cache.contains_key(&object_meta11_v1));
+
+        // when updating an entry that now exceeds the limit, the LRU ("9") 
needs to be removed
+        let (object_meta11_v3, metadata11_v3) =
+            generate_test_metadata_with_size("11", 501);
+        cache.put(&object_meta11_v3, metadata11_v3);
+        assert_eq!(cache.memory_used(), 701);
+        assert_eq!(cache.len(), 2);
+        assert!(cache.contains_key(&object_meta10));
+        assert!(cache.contains_key(&object_meta11_v3));
+        assert!(!cache.contains_key(&object_meta11_v2));
+
+        // manually removing an entry that is not the LRU
+        cache.remove(&object_meta11_v3);
+        assert_eq!(cache.len(), 1);
+        assert_eq!(cache.memory_used(), 200);
+        assert!(cache.contains_key(&object_meta10));
+        assert!(!cache.contains_key(&object_meta11_v3));
+
+        // clear
+        cache.clear();
+        assert_eq!(cache.len(), 0);
+        assert_eq!(cache.memory_used(), 0);
+
+        // resizing the cache should clear the extra entries
+        let (object_meta12, metadata12) = 
generate_test_metadata_with_size("12", 300);
+        let (object_meta13, metadata13) = 
generate_test_metadata_with_size("13", 200);
+        let (object_meta14, metadata14) = 
generate_test_metadata_with_size("14", 500);
+        cache.put(&object_meta12, metadata12);
+        cache.put(&object_meta13, metadata13);
+        cache.put(&object_meta14, metadata14);
+        assert_eq!(cache.len(), 3);
+        assert_eq!(cache.memory_used(), 1000);
+        cache.update_cache_limit(600);
+        assert_eq!(cache.len(), 1);
+        assert_eq!(cache.memory_used(), 500);
+        assert!(!cache.contains_key(&object_meta12));
+        assert!(!cache.contains_key(&object_meta13));
+        assert!(cache.contains_key(&object_meta14));
     }
 }
diff --git a/datafusion/execution/src/cache/lru_queue.rs 
b/datafusion/execution/src/cache/lru_queue.rs
new file mode 100644
index 0000000000..3dc308dc3f
--- /dev/null
+++ b/datafusion/execution/src/cache/lru_queue.rs
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::HashMap,
+    hash::Hash,
+    sync::{Arc, Weak},
+};
+
+use parking_lot::Mutex;
+
+#[derive(Default)]
+/// Provides a Least Recently Used queue with unbounded capacity.
+///
+/// # Examples
+///
+/// ```
+/// use datafusion_execution::cache::lru_queue::LruQueue;
+///
+/// let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();
+/// lru_queue.put(1, 10);
+/// lru_queue.put(2, 20);
+/// lru_queue.put(3, 30);
+/// assert_eq!(lru_queue.get(&2), Some(&20));
+/// assert_eq!(lru_queue.pop(), Some((1, 10)));
+/// assert_eq!(lru_queue.pop(), Some((3, 30)));
+/// assert_eq!(lru_queue.pop(), Some((2, 20)));
+/// assert_eq!(lru_queue.pop(), None);
+/// ```
+pub struct LruQueue<K: Eq + Hash + Clone, V> {
+    data: LruData<K, V>,
+    queue: LruList<K>,
+}
+
+/// Maps the key to the [`LruNode`] in queue and the value.
+type LruData<K, V> = HashMap<K, (Arc<Mutex<LruNode<K>>>, V)>;
+
+#[derive(Default)]
+/// Doubly-linked list that maintains the LRU order
+struct LruList<K> {
+    head: Link<K>,
+    tail: Link<K>,
+}
+
+/// Doubly-linked list node.
+struct LruNode<K> {
+    key: K,
+    prev: Link<K>,
+    next: Link<K>,
+}
+
+/// Weak pointer to a [`LruNode`], used to connect nodes in the doubly-linked 
list.
+/// The strong reference is guaranteed to be stored in the `data` map of the 
[`LruQueue`].
+type Link<K> = Option<Weak<Mutex<LruNode<K>>>>;
+
impl<K: Eq + Hash + Clone, V> LruQueue<K, V> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            data: HashMap::new(),
            queue: LruList {
                head: None,
                tail: None,
            },
        }
    }

    /// Returns a reference to value mapped by `key`, if it exists.
    /// If the entry exists, it becomes the most recently used.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        // Promote the entry by removing it and re-inserting it: `put` always
        // places its argument at the head (MRU end) of the queue.
        if let Some(value) = self.remove(key) {
            self.put(key.clone(), value);
        }
        self.data.get(key).map(|(_, value)| value)
    }

    /// Returns a reference to value mapped by `key`, if it exists.
    /// Does not affect the queue order.
    pub fn peek(&self, key: &K) -> Option<&V> {
        self.data.get(key).map(|(_, value)| value)
    }

    /// Checks whether there is an entry with key `key` in the queue.
    /// Does not affect the queue order.
    pub fn contains_key(&self, key: &K) -> bool {
        self.data.contains_key(key)
    }

    /// Inserts an entry in the queue, becoming the most recently used.
    /// If the entry already exists, returns the previous value.
    pub fn put(&mut self, key: K, value: V) -> Option<V> {
        // Remove any previous entry first so each key owns exactly one node.
        let old_value = self.remove(&key);

        let node = Arc::new(Mutex::new(LruNode {
            key: key.clone(),
            prev: None,
            next: None,
        }));

        match self.queue.head {
            // queue is not empty: link the new node in front of the old head.
            // The strong Arc for every linked node lives in `data`, so the
            // upgrade can only fail if map and list get out of sync (a bug).
            Some(ref old_head) => {
                old_head
                    .upgrade()
                    .expect("value has been unexpectedly dropped")
                    .lock()
                    .prev = Some(Arc::downgrade(&node));
                node.lock().next = Some(Weak::clone(old_head));
                self.queue.head = Some(Arc::downgrade(&node));
            }
            // queue is empty: the new node is both head and tail
            _ => {
                self.queue.head = Some(Arc::downgrade(&node));
                self.queue.tail = Some(Arc::downgrade(&node));
            }
        }

        self.data.insert(key, (node, value));

        old_value
    }

    /// Removes and returns the least recently used value.
    /// Returns `None` if the queue is empty.
    pub fn pop(&mut self) -> Option<(K, V)> {
        // Clone the tail key first: `remove` needs `&mut self`, so the borrow
        // of the tail link must end before it is called.
        let key_to_remove = self.queue.tail.as_ref().map(|n| {
            n.upgrade()
                .expect("value has been unexpectedly dropped")
                .lock()
                .key
                .clone()
        });
        if let Some(k) = key_to_remove {
            let value = self.remove(&k).unwrap(); // confirmed above that the entry exists
            Some((k, value))
        } else {
            None
        }
    }

    /// Removes a specific entry from the queue, if it exists.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        if let Some((old_node, old_value)) = self.data.remove(key) {
            // Unlink the node; the four cases cover every combination of the
            // node being (or not being) the head and/or the tail.
            let LruNode { key: _, prev, next } = &*old_node.lock();
            match (prev, next) {
                // single node in the queue
                (None, None) => {
                    self.queue.head = None;
                    self.queue.tail = None;
                }
                // removed the head node
                (None, Some(n)) => {
                    let n_strong =
                        n.upgrade().expect("value has been unexpectedly dropped");
                    n_strong.lock().prev = None;
                    self.queue.head = Some(Weak::clone(n));
                }
                // removed the tail node
                (Some(p), None) => {
                    let p_strong =
                        p.upgrade().expect("value has been unexpectedly dropped");
                    p_strong.lock().next = None;
                    self.queue.tail = Some(Weak::clone(p));
                }
                // removed a middle node
                (Some(p), Some(n)) => {
                    let n_strong =
                        n.upgrade().expect("value has been unexpectedly dropped");
                    let p_strong =
                        p.upgrade().expect("value has been unexpectedly dropped");
                    n_strong.lock().prev = Some(Weak::clone(p));
                    p_strong.lock().next = Some(Weak::clone(n));
                }
            };
            Some(old_value)
        } else {
            None
        }
    }

    /// Returns the number of entries in the queue.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Checks whether the queue has no items.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Removes all entries from the queue.
    pub fn clear(&mut self) {
        self.queue.head = None;
        self.queue.tail = None;
        self.data.clear();
    }
}
+
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rand::seq::IndexedRandom;

    use crate::cache::lru_queue::LruQueue;

    #[test]
    fn test_get() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.get(&1), None);

        // value exists
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.get(&1), Some(&10));
        assert_eq!(lru_queue.get(&1), Some(&10));

        // value is removed
        lru_queue.remove(&1);
        assert_eq!(lru_queue.get(&1), None);
    }

    #[test]
    fn test_peek() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.peek(&1), None);

        // value exists
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.peek(&1), Some(&10));
        assert_eq!(lru_queue.peek(&1), Some(&10));

        // value is removed
        lru_queue.remove(&1);
        assert_eq!(lru_queue.peek(&1), None);
    }

    #[test]
    fn test_put() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // no previous value
        assert_eq!(lru_queue.put(1, 10), None);

        // update, the previous value is returned
        assert_eq!(lru_queue.put(1, 11), Some(10));
        assert_eq!(lru_queue.put(1, 12), Some(11));
        assert_eq!(lru_queue.put(1, 13), Some(12));
    }

    #[test]
    fn test_remove() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert_eq!(lru_queue.remove(&1), None);

        // value exists and is returned
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.remove(&1), Some(10));

        // value does not exist
        assert_eq!(lru_queue.remove(&1), None);
    }

    #[test]
    fn test_contains_key() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // value does not exist
        assert!(!lru_queue.contains_key(&1));

        // value exists
        lru_queue.put(1, 10);
        assert!(lru_queue.contains_key(&1));

        // value is removed
        lru_queue.remove(&1);
        assert!(!lru_queue.contains_key(&1));
    }

    #[test]
    fn test_len() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        assert_eq!(lru_queue.len(), 0);

        // puts
        lru_queue.put(1, 10);
        assert_eq!(lru_queue.len(), 1);
        lru_queue.put(2, 20);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.len(), 3);
        lru_queue.put(1, 11);
        lru_queue.put(3, 31);
        assert_eq!(lru_queue.len(), 3);

        // removes
        lru_queue.remove(&1);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&1);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&4);
        assert_eq!(lru_queue.len(), 2);
        lru_queue.remove(&3);
        assert_eq!(lru_queue.len(), 1);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.len(), 0);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.len(), 0);

        // clear
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.len(), 3);
        lru_queue.clear();
        assert_eq!(lru_queue.len(), 0);
    }

    #[test]
    fn test_is_empty() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        assert!(lru_queue.is_empty());

        // puts
        lru_queue.put(1, 10);
        assert!(!lru_queue.is_empty());
        lru_queue.put(2, 20);
        assert!(!lru_queue.is_empty());

        // removes
        lru_queue.remove(&1);
        assert!(!lru_queue.is_empty());
        lru_queue.remove(&1);
        assert!(!lru_queue.is_empty());
        lru_queue.remove(&2);
        assert!(lru_queue.is_empty());

        // clear
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert!(!lru_queue.is_empty());
        lru_queue.clear();
        assert!(lru_queue.is_empty());
    }

    #[test]
    fn test_clear() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty
        lru_queue.clear();

        // filled
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.get(&1), Some(&10));
        assert_eq!(lru_queue.get(&2), Some(&20));
        assert_eq!(lru_queue.get(&3), Some(&30));
        lru_queue.clear();
        assert_eq!(lru_queue.get(&1), None);
        assert_eq!(lru_queue.get(&2), None);
        assert_eq!(lru_queue.get(&3), None);
        assert_eq!(lru_queue.len(), 0);
    }

    #[test]
    fn test_pop() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();

        // empty queue
        assert_eq!(lru_queue.pop(), None);

        // simplest case
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'get' changes the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.get(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), None);

        // multiple 'gets'
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.get(&2);
        lru_queue.get(&3);
        lru_queue.get(&1);
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), None);

        // 'peek' does not change the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.peek(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'contains' does not change the order
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.contains_key(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'put' on the same key promotes it
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.put(2, 21);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), Some((2, 21)));
        assert_eq!(lru_queue.pop(), None);

        // multiple 'puts'
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.put(2, 21);
        lru_queue.put(3, 31);
        lru_queue.put(1, 11);
        assert_eq!(lru_queue.pop(), Some((2, 21)));
        assert_eq!(lru_queue.pop(), Some((3, 31)));
        assert_eq!(lru_queue.pop(), Some((1, 11)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' an element in the middle of the queue
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&2);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' the LRU
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&1);
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), Some((3, 30)));
        assert_eq!(lru_queue.pop(), None);

        // 'remove' the MRU
        lru_queue.put(1, 10);
        lru_queue.put(2, 20);
        lru_queue.put(3, 30);
        lru_queue.remove(&3);
        assert_eq!(lru_queue.pop(), Some((1, 10)));
        assert_eq!(lru_queue.pop(), Some((2, 20)));
        assert_eq!(lru_queue.pop(), None);
    }

    #[test]
    /// Fuzzy test using a hashmap as the reference implementation to check the methods.
    /// Order-sensitive operations ('pop') are checked only for key/value agreement,
    /// since a plain hashmap has no recency order to compare against.
    fn test_fuzzy() {
        let mut lru_queue: LruQueue<i32, i32> = LruQueue::new();
        let mut map: HashMap<i32, i32> = HashMap::new();
        let max_keys = 1_000;
        let methods = ["get", "put", "remove", "pop", "contains", "len"];
        let mut rng = rand::rng();

        for i in 0..1_000_000 {
            match *methods.choose(&mut rng).unwrap() {
                "get" => {
                    assert_eq!(lru_queue.get(&(i % max_keys)), map.get(&(i % max_keys)))
                }
                "put" => assert_eq!(
                    lru_queue.put(i % max_keys, i),
                    map.insert(i % max_keys, i)
                ),
                "remove" => assert_eq!(
                    lru_queue.remove(&(i % max_keys)),
                    map.remove(&(i % max_keys))
                ),
                "pop" => {
                    let removed = lru_queue.pop();
                    if let Some((k, v)) = removed {
                        assert_eq!(Some(v), map.remove(&k))
                    }
                }
                "contains" => {
                    assert_eq!(
                        lru_queue.contains_key(&(i % max_keys)),
                        map.contains_key(&(i % max_keys))
                    )
                }
                "len" => assert_eq!(lru_queue.len(), map.len()),
                _ => unreachable!(),
            }
        }
    }
}
diff --git a/datafusion/execution/src/cache/mod.rs 
b/datafusion/execution/src/cache/mod.rs
index 4271bebd0b..b1857c94fa 100644
--- a/datafusion/execution/src/cache/mod.rs
+++ b/datafusion/execution/src/cache/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod cache_manager;
 pub mod cache_unit;
+pub mod lru_queue;
 
 /// The cache accessor, users usually working on this interface while 
manipulating caches.
 /// This interface does not get `mut` references and thus has to handle its own
diff --git a/datafusion/execution/src/runtime_env.rs 
b/datafusion/execution/src/runtime_env.rs
index bb9025391f..fc26f997a2 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -268,6 +268,12 @@ impl RuntimeEnvBuilder {
         
self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
     }
 
    /// Specify the limit of the file-embedded metadata cache, in bytes.
    ///
    /// Once the cached metadata (e.g. Parquet footers) exceeds this limit,
    /// least-recently-used entries are evicted. Delegates to the
    /// [`CacheManagerConfig`]'s `with_metadata_cache_limit`.
    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
        self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
        self
    }
+
     /// Build a RuntimeEnv
     pub fn build(self) -> Result<RuntimeEnv> {
         let Self {
@@ -305,7 +311,10 @@ impl RuntimeEnvBuilder {
                 .cache_manager
                 .get_file_statistic_cache(),
             list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
-            file_metadata_cache: 
runtime_env.cache_manager.get_file_metadata_cache(),
+            file_metadata_cache: Some(
+                runtime_env.cache_manager.get_file_metadata_cache(),
+            ),
+            metadata_cache_limit: 
runtime_env.cache_manager.get_metadata_cache_limit(),
         };
 
         Self {
@@ -337,6 +346,11 @@ impl RuntimeEnvBuilder {
                 key: "datafusion.runtime.temp_directory".to_string(),
                 value: None, // Default is system-dependent
                 description: "The path to the temporary file directory.",
+            },
+            ConfigEntry {
+                key: "datafusion.runtime.metadata_cache_limit".to_string(),
+                value: Some("50M".to_owned()),
+                description: "Maximum memory to use for file metadata cache 
such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G 
(gigabytes). Example: '2G' for 2 gigabytes.",
             }
         ]
     }
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 3fc8e98437..9190452c0a 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -187,11 +187,12 @@ SET datafusion.runtime.memory_limit = '2G';
 
 The following runtime configuration settings are available:
 
-| key                                        | default | description           
                                                                                
                                      |
-| ------------------------------------------ | ------- | 
-------------------------------------------------------------------------------------------------------------------------------------------
 |
-| datafusion.runtime.max_temp_directory_size | 100G    | Maximum temporary 
file directory size. Supports suffixes K (kilobytes), M (megabytes), and G 
(gigabytes). Example: '2G' for 2 gigabytes.    |
-| datafusion.runtime.memory_limit            | NULL    | Maximum memory limit 
for query execution. Supports suffixes K (kilobytes), M (megabytes), and G 
(gigabytes). Example: '2G' for 2 gigabytes. |
-| datafusion.runtime.temp_directory          | NULL    | The path to the 
temporary file directory.                                                       
                                            |
+| key                                        | default | description           
                                                                                
                                                                    |
+| ------------------------------------------ | ------- | 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
+| datafusion.runtime.max_temp_directory_size | 100G    | Maximum temporary 
file directory size. Supports suffixes K (kilobytes), M (megabytes), and G 
(gigabytes). Example: '2G' for 2 gigabytes.                                  |
+| datafusion.runtime.memory_limit            | NULL    | Maximum memory limit 
for query execution. Supports suffixes K (kilobytes), M (megabytes), and G 
(gigabytes). Example: '2G' for 2 gigabytes.                               |
+| datafusion.runtime.metadata_cache_limit    | 50M     | Maximum memory to use 
for file metadata cache such as Parquet metadata. Supports suffixes K 
(kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
+| datafusion.runtime.temp_directory          | NULL    | The path to the 
temporary file directory.                                                       
                                                                          |
 
 # Tuning Guide
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to