martin-g commented on code in PR #20047: URL: https://github.com/apache/datafusion/pull/20047#discussion_r2786165373
########## datafusion/common/src/heap_size.rs: ########## @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::stats::Precision; +use crate::{ColumnStatistics, ScalarValue, Statistics}; +use arrow::array::{ + Array, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray, +}; +use arrow::datatypes::{ + DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit, + TimeUnit, UnionFields, UnionMode, i256, +}; +use chrono::{DateTime, Utc}; +use half::f16; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; + +/// This is a temporary solution until <https://github.com/apache/datafusion/pull/19599> and +/// <https://github.com/apache/arrow-rs/pull/9138> are resolved. +/// Trait for calculating the size of various containers +pub trait DFHeapSize { + /// Return the size of any bytes allocated on the heap by this object, + /// including heap memory in those structures + /// + /// Note that the size of the type itself is not included in the result -- + /// instead, that size is added by the caller (e.g. container). 
+ fn heap_size(&self) -> usize; +} + +impl DFHeapSize for Statistics { + fn heap_size(&self) -> usize { + self.num_rows.heap_size() + + self.total_byte_size.heap_size() + + self + .column_statistics + .iter() + .map(|s| s.heap_size()) + .sum::<usize>() + } +} + +impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize + for Precision<T> +{ + fn heap_size(&self) -> usize { + self.get_value().map_or_else(|| 0, |v| v.heap_size()) + } +} + +impl DFHeapSize for ColumnStatistics { + fn heap_size(&self) -> usize { + self.null_count.heap_size() + + self.max_value.heap_size() + + self.min_value.heap_size() + + self.sum_value.heap_size() + + self.distinct_count.heap_size() + + self.byte_size.heap_size() + } +} + +impl DFHeapSize for ScalarValue { + fn heap_size(&self) -> usize { + use crate::scalar::ScalarValue::*; + match self { + Null => 0, + Boolean(b) => b.heap_size(), + Float16(f) => f.heap_size(), + Float32(f) => f.heap_size(), + Float64(f) => f.heap_size(), + Decimal32(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal64(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal128(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal256(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Int8(i) => i.heap_size(), + Int16(i) => i.heap_size(), + Int32(i) => i.heap_size(), + Int64(i) => i.heap_size(), + UInt8(u) => u.heap_size(), + UInt16(u) => u.heap_size(), + UInt32(u) => u.heap_size(), + UInt64(u) => u.heap_size(), + Utf8(u) => u.heap_size(), + Utf8View(u) => u.heap_size(), + LargeUtf8(l) => l.heap_size(), + Binary(b) => b.heap_size(), + BinaryView(b) => b.heap_size(), + FixedSizeBinary(a, b) => a.heap_size() + b.heap_size(), + LargeBinary(l) => l.heap_size(), + FixedSizeList(f) => f.heap_size(), + List(l) => l.heap_size(), + LargeList(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Map(m) => m.heap_size(), + Date32(d) => d.heap_size(), + Date64(d) => d.heap_size(), + Time32Second(t) => 
t.heap_size(), + Time32Millisecond(t) => t.heap_size(), + Time64Microsecond(t) => t.heap_size(), + Time64Nanosecond(t) => t.heap_size(), + TimestampSecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMillisecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMicrosecond(a, b) => a.heap_size() + b.heap_size(), + TimestampNanosecond(a, b) => a.heap_size() + b.heap_size(), + IntervalYearMonth(i) => i.heap_size(), + IntervalDayTime(i) => i.heap_size(), + IntervalMonthDayNano(i) => i.heap_size(), + DurationSecond(d) => d.heap_size(), + DurationMillisecond(d) => d.heap_size(), + DurationMicrosecond(d) => d.heap_size(), + DurationNanosecond(d) => d.heap_size(), + Union(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + RunEndEncoded(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + } + } +} + +impl DFHeapSize for DataType { + fn heap_size(&self) -> usize { + use DataType::*; + match self { + Null => 0, + Boolean => 0, + Int8 => 0, + Int16 => 0, + Int32 => 0, + Int64 => 0, + UInt8 => 0, + UInt16 => 0, + UInt32 => 0, + UInt64 => 0, + Float16 => 0, + Float32 => 0, + Float64 => 0, + Timestamp(t, s) => t.heap_size() + s.heap_size(), + Date32 => 0, + Date64 => 0, + Time32(t) => t.heap_size(), + Time64(t) => t.heap_size(), + Duration(t) => t.heap_size(), + Interval(i) => i.heap_size(), + Binary => 0, + FixedSizeBinary(i) => i.heap_size(), + LargeBinary => 0, + BinaryView => 0, + Utf8 => 0, + LargeUtf8 => 0, + Utf8View => 0, + List(v) => v.heap_size(), + ListView(v) => v.heap_size(), + FixedSizeList(f, i) => f.heap_size() + i.heap_size(), + LargeList(l) => l.heap_size(), + LargeListView(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Union(u, m) => u.heap_size() + m.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + Decimal32(u8, i8) => u8.heap_size() + i8.heap_size(), + Decimal64(u8, i8) => u8.heap_size() + i8.heap_size(), + Decimal128(u8, i8) => u8.heap_size() + 
i8.heap_size(), + Decimal256(u8, i8) => u8.heap_size() + i8.heap_size(), + Map(m, b) => m.heap_size() + b.heap_size(), + RunEndEncoded(a, b) => a.heap_size() + b.heap_size(), + } + } +} + +impl<T: DFHeapSize> DFHeapSize for Vec<T> { + fn heap_size(&self) -> usize { + let item_size = size_of::<T>(); + // account for the contents of the Vec + (self.capacity() * item_size) + + // add any heap allocations by contents + self.iter().map(|t| t.heap_size()).sum::<usize>() + } +} + +impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> { + fn heap_size(&self) -> usize { + let capacity = self.capacity(); + if capacity == 0 { + return 0; + } + + // HashMap doesn't provide a way to get its heap size, so this is an approximation based on + // the behavior of hashbrown::HashMap as at version 0.16.0, and may become inaccurate + // if the implementation changes. + let key_val_size = size_of::<(K, V)>(); + // Overhead for the control tags group, which may be smaller depending on architecture + let group_size = 16; + // 1 byte of metadata stored per bucket. + let metadata_size = 1; + + // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets + let buckets = if capacity < 15 { + let min_cap = match key_val_size { + 0..=1 => 14, + 2..=3 => 7, + _ => 3, + }; + let cap = min_cap.max(capacity); + if cap < 4 { + 4 + } else if cap < 8 { + 8 + } else { + 16 + } + } else { + (capacity.saturating_mul(8) / 7).next_power_of_two() + }; + + group_size + + (buckets * (key_val_size + metadata_size)) + + self.keys().map(|k| k.heap_size()).sum::<usize>() + + self.values().map(|v| v.heap_size()).sum::<usize>() + } +} + +impl<T: DFHeapSize> DFHeapSize for Arc<T> { + fn heap_size(&self) -> usize { + // Arc stores weak and strong counts on the heap alongside an instance of T + 2 * size_of::<usize>() + size_of::<T>() + self.as_ref().heap_size() Review Comment: This won't be accurate. 
```rust let a1 = Arc::new(vec![1, 2, 3]); let a2 = a1.clone(); let a3 = a1.clone(); let a4 = a3.clone(); // this should be true because all `a`s point to the same object in memory // but the current implementation does not detect this and counts them separately assert_eq!(a4.heap_size(), a1.heap_size() + a2.heap_size() + a3.heap_size() + a4.heap_size()); ``` The only solution I can imagine is for the caller to keep track of the pointer addresses which have been "sized" and ignore any Arc's which point to an address which has been "sized" earlier. ########## datafusion/core/tests/sql/runtime_config.rs: ########## @@ -325,6 +326,51 @@ async fn test_list_files_cache_ttl() { assert_eq!(get_limit(&ctx), Duration::from_secs(90)); } +#[tokio::test] +async fn test_file_statistics_cache_limit() { + let list_files_cache = Arc::new(DefaultFileStatisticsCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default() + .with_file_statistics_cache(Some(list_files_cache)), Review Comment: ```suggestion .with_file_statistics_cache(Some(file_statistics_cache)), ``` ########## datafusion/execution/src/cache/cache_unit.rs: ########## Review Comment: This is no longer true ########## datafusion/core/src/execution/context/mod.rs: ########## @@ -1187,6 +1188,10 @@ impl SessionContext { let duration = Self::parse_duration(value)?; builder.with_object_list_cache_ttl(Some(duration)) } + "file_statistics_cache_limit" => { + let limit = Self::parse_memory_limit(value)?; Review Comment: Not caused by this PR, but `parse_memory_limit()` panics when the value is an empty string (`attempt to subtract with overflow`). Needs to be improved either in this PR or a follow-up. 
########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -41,32 +41,142 @@ pub use crate::cache::DefaultFilesMetadataCache; /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - cache: DashMap<Path, CachedFileMetadata>, + state: Mutex<DefaultFileStatisticsCacheState>, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +pub struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue<Path, CachedFileMetadata>, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } } +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &Path, + value: CachedFileMetadata, + ) -> Option<CachedFileMetadata> { + let entry_size = value.heap_size(); + + if entry_size > self.memory_limit { + return None; + } + + let old_value = self.lru_queue.put(key.clone(), value); + self.memory_used += entry_size; + + if let Some(old_entry) = &old_value { + self.memory_used -= old_entry.heap_size(); + } Review Comment: ```suggestion } else { self.memory_used += key.heap_size(); } ``` ########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -406,11 +527,101 @@ mod tests { num_rows: Precision::Absent, 
num_columns: 1, table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, + statistics_size_bytes: 72, has_ordering: true, } ), ]) ); } + + #[test] + fn test_cache_entry_added_when_entries_are_within_cache_limit() { + let (meta_1, value_1) = create_cached_file_metadata_with_stats("test1.parquet"); + let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet"); + let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet"); + + let limit_for_2_entries = value_1.heap_size() + value_2.heap_size(); + + // create a cache with a limit which fits exactly 2 entries + let cache = DefaultFileStatisticsCache::new(limit_for_2_entries); + + cache.put(&meta_1.location, value_1.clone()); + cache.put(&meta_2.location, value_2.clone()); + + assert_eq!(cache.len(), 2); + assert_eq!(cache.memory_used(), limit_for_2_entries); + + let result_1 = cache.get(&meta_1.location); + let result_2 = cache.get(&meta_2.location); + assert_eq!(result_1.unwrap(), value_1); + assert_eq!(result_2.unwrap(), value_2); + + // adding the third entry evicts the first entry + cache.put(&meta_3.location, value_3.clone()); + assert_eq!(cache.len(), 2); + assert_eq!(cache.memory_used(), limit_for_2_entries); + + let result_1 = cache.get(&meta_1.location); + assert!(result_1.is_none()); + + let result_2 = cache.get(&meta_2.location); + let result_3 = cache.get(&meta_3.location); + + assert_eq!(result_2.unwrap(), value_2); + assert_eq!(result_3.unwrap(), value_3); + + cache.remove(&meta_2.location); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), value_3.heap_size()); + + cache.clear(); + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + } + + #[test] + fn test_cache_rejects_entry_which_is_too_large() { + let (meta, value) = create_cached_file_metadata_with_stats("test1.parquet"); + + let limit_less_than_the_entry = value.heap_size() - 1; + + // create a cache with a size less than the entry + let cache = 
DefaultFileStatisticsCache::new(limit_less_than_the_entry); + + cache.put(&meta.location, value); + + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + } + + fn create_cached_file_metadata_with_stats( + file_name: &str, + ) -> (ObjectMeta, CachedFileMetadata) { + let series: Vec<i32> = (0..=10).step_by(1).collect(); Review Comment: ```suggestion let series: Vec<i32> = (0..=10).collect(); ``` ########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -41,32 +41,142 @@ pub use crate::cache::DefaultFilesMetadataCache; /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - cache: DashMap<Path, CachedFileMetadata>, + state: Mutex<DefaultFileStatisticsCacheState>, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. 
+ pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +pub struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue<Path, CachedFileMetadata>, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } } +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &Path, + value: CachedFileMetadata, + ) -> Option<CachedFileMetadata> { + let entry_size = value.heap_size(); + + if entry_size > self.memory_limit { + return None; + } + + let old_value = self.lru_queue.put(key.clone(), value); + self.memory_used += entry_size; + + if let Some(old_entry) = &old_value { + self.memory_used -= old_entry.heap_size(); + } + + self.evict_entries(); + + old_value + } + + fn remove(&mut self, k: &Path) -> Option<CachedFileMetadata> { + if let Some(old_entry) = self.lru_queue.remove(k) { + self.memory_used -= old_entry.heap_size(); + Some(old_entry) + } else { + None + } + } + + fn contains_key(&self, k: &Path) -> bool { + self.lru_queue.contains_key(k) + } + + fn len(&self) -> usize { + self.lru_queue.len() + } + + fn clear(&mut self) { + self.lru_queue.clear(); + self.memory_used = 0; + } + + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + if let Some(removed) = self.lru_queue.pop() { + self.memory_used -= removed.1.heap_size(); + } else { + // cache is empty while memory_used > memory_limit, cannot happen + debug_assert!( + false, + "cache is empty while memory_used > memory_limit, cannot 
happen" Review Comment: The error message should probably say something like: "This is a bug! Please report it to the Apache DataFusion developers" ########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -41,32 +41,142 @@ pub use crate::cache::DefaultFilesMetadataCache; /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - cache: DashMap<Path, CachedFileMetadata>, + state: Mutex<DefaultFileStatisticsCacheState>, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +pub struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue<Path, CachedFileMetadata>, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } } +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &Path, + value: CachedFileMetadata, + ) -> Option<CachedFileMetadata> { + let entry_size = value.heap_size(); + + if entry_size > self.memory_limit { + return None; + } + + let old_value = self.lru_queue.put(key.clone(), value); + self.memory_used += entry_size; + + if let Some(old_entry) = &old_value { + self.memory_used -= old_entry.heap_size(); + } + + self.evict_entries(); + + old_value + } + + fn remove(&mut self, 
k: &Path) -> Option<CachedFileMetadata> { + if let Some(old_entry) = self.lru_queue.remove(k) { + self.memory_used -= old_entry.heap_size(); + Some(old_entry) + } else { + None + } + } + + fn contains_key(&self, k: &Path) -> bool { + self.lru_queue.contains_key(k) + } + + fn len(&self) -> usize { + self.lru_queue.len() + } + + fn clear(&mut self) { + self.lru_queue.clear(); + self.memory_used = 0; + } + + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + if let Some(removed) = self.lru_queue.pop() { + self.memory_used -= removed.1.heap_size(); Review Comment: ```suggestion self.memory_used -= removed.0.heap_size(); self.memory_used -= removed.1.heap_size(); ``` ########## datafusion/execution/src/cache/cache_manager.rs: ########## @@ -92,10 +96,26 @@ impl CachedFileMetadata { /// /// See [`crate::runtime_env::RuntimeEnv`] for more details pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> { + fn cache_limit(&self) -> usize; + + /// Updates the cache with a new memory limit in bytes. + fn update_cache_limit(&self, limit: usize); + /// Retrieves the information about the entries currently cached. fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>; } +impl DFHeapSize for CachedFileMetadata { + fn heap_size(&self) -> usize { + self.meta.size.heap_size() + + self.meta.last_modified.heap_size() + + self.meta.version.heap_size() + + self.meta.e_tag.heap_size() + + self.meta.location.as_ref().heap_size() + + self.statistics.heap_size() Review Comment: The `ordering` is not calculated. Please add a comment why it is not here or a TODO to add it once `LexOrdering `/`PhysicalExpr` implements `DFHeapSize` ########## datafusion/execution/src/cache/cache_manager.rs: ########## @@ -448,14 +489,19 @@ impl Default for CacheManagerConfig { } impl CacheManagerConfig { - /// Set the cache for files statistics. + /// Set the cache for file statistics. /// /// Default is `None` (disabled). 
- pub fn with_files_statistics_cache( + pub fn with_file_statistics_cache( mut self, cache: Option<Arc<dyn FileStatisticsCache>>, ) -> Self { - self.table_files_statistics_cache = cache; + self.file_statistics_cache = cache; + self + } + + pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self { Review Comment: ```suggestion /// Specifies the memory limit for the file statistics cache, in bytes. pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self { ``` ########## datafusion/core/tests/sql/runtime_config.rs: ########## @@ -325,6 +326,51 @@ async fn test_list_files_cache_ttl() { assert_eq!(get_limit(&ctx), Duration::from_secs(90)); } +#[tokio::test] +async fn test_file_statistics_cache_limit() { + let list_files_cache = Arc::new(DefaultFileStatisticsCache::default()); Review Comment: ```suggestion let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); ``` ########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -41,32 +41,142 @@ pub use crate::cache::DefaultFilesMetadataCache; /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - cache: DashMap<Path, CachedFileMetadata>, + state: Mutex<DefaultFileStatisticsCacheState>, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. 
+ pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +pub struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue<Path, CachedFileMetadata>, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } } +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &Path, + value: CachedFileMetadata, + ) -> Option<CachedFileMetadata> { + let entry_size = value.heap_size(); + + if entry_size > self.memory_limit { + return None; + } + + let old_value = self.lru_queue.put(key.clone(), value); + self.memory_used += entry_size; + + if let Some(old_entry) = &old_value { + self.memory_used -= old_entry.heap_size(); + } + + self.evict_entries(); + + old_value + } + + fn remove(&mut self, k: &Path) -> Option<CachedFileMetadata> { + if let Some(old_entry) = self.lru_queue.remove(k) { + self.memory_used -= old_entry.heap_size(); Review Comment: ```suggestion self.memory_used -= old_entry.heap_size(); self.memory_used -= k.heap_size(); ``` ########## datafusion/common/src/heap_size.rs: ########## @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::stats::Precision; +use crate::{ColumnStatistics, ScalarValue, Statistics}; +use arrow::array::{ + Array, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray, +}; +use arrow::datatypes::{ + DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit, + TimeUnit, UnionFields, UnionMode, i256, +}; +use chrono::{DateTime, Utc}; +use half::f16; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; + +/// This is a temporary solution until <https://github.com/apache/datafusion/pull/19599> and +/// <https://github.com/apache/arrow-rs/pull/9138> are resolved. +/// Trait for calculating the size of various containers +pub trait DFHeapSize { + /// Return the size of any bytes allocated on the heap by this object, + /// including heap memory in those structures + /// + /// Note that the size of the type itself is not included in the result -- + /// instead, that size is added by the caller (e.g. container). 
+ fn heap_size(&self) -> usize; +} + +impl DFHeapSize for Statistics { + fn heap_size(&self) -> usize { + self.num_rows.heap_size() + + self.total_byte_size.heap_size() + + self + .column_statistics + .iter() + .map(|s| s.heap_size()) + .sum::<usize>() + } +} + +impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize + for Precision<T> +{ + fn heap_size(&self) -> usize { + self.get_value().map_or_else(|| 0, |v| v.heap_size()) + } +} + +impl DFHeapSize for ColumnStatistics { + fn heap_size(&self) -> usize { + self.null_count.heap_size() + + self.max_value.heap_size() + + self.min_value.heap_size() + + self.sum_value.heap_size() + + self.distinct_count.heap_size() + + self.byte_size.heap_size() + } +} + +impl DFHeapSize for ScalarValue { + fn heap_size(&self) -> usize { + use crate::scalar::ScalarValue::*; + match self { + Null => 0, + Boolean(b) => b.heap_size(), + Float16(f) => f.heap_size(), + Float32(f) => f.heap_size(), + Float64(f) => f.heap_size(), + Decimal32(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal64(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal128(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal256(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Int8(i) => i.heap_size(), + Int16(i) => i.heap_size(), + Int32(i) => i.heap_size(), + Int64(i) => i.heap_size(), + UInt8(u) => u.heap_size(), + UInt16(u) => u.heap_size(), + UInt32(u) => u.heap_size(), + UInt64(u) => u.heap_size(), + Utf8(u) => u.heap_size(), + Utf8View(u) => u.heap_size(), + LargeUtf8(l) => l.heap_size(), + Binary(b) => b.heap_size(), + BinaryView(b) => b.heap_size(), + FixedSizeBinary(a, b) => a.heap_size() + b.heap_size(), + LargeBinary(l) => l.heap_size(), + FixedSizeList(f) => f.heap_size(), + List(l) => l.heap_size(), + LargeList(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Map(m) => m.heap_size(), + Date32(d) => d.heap_size(), + Date64(d) => d.heap_size(), + Time32Second(t) => 
t.heap_size(), + Time32Millisecond(t) => t.heap_size(), + Time64Microsecond(t) => t.heap_size(), + Time64Nanosecond(t) => t.heap_size(), + TimestampSecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMillisecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMicrosecond(a, b) => a.heap_size() + b.heap_size(), + TimestampNanosecond(a, b) => a.heap_size() + b.heap_size(), + IntervalYearMonth(i) => i.heap_size(), + IntervalDayTime(i) => i.heap_size(), + IntervalMonthDayNano(i) => i.heap_size(), + DurationSecond(d) => d.heap_size(), + DurationMillisecond(d) => d.heap_size(), + DurationMicrosecond(d) => d.heap_size(), + DurationNanosecond(d) => d.heap_size(), + Union(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + RunEndEncoded(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + } + } +} + +impl DFHeapSize for DataType { + fn heap_size(&self) -> usize { + use DataType::*; + match self { + Null => 0, + Boolean => 0, + Int8 => 0, + Int16 => 0, + Int32 => 0, + Int64 => 0, + UInt8 => 0, + UInt16 => 0, + UInt32 => 0, + UInt64 => 0, + Float16 => 0, + Float32 => 0, + Float64 => 0, + Timestamp(t, s) => t.heap_size() + s.heap_size(), + Date32 => 0, + Date64 => 0, + Time32(t) => t.heap_size(), + Time64(t) => t.heap_size(), + Duration(t) => t.heap_size(), + Interval(i) => i.heap_size(), + Binary => 0, + FixedSizeBinary(i) => i.heap_size(), + LargeBinary => 0, + BinaryView => 0, + Utf8 => 0, + LargeUtf8 => 0, + Utf8View => 0, + List(v) => v.heap_size(), + ListView(v) => v.heap_size(), + FixedSizeList(f, i) => f.heap_size() + i.heap_size(), + LargeList(l) => l.heap_size(), + LargeListView(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Union(u, m) => u.heap_size() + m.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + Decimal32(u8, i8) => u8.heap_size() + i8.heap_size(), Review Comment: nit: `u8` and `i8` shadow the built-in types. Maybe use other names to avoid confusion. 
```suggestion Decimal32(p, s) => p.heap_size() + s.heap_size(), ``` Same below. ########## datafusion/execution/src/cache/cache_unit.rs: ########## @@ -41,32 +41,142 @@ pub use crate::cache::DefaultFilesMetadataCache; /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache #[derive(Default)] pub struct DefaultFileStatisticsCache { - cache: DashMap<Path, CachedFileMetadata>, + state: Mutex<DefaultFileStatisticsCacheState>, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +pub struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue<Path, CachedFileMetadata>, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } } +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &Path, + value: CachedFileMetadata, + ) -> Option<CachedFileMetadata> { + let entry_size = value.heap_size(); + + if entry_size > self.memory_limit { + return None; Review Comment: Also remove the stale entry. 
```suggestion self.remove(key); return None; ``` ########## datafusion/sqllogictest/test_files/set_variable.slt: ########## @@ -407,6 +413,15 @@ SHOW datafusion.runtime.max_temp_directory_size ---- datafusion.runtime.max_temp_directory_size 10G +# Test SET and SHOW rruntime.file_statistics_cache_limit Review Comment: ```suggestion # Test SET and SHOW runtime.file_statistics_cache_limit ``` ########## datafusion/execution/src/cache/cache_manager.rs: ########## @@ -411,7 +449,9 @@ pub struct CacheManagerConfig { /// Enable caching of file statistics when listing files. /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session. /// Default is disabled. Currently only Parquet files are supported. Review Comment: `Default is disabled` - seems wrong. DefaultFileStatisticsCache is created at https://github.com/mkleen/datafusion/blob/92899a7d6031bcc5623ed77fd05a1c9774243aad/datafusion/execution/src/cache/cache_manager.rs#L358 and the default for `file_statistics_cache_limit` is 1MiB -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
