alamb commented on code in PR #17031:
URL: https://github.com/apache/datafusion/pull/17031#discussion_r2252448318
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat {
// Use the CachedParquetFileReaderFactory when metadata caching is enabled
if self.options.global.cache_metadata {
Review Comment:
What are your thoughts about (in a follow-on PR) removing the
`options.cache_metadata` option and always trying to save the metadata (which
would be a no-op if there is no room)?
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -65,30 +73,38 @@ impl Debug for dyn FileMetadataCache {
}
}
-#[derive(Default, Debug)]
+#[derive(Debug)]
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
list_files_cache: Option<ListFilesCache>,
- file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+ file_metadata_cache: Arc<dyn FileMetadataCache>,
Review Comment:
Seeing the idea of having a default file_metadata_cache installed got me
thinking about @BlakeOrth's comment here:
https://github.com/apache/datafusion/pull/16971#issuecomment-3145516316
After this work to cache file metadata, it seems like we may want to
consider adding default caches for ListFiles and FileStatistics as well 🤔 (as
a follow-on PR, of course)
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,168 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
}
}
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+ lru_cache: LruCache<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+ memory_limit: Option<usize>,
+ memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+ fn new(memory_limit: Option<usize>) -> Self {
+ Self {
+ lru_cache: LruCache::unbounded(),
+ memory_limit,
+ memory_used: 0,
+ }
+ }
+
+ /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified`
+ /// properties from [`ObjectMeta`] match.
+ /// If the entry exists, it becomes the most recently used.
+ fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+ self.lru_cache
+ .get(&k.location)
+ .map(|(object_meta, metadata)| {
+ if object_meta.size != k.size
+ || object_meta.last_modified != k.last_modified
+ {
+ None
+ } else {
+ Some(Arc::clone(metadata))
+ }
+ })
+ .unwrap_or(None)
+ }
+
+ /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified`
+ /// properties of [`ObjectMeta`] match).
+ /// The LRU queue is not updated.
+ fn contains_key(&self, k: &ObjectMeta) -> bool {
+ self.lru_cache
+ .peek(&k.location)
+ .map(|(object_meta, _)| {
object_meta.size == k.size && object_meta.last_modified == k.last_modified
+ })
+ .unwrap_or(false)
+ }
+
+ /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
+ /// If the key is already in the cache, the previous metadata is returned.
+ /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted.
+ fn put(
+ &mut self,
+ key: ObjectMeta,
+ value: Arc<dyn FileMetadata>,
+ ) -> Option<Arc<dyn FileMetadata>> {
+ let value_size = value.memory_size();
+
+ if let Some(limit) = self.memory_limit {
// no point in trying to add this value to the cache if it cannot fit entirely
+ if value_size > limit {
+ return None;
+ }
+ }
+
+ // if the key is already in the cache, the old value is removed
+ let old_value = self.lru_cache.put(key.location.clone(), (key, value));
+ self.memory_used += value_size;
+ if let Some((_, ref old_metadata)) = old_value {
+ self.memory_used -= old_metadata.memory_size();
+ }
+
+ self.evict_entries();
+
+ old_value.map(|v| v.1)
+ }
+
+ /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
+ /// If `memory_limit` is `None`, no entries are removed.
+ fn evict_entries(&mut self) {
+ let Some(memory_limit) = self.memory_limit else {
+ return;
+ };
+
+ while self.memory_used > memory_limit {
+ if let Some(removed) = self.lru_cache.pop_lru() {
+ let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+ self.memory_used -= metadata.memory_size();
+ } else {
// cache is empty while memory_used > memory_limit, cannot happen
+ unreachable!();
Review Comment:
would it be safer to `return` here rather than panic!?
Maybe as a halfway measure we could leave a `debug_assert!` so debug builds
hit an error but release builds are unaffected:
```suggestion
                    // use a debug assert to find the issue in debug builds, but don't panic in release builds
                    debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen");
                    return;
```
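For readers unfamiliar with the distinction: `debug_assert!` fires only when
debug assertions are compiled in (the default for dev builds) and compiles to
nothing under `--release`. A standalone sketch of the suggested pattern (the
function name and its argument are hypothetical, not from the PR):

```rust
// Hypothetical illustration: the "impossible" branch trips a debug_assert!
// in dev builds but quietly returns in release builds.
fn handle_impossible_state(cache_is_empty: bool) {
    if cache_is_empty {
        // use a debug assert to find the issue in debug builds, but don't panic in release builds
        debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen");
        return;
    }
    // ... the normal eviction path would continue here ...
}
```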
##########
datafusion/execution/src/runtime_env.rs:
##########
@@ -337,6 +349,11 @@ impl RuntimeEnvBuilder {
key: "datafusion.runtime.temp_directory".to_string(),
value: None, // Default is system-dependent
description: "The path to the temporary file directory.",
+ },
+ ConfigEntry {
+ key: "datafusion.runtime.file_metadata_cache_limit".to_string(),
+ value: Some("1G".to_owned()),
+ description: "Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
Review Comment:
What would you think about making this configuration setting more general,
looking forward to adding more things to this cache over time. For example, how
about this:
`datafusion.runtime.metadata_cache_limit`
"Maximum memory to use for per-file metadata cache such as Parquet metadata.
Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example:
'2G' for 2 gigabytes."
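For illustration, parsing such a suffixed limit into a byte count might look
like the sketch below. This is a hypothetical stand-in, not the
`parse_memory_limit` helper DataFusion actually uses (whose implementation is
not shown in this diff):

```rust
// Hypothetical sketch of parsing a memory limit string such as "2G" into
// bytes, matching the K/M/G suffixes described in the config entry above.
fn parse_memory_limit(value: &str) -> Result<usize, String> {
    if value.len() < 2 {
        return Err(format!("invalid memory limit '{value}'"));
    }
    // split off the single-character unit suffix (assumes ASCII input)
    let (digits, unit) = value.split_at(value.len() - 1);
    let number: usize = digits
        .trim()
        .parse()
        .map_err(|_| format!("invalid number in memory limit '{value}'"))?;
    match unit {
        "K" => Ok(number * 1024),
        "M" => Ok(number * 1024 * 1024),
        "G" => Ok(number * 1024 * 1024 * 1024),
        other => Err(format!("unsupported unit '{other}' in '{value}'")),
    }
}
```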
##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
indexmap = "2.10.0"
itertools = "0.14"
log = "^0.4"
+lru = "0.16.0"
Review Comment:
I agree it seems to be a reasonable crate. However, I think in general if we
can avoid new dependencies in DataFusion that would be good -- our dependency
trail is already quite large, and I realize one new dependency doesn't seem
like much (but that is what we said when introducing all the existing ones too
😢 )
Note `lru` is also a net-new dependency (no existing DataFusion dependency
uses it).
It also has a bunch of `unsafe`, which isn't necessarily a deal breaker
itself, but unless it is performance critical I think we should avoid a
potential source of crashes / non-deterministic bugs
##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
indexmap = "2.10.0"
itertools = "0.14"
log = "^0.4"
+lru = "0.16.0"
Review Comment:
However, I did some research and I think implementing an LRU cache in Rust
that actually has O(1) properties will be non-trivial (there is a good
write-up here: https://seanchen1991.github.io/posts/lru-cache/)
My personal preference would be to implement something custom, but I am
really torn about this, especially given it would be nice to implement other
LRU caches (like listing and statistics, for example) 🤔
The best I could come up with was using a `HashMap<Path, usize>` that maps
each key to an index in a `VecDeque` implementing the linked list described
in the blog. I don't think it would be simple though
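As a rough sketch of that idea (all names and structure here are mine, not a
proposed DataFusion API): a `HashMap` from key to slot index plus a doubly
linked list stored in a `Vec` arena gives O(1) `get`/`put` in safe Rust:

```rust
use std::collections::HashMap;
use std::hash::Hash;

struct Node<K, V> {
    key: K,
    value: V,
    prev: Option<usize>,
    next: Option<usize>,
}

struct SimpleLru<K: Hash + Eq + Clone, V> {
    map: HashMap<K, usize>, // key -> slot index in `nodes`
    nodes: Vec<Node<K, V>>, // arena holding the linked-list nodes
    head: Option<usize>,    // most recently used
    tail: Option<usize>,    // least recently used
    capacity: usize,
}

impl<K: Hash + Eq + Clone, V> SimpleLru<K, V> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self { map: HashMap::new(), nodes: Vec::new(), head: None, tail: None, capacity }
    }

    // Unlink slot `idx` from the list, fixing up head/tail as needed.
    fn detach(&mut self, idx: usize) {
        let (prev, next) = (self.nodes[idx].prev, self.nodes[idx].next);
        match prev {
            Some(p) => self.nodes[p].next = next,
            None => self.head = next,
        }
        match next {
            Some(n) => self.nodes[n].prev = prev,
            None => self.tail = prev,
        }
    }

    // Make slot `idx` the most recently used entry.
    fn push_front(&mut self, idx: usize) {
        self.nodes[idx].prev = None;
        self.nodes[idx].next = self.head;
        if let Some(h) = self.head {
            self.nodes[h].prev = Some(idx);
        }
        self.head = Some(idx);
        if self.tail.is_none() {
            self.tail = Some(idx);
        }
    }

    fn get(&mut self, key: &K) -> Option<&V> {
        let idx = *self.map.get(key)?;
        self.detach(idx);
        self.push_front(idx);
        Some(&self.nodes[idx].value)
    }

    fn put(&mut self, key: K, value: V) {
        if let Some(&idx) = self.map.get(&key) {
            self.nodes[idx].value = value;
            self.detach(idx);
            self.push_front(idx);
            return;
        }
        let idx = if self.nodes.len() < self.capacity {
            self.nodes.push(Node { key: key.clone(), value, prev: None, next: None });
            self.nodes.len() - 1
        } else {
            // Evict the least recently used entry and reuse its slot,
            // so arena indices stay stable.
            let lru = self.tail.expect("capacity > 0, so a full cache is non-empty");
            self.detach(lru);
            let old_key = self.nodes[lru].key.clone();
            self.map.remove(&old_key);
            self.nodes[lru] = Node { key: key.clone(), value, prev: None, next: None };
            lru
        };
        self.map.insert(key, idx);
        self.push_front(idx);
    }
}
```

Compared to the `lru` crate, which reaches O(1) with raw-pointer linked lists
and `unsafe`, this arena approach stays entirely in safe Rust; the cost is the
slot-reuse bookkeeping on eviction and somewhat worse cache locality.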
##########
datafusion/core/tests/sql/runtime_config.rs:
##########
@@ -200,6 +200,40 @@ async fn test_max_temp_directory_size_enforcement() {
);
}
+#[tokio::test]
+async fn test_file_metadata_cache_limit() {
+ let ctx = SessionContext::new();
+
+ let update_limit = async |ctx: &SessionContext, limit: &str| {
+ ctx.sql(
+ format!("SET datafusion.runtime.file_metadata_cache_limit = '{limit}'")
+ .as_str(),
+ )
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ };
+
+ let get_limit = |ctx: &SessionContext| -> Option<usize> {
+ ctx.task_ctx()
+ .runtime_env()
+ .cache_manager
+ .get_file_metadata_cache()
+ .cache_limit()
+ };
+
+ update_limit(&ctx, "100M").await;
+ assert_eq!(get_limit(&ctx), Some(100 * 1024 * 1024));
+
+ update_limit(&ctx, "2G").await;
+ assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024));
+
+ update_limit(&ctx, "123K").await;
Review Comment:
nice
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync {
/// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
/// implementation.
fn as_any(&self) -> &dyn Any;
+
+ /// Returns the size of the metadata in bytes.
+ fn memory_size(&self) -> usize;
}
/// Cache to store file-embedded metadata.
pub trait FileMetadataCache:
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
+ // Returns the cache's memory limit in bytes, or `None` for no limit.
Review Comment:
I don't think the API should allow for unlimited memory usage personally, as
memory is finite -- if a user has to set a giant number to get an unlimited
cache, it will at least be clear they are doing so.
With this `Option` API I worry that someone will set it to `None` by mistake,
thinking that means "don't cache anything", and end up with a memory
explosion on their hands
```suggestion
// Returns the cache's memory limit in bytes, or `None` for no limit.
```
##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1068,6 +1068,10 @@ impl SessionContext {
builder.with_max_temp_directory_size(directory_size as u64)
}
"temp_directory" => builder.with_temp_file_path(value),
+ "file_metadata_cache_limit" => {
+ let limit = Self::parse_memory_limit(value)?;
+ builder.with_file_metadata_cache_limit(Some(limit))
+ }
Review Comment:
I think having people set the size to something giant (for example,
`u64::MAX`) if they really want to allow unbounded memory usage is a reasonable
UX. It gives a hint that maybe that isn't a great idea to use an unbounded
memory consumer
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -215,25 +350,23 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCa
}
fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
Review Comment:
Yeah, I suggest we propose this cleanup in a follow on PR / ticket so we can
discuss it separately if desired
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
For additional commands, e-mail: [email protected]