This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new c37dd5ea45 feat: Cache Parquet metadata in built in parquet reader (#16971)

c37dd5ea45 is described below

commit c37dd5ea45a0d572dc8cb89c6c4c88cad0d50365
Author: Nuno Faria <nunofpfa...@gmail.com>
AuthorDate: Sat Aug 2 11:18:56 2025 +0100

    feat: Cache Parquet metadata in built in parquet reader (#16971)

    * feat: Cache Parquet metadata

    * Convert FileMetadata and FileMetadataCache to traits

    * Use as_any to respect MSRV

    * Use ObjectMeta as the key of FileMetadataCache
---
 datafusion/common/src/config.rs                    |   6 +
 .../common/src/file_options/parquet_writer.rs      |   3 +
 .../core/src/datasource/file_format/options.rs     |  15 ++
 datafusion/datasource-parquet/src/file_format.rs   |  18 ++-
 datafusion/datasource-parquet/src/reader.rs        | 141 +++++++++++++++++-
 datafusion/execution/src/cache/cache_manager.rs    |  46 ++++++
 datafusion/execution/src/cache/cache_unit.rs       | 158 ++++++++++++++++++++-
 datafusion/execution/src/runtime_env.rs            |   1 +
 .../proto-common/proto/datafusion_common.proto     |   1 +
 datafusion/proto-common/src/from_proto/mod.rs      |   1 +
 datafusion/proto-common/src/generated/pbjson.rs    |  18 +++
 datafusion/proto-common/src/generated/prost.rs     |   3 +
 datafusion/proto-common/src/to_proto/mod.rs        |   1 +
 .../proto/src/generated/datafusion_proto_common.rs |   3 +
 datafusion/proto/src/logical_plan/file_formats.rs  |   2 +
 .../sqllogictest/test_files/information_schema.slt |   2 +
 datafusion/sqllogictest/test_files/parquet.slt     | 119 ++++++++++++++++
 docs/source/user-guide/configs.md                  |   1 +
 18 files changed, 536 insertions(+), 3 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b75a5df8be..3a9b71e1bf 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -559,6 +559,12 @@ config_namespace! {
         /// (reading) Use any available bloom filters when reading parquet files
         pub bloom_filter_on_read: bool, default = true
 
+        /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
+        /// (footer and page metadata). Enabling it can offer substantial performance improvements
+        /// for repeated queries over large files. By default, the cache is automatically
+        /// invalidated when the underlying file is modified.
+        pub cache_metadata: bool, default = false
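For reference, the new option can be switched on for a whole session from Rust as well as from SQL (`SET datafusion.execution.parquet.cache_metadata = true;`, as exercised in the sqllogictest changes below). A minimal sketch against the public `SessionConfig` API; this snippet is illustrative and not part of the commit:

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn session_with_metadata_caching() -> SessionContext {
        // Enable Parquet metadata caching for every Parquet scan in this session.
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.cache_metadata", true);
        SessionContext::new_with_config(config)
    }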
+
     // The following options affect writing to parquet files
     // and map to parquet::file::properties::WriterProperties

diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index cde0ea1299..9ea2b6af82 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -245,6 +245,7 @@ impl ParquetOptions {
             binary_as_string: _, // not used for writer props
             coerce_int96: _, // not used for writer props
             skip_arrow_metadata: _,
+            cache_metadata: _,
         } = self;
 
         let mut builder = WriterProperties::builder()
@@ -522,6 +523,7 @@ mod tests {
             binary_as_string: defaults.binary_as_string,
             skip_arrow_metadata: defaults.skip_arrow_metadata,
             coerce_int96: None,
+            cache_metadata: defaults.cache_metadata,
         }
     }
 
@@ -634,6 +636,7 @@ mod tests {
                 binary_as_string: global_options_defaults.binary_as_string,
                 skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                 coerce_int96: None,
+                cache_metadata: global_options_defaults.cache_metadata,
             },
             column_specific_options,
             key_value_metadata,

diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 02b792823a..459e92a7a9 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
     /// Properties for decryption of Parquet files that use modular encryption
     pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
+    /// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
+    /// page metadata). Enabling it can offer substantial performance improvements for repeated
+    /// queries over large files. By default, the cache is automatically invalidated when the
+    /// underlying file is modified.
+    pub cache_metadata: Option<bool>,
 }
 
 impl Default for ParquetReadOptions<'_> {
@@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> {
             schema: None,
             file_sort_order: vec![],
             file_decryption_properties: None,
+            cache_metadata: None,
         }
     }
 }
@@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> {
         self.file_decryption_properties = Some(file_decryption_properties);
         self
     }
+
+    /// Specify whether or not to enable metadata caching
+    pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
+        self.cache_metadata = Some(cache_metadata);
+        self
+    }
 }
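The builder method above also allows opting in a single table registration rather than the whole session. A hedged sketch of such a call site (the table name and file path are illustrative, not from this commit):

    use datafusion::error::Result;
    use datafusion::prelude::{ParquetReadOptions, SessionContext};

    // Register one table with metadata caching enabled; other tables are unaffected.
    async fn register_with_cached_metadata(ctx: &SessionContext) -> Result<()> {
        let options = ParquetReadOptions::default().cache_metadata(true);
        ctx.register_parquet("big_table", "data/big_table.parquet", options)
            .await
    }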
 
 /// Options that control the reading of ARROW files.
@@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
         if let Some(file_decryption_properties) = &self.file_decryption_properties {
             options.crypto.file_decryption = Some(file_decryption_properties.clone());
         }
+        if let Some(cache_metadata) = self.cache_metadata {
+            options.global.cache_metadata = cache_metadata;
+        }
 
         let mut file_format = ParquetFormat::new().with_options(options);
 
         if let Some(parquet_pruning) = self.parquet_pruning {

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 43b0886193..7210cc09a0 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
+use crate::reader::CachedParquetFileReaderFactory;
 use crate::source::{parse_coerce_int96_string, ParquetSource};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat {
 
     async fn create_physical_plan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut metadata_size_hint = None;
@@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat {
 
         let mut source = ParquetSource::new(self.options.clone());
 
+        // Use the CachedParquetFileReaderFactory when metadata caching is enabled
+        if self.options.global.cache_metadata {
+            if let Some(metadata_cache) =
+                state.runtime_env().cache_manager.get_file_metadata_cache()
+            {
+                let store = state
+                    .runtime_env()
+                    .object_store(conf.object_store_url.clone())?;
+                let cached_parquet_read_factory =
+                    Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+                source =
+                    source.with_parquet_file_reader_factory(cached_parquet_read_factory);
+            }
+        }
+
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }

diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 27ec843c19..6ad9428770 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -21,12 +21,15 @@ use crate::ParquetFileMetrics;
 use bytes::Bytes;
 use datafusion_datasource::file_meta::FileMeta;
+use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::future::BoxFuture;
+use futures::FutureExt;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use std::any::Any;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
@@ -150,3 +153,139 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
+/// encrypted), even if not required by the current query, to ensure it is always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: Arc<dyn FileMetadataCache>,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(
+        store: Arc<dyn ObjectStore>,
+        metadata_cache: Arc<dyn FileMetadataCache>,
+    ) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
+/// updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: Arc<dyn FileMetadataCache>,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {
+            let object_meta = &file_meta.object_meta;
+
+            // lookup if the metadata is already cached
+            if let Some(metadata) = metadata_cache.get(object_meta) {
+                if let Some(parquet_metadata) =
+                    metadata.as_any().downcast_ref::<CachedParquetMetaData>()
+                {
+                    return Ok(Arc::clone(&parquet_metadata.0));
+                }
+            }
+
+            let mut reader = ParquetMetaDataReader::new();
+            // the page index can only be loaded with unencrypted files
+            if let Some(file_decryption_properties) =
+                options.and_then(|o| o.file_decryption_properties())
+            {
+                reader =
+                    reader.with_decryption_properties(Some(file_decryption_properties));
+            } else {
+                reader = reader.with_page_indexes(true);
+            }
+            reader.try_load(&mut self.inner, object_meta.size).await?;
+            let metadata = Arc::new(reader.finish()?);
+            let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata)));
+
+            metadata_cache.put(object_meta, cached_metadata);
+            Ok(metadata)
+        }
+        .boxed()
+    }
+}
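`ParquetFormat::create_physical_plan` (file_format.rs above) performs this wiring automatically; a custom scan could attach the factory by hand. A sketch, under the assumption that `CachedParquetFileReaderFactory` and `ParquetSource` are reachable through the crate's public module paths (not confirmed by this commit):

    use std::sync::Arc;

    use datafusion_datasource_parquet::reader::CachedParquetFileReaderFactory;
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use object_store::ObjectStore;

    // Attach the caching reader factory to a ParquetSource when the runtime
    // exposes a file metadata cache; otherwise keep the default reader.
    fn with_cached_reader(
        source: ParquetSource,
        runtime: &RuntimeEnv,
        store: Arc<dyn ObjectStore>,
    ) -> ParquetSource {
        match runtime.cache_manager.get_file_metadata_cache() {
            Some(cache) => source.with_parquet_file_reader_factory(Arc::new(
                CachedParquetFileReaderFactory::new(store, cache),
            )),
            None => source,
        }
    }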
+
+/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
+struct CachedParquetMetaData(Arc<ParquetMetaData>);
+
+impl FileMetadata for CachedParquetMetaData {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}

diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index c2403e34c6..37f1baa17f 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::cache::cache_unit::DefaultFilesMetadataCache;
 use crate::cache::CacheAccessor;
 use datafusion_common::{Result, Statistics};
 use object_store::path::Path;
 use object_store::ObjectMeta;
+use std::any::Any;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
@@ -32,6 +34,19 @@ pub type FileStatisticsCache =
 pub type ListFilesCache =
     Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
 
+/// Represents generic file-embedded metadata.
+pub trait FileMetadata: Any + Send + Sync {
+    /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
+    /// implementation.
+    fn as_any(&self) -> &dyn Any;
+}
+
+/// Cache to store file-embedded metadata.
+pub trait FileMetadataCache:
+    CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
+{
+}
+
 impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(f, "Cache name: {} with length: {}", self.name(), self.len())
@@ -44,10 +59,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
     }
 }
 
+impl Debug for dyn FileMetadataCache {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
+    }
+}
+
 #[derive(Default, Debug)]
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
     list_files_cache: Option<ListFilesCache>,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 }
 
 impl CacheManager {
@@ -59,6 +81,13 @@ impl CacheManager {
         if let Some(lc) = &config.list_files_cache {
             manager.list_files_cache = Some(Arc::clone(lc))
         }
+        if let Some(mc) = &config.file_metadata_cache {
+            manager.file_metadata_cache = Some(Arc::clone(mc));
+        } else {
+            manager.file_metadata_cache =
+                Some(Arc::new(DefaultFilesMetadataCache::default()));
+        }
+
         Ok(Arc::new(manager))
     }
 
@@ -71,6 +100,11 @@ impl CacheManager {
     pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
         self.list_files_cache.clone()
     }
+
+    /// Get the file embedded metadata cache.
+    pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
+        self.file_metadata_cache.clone()
+    }
 }
 
 #[derive(Clone, Default)]
@@ -86,6 +120,10 @@ pub struct CacheManagerConfig {
     /// location.
     /// Default is disable.
     pub list_files_cache: Option<ListFilesCache>,
+    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
+    /// data file (e.g., Parquet footer and page metadata).
+    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
+    pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 }
 
 impl CacheManagerConfig {
@@ -101,4 +139,12 @@ impl CacheManagerConfig {
         self.list_files_cache = cache;
         self
     }
+
+    pub fn with_file_metadata_cache(
+        mut self,
+        cache: Option<Arc<dyn FileMetadataCache>>,
+    ) -> Self {
+        self.file_metadata_cache = cache;
+        self
+    }
 }

diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
index a9291659a3..70d007bf5b 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -17,6 +17,7 @@
 
 use std::sync::Arc;
 
+use crate::cache::cache_manager::{FileMetadata, FileMetadataCache};
 use crate::cache::CacheAccessor;
 use datafusion_common::Statistics;
 
@@ -157,9 +158,97 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
     }
 }
 
+/// Cache of collected file-embedded metadata.
+/// The metadata for a file is invalidated when the file size or last modification time has
+/// changed.
+/// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods
+/// simply call `get` and `put`, respectively.
+#[derive(Default)]
+pub struct DefaultFilesMetadataCache {
+    metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+}
+
+impl FileMetadataCache for DefaultFilesMetadataCache {}
+
+impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.metadata
+            .get(&k.location)
+            .map(|s| {
+                let (extra, metadata) = s.value();
+                if extra.size != k.size || extra.last_modified != k.last_modified {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    fn get_with_extra(
+        &self,
+        k: &ObjectMeta,
+        _e: &Self::Extra,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        self.get(k)
+    }
+
+    fn put(
+        &self,
+        key: &ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        self.metadata
+            .insert(key.location.clone(), (key.clone(), value))
+            .map(|x| x.1)
+    }
+
+    fn put_with_extra(
+        &self,
+        key: &ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+        _e: &Self::Extra,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        self.put(key, value)
+    }
+
+    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.metadata.remove(&k.location).map(|x| x.1 .1)
+    }
+
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.metadata
+            .get(&k.location)
+            .map(|s| {
+                let (extra, _) = s.value();
+                extra.size == k.size && extra.last_modified == k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    fn len(&self) -> usize {
+        self.metadata.len()
+    }
+
+    fn clear(&self) {
+        self.metadata.clear();
+    }
+
+    fn name(&self) -> String {
+        "DefaultFilesMetadataCache".to_string()
+    }
+}
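While `CacheManager` falls back to `DefaultFilesMetadataCache` automatically, a cache can also be supplied explicitly. A sketch of wiring one into a session through `CacheManagerConfig` and `RuntimeEnvBuilder`; the re-export paths are assumed from the `datafusion` facade crate and this snippet is not part of the commit:

    use std::sync::Arc;

    use datafusion::execution::cache::cache_manager::CacheManagerConfig;
    use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn session_with_explicit_metadata_cache() -> datafusion::error::Result<SessionContext> {
        // Any FileMetadataCache implementation can be supplied here; the
        // default implementation is used purely for illustration.
        let cache_config = CacheManagerConfig::default().with_file_metadata_cache(
            Some(Arc::new(DefaultFilesMetadataCache::default())),
        );
        let runtime = RuntimeEnvBuilder::new()
            .with_cache_manager(cache_config)
            .build_arc()?;
        Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
    }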
 
 #[cfg(test)]
 mod tests {
-    use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
+    use std::sync::Arc;
+
+    use crate::cache::cache_manager::FileMetadata;
+    use crate::cache::cache_unit::{
+        DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache,
+    };
     use crate::cache::CacheAccessor;
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use chrono::DateTime;
@@ -232,4 +321,71 @@ mod tests {
             meta.clone()
         );
     }
+
+    pub struct TestFileMetadata {
+        metadata: String,
+    }
+
+    impl FileMetadata for TestFileMetadata {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+    }
+
+    #[test]
+    fn test_file_metadata_cache() {
+        let object_meta = ObjectMeta {
+            location: Path::from("test"),
+            last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00")
+                .unwrap()
+                .into(),
+            size: 1024,
+            e_tag: None,
+            version: None,
+        };
+
+        let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
+            metadata: "retrieved_metadata".to_owned(),
+        });
+
+        let mut cache = DefaultFilesMetadataCache::default();
+        assert!(cache.get(&object_meta).is_none());
+
+        // put
+        cache.put(&object_meta, metadata);
+
+        // get and contains of a valid entry
+        assert!(cache.contains_key(&object_meta));
+        let value = cache.get(&object_meta);
+        assert!(value.is_some());
+        let test_file_metadata = Arc::downcast::<TestFileMetadata>(value.unwrap());
+        assert!(test_file_metadata.is_ok());
+        assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");
+
+        // file size changed
+        let mut object_meta2 = object_meta.clone();
+        object_meta2.size = 2048;
+        assert!(cache.get(&object_meta2).is_none());
+        assert!(!cache.contains_key(&object_meta2));
+
+        // file last_modified changed
+        let mut object_meta2 = object_meta.clone();
+        object_meta2.last_modified =
+            DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00")
+                .unwrap()
+                .into();
+        assert!(cache.get(&object_meta2).is_none());
+        assert!(!cache.contains_key(&object_meta2));
+
+        // different file
+        let mut object_meta2 = object_meta.clone();
+        object_meta2.location = Path::from("test2");
+        assert!(cache.get(&object_meta2).is_none());
+        assert!(!cache.contains_key(&object_meta2));
+
+        // remove
+        cache.remove(&object_meta);
+        assert!(cache.get(&object_meta).is_none());
+        assert!(!cache.contains_key(&object_meta));
+    }
 }

diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 70b0f0a831..bb9025391f 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -305,6 +305,7 @@ impl RuntimeEnvBuilder {
                 .cache_manager
                 .get_file_statistic_cache(),
             list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
+            file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(),
         };
 
         Self {

diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 8cb2726058..ffdc29e429 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -504,6 +504,7 @@ message ParquetOptions {
   bool schema_force_view_types = 28; // default = false
   bool binary_as_string = 29; // default = false
   bool skip_arrow_metadata = 30; // default = false
+  bool cache_metadata = 33; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;

diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 0823e15026..98df86a21f 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -988,6 +988,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
                 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
             }).unwrap_or(None),
             skip_arrow_metadata: value.skip_arrow_metadata,
+            cache_metadata: value.cache_metadata,
         })
     }
 }

diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index f35fd15946..89e85b0dc8 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -5066,6 +5066,9 @@ impl serde::Serialize for ParquetOptions {
         if self.skip_arrow_metadata {
             len += 1;
         }
+        if self.cache_metadata {
+            len += 1;
+        }
         if self.dictionary_page_size_limit != 0 {
             len += 1;
         }
@@ -5168,6 +5171,9 @@ impl serde::Serialize for ParquetOptions {
         if self.skip_arrow_metadata {
             struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?;
         }
+        if self.cache_metadata {
+            struct_ser.serialize_field("cacheMetadata", &self.cache_metadata)?;
+        }
         if self.dictionary_page_size_limit != 0 {
             #[allow(clippy::needless_borrow)]
             #[allow(clippy::needless_borrows_for_generic_args)]
@@ -5314,6 +5320,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "binaryAsString",
             "skip_arrow_metadata",
             "skipArrowMetadata",
+            "cache_metadata",
+            "cacheMetadata",
             "dictionary_page_size_limit",
             "dictionaryPageSizeLimit",
             "data_page_row_count_limit",
@@ -5362,6 +5370,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             SchemaForceViewTypes,
             BinaryAsString,
             SkipArrowMetadata,
+            CacheMetadata,
             DictionaryPageSizeLimit,
             DataPageRowCountLimit,
             MaxRowGroupSize,
@@ -5414,6 +5423,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes),
                             "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString),
                             "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata),
+                            "cacheMetadata" | "cache_metadata" => Ok(GeneratedField::CacheMetadata),
                             "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
                             "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
                             "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5464,6 +5474,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut schema_force_view_types__ = None;
                 let mut binary_as_string__ = None;
                 let mut skip_arrow_metadata__ = None;
+                let mut cache_metadata__ = None;
                 let mut dictionary_page_size_limit__ = None;
                 let mut data_page_row_count_limit__ = None;
                 let mut max_row_group_size__ = None;
@@ -5585,6 +5596,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             skip_arrow_metadata__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::CacheMetadata => {
+                            if cache_metadata__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("cacheMetadata"));
+                            }
+                            cache_metadata__ = Some(map_.next_value()?);
+                        }
                         GeneratedField::DictionaryPageSizeLimit => {
                             if dictionary_page_size_limit__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
                             }
@@ -5700,6 +5717,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     schema_force_view_types: schema_force_view_types__.unwrap_or_default(),
                     binary_as_string: binary_as_string__.unwrap_or_default(),
                     skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(),
+                    cache_metadata: cache_metadata__.unwrap_or_default(),
                     dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
                     data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
                     max_row_group_size: max_row_group_size__.unwrap_or_default(),

diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index ac4a9ea4be..6ed32d7de0 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -764,6 +764,9 @@ pub struct ParquetOptions {
     /// default = false
     #[prost(bool, tag = "30")]
     pub skip_arrow_metadata: bool,
+    /// default = false
+    #[prost(bool, tag = "33")]
+    pub cache_metadata: bool,
     #[prost(uint64, tag = "12")]
     pub dictionary_page_size_limit: u64,
     #[prost(uint64, tag = "18")]

diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index b6cbe5759c..0bd6f09bb3 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -836,6 +836,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             binary_as_string: value.binary_as_string,
             skip_arrow_metadata: value.skip_arrow_metadata,
             coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
+            cache_metadata: value.cache_metadata,
         })
     }
 }

diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index ac4a9ea4be..6ed32d7de0 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -764,6 +764,9 @@ pub struct ParquetOptions {
     /// default = false
     #[prost(bool, tag = "30")]
     pub skip_arrow_metadata: bool,
+    /// default = false
+    #[prost(bool, tag = "33")]
+    pub cache_metadata: bool,
     #[prost(uint64, tag = "12")]
     pub dictionary_page_size_limit: u64,
     #[prost(uint64, tag = "18")]

diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 620442c79e..1e0d76bc67 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -414,6 +414,7 @@ impl TableParquetOptionsProto {
                 coerce_int96_opt: global_options.global.coerce_int96.map(|compression| {
                     parquet_options::CoerceInt96Opt::CoerceInt96(compression)
                 }),
+                cache_metadata: global_options.global.cache_metadata,
             }),
             column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                 ParquetColumnSpecificOptions {
@@ -513,6 +514,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
             coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
                 parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
             }),
+            cache_metadata: proto.cache_metadata,
         }
     }
 }

diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 2d778bc9d6..de68352d0d 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -231,6 +231,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL
 datafusion.execution.parquet.bloom_filter_ndv NULL
 datafusion.execution.parquet.bloom_filter_on_read true
 datafusion.execution.parquet.bloom_filter_on_write false
+datafusion.execution.parquet.cache_metadata false
 datafusion.execution.parquet.coerce_int96 NULL
 datafusion.execution.parquet.column_index_truncate_length 64
 datafusion.execution.parquet.compression zstd(3)
@@ -345,6 +346,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
 datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files
 datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
+datafusion.execution.parquet.cache_metadata false (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified.
 datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution.
 datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
 datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.

diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt
index 51e40e3e68..0beb2e8f5d 100644
--- a/datafusion/sqllogictest/test_files/parquet.slt
+++ b/datafusion/sqllogictest/test_files/parquet.slt
@@ -750,3 +750,122 @@ drop table int96_from_spark;
 
 statement ok
 set datafusion.execution.parquet.coerce_int96 = ns;
+
+
+### Tests for metadata caching
+
+# Create temporary data
+query I
+COPY (
+    SELECT 'k-' || i as k, i as v
+    FROM generate_series(1, 20000) t(i)
+    ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata.parquet'
+OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
+----
+20000
+
+# Enable the cache
+statement ok
+set datafusion.execution.parquet.cache_metadata = true;
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/cache_metadata.parquet';
+
+query TI
+select * from t where k = 'k-1000' or k = 'k-9999' order by k
+----
+k-1000 1000
+k-9999 9999
+
+query IT
+select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v
+----
+1 k-1
+2 k-2
+9999 k-9999
+10000 k-10000
+
+# Updating the file should invalidate the cache. Otherwise, the following queries would fail
+# (e.g., with "Arrow: Parquet argument error: External: incomplete frame").
+query I
+COPY (
+    SELECT 'k-' || i as k, 20000 - i as v
+    FROM generate_series(1, 20000) t(i)
+    ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata.parquet'
+OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
+----
+20000
+
+query TI
+select * from t where k = 'k-1000' or k = 'k-9999' order by k
+----
+k-1000 19000
+k-9999 10001
+
+query IT
+select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v
+----
+1 k-19999
+2 k-19998
+9999 k-10001
+10000 k-10000
+
+statement ok
+DROP TABLE t;
+
+# Partitioned files should be independently cached. Otherwise, the following queries might fail.
+statement ok
+COPY (
+    SELECT i % 10 as part, 'k-' || i as k, i as v
+    FROM generate_series(0, 9) t(i)
+    ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata_partitioned.parquet'
+PARTITIONED BY (part);
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+PARTITIONED BY (part)
+LOCATION 'test_files/scratch/parquet/cache_metadata_partitioned.parquet';
+
+query TTI
+select part, k, v from t where k = 'k-0'
+----
+0 k-0 0
+
+query TTI
+select part, k, v from t where k = 'k-5'
+----
+5 k-5 5
+
+query TTI
+select part, k, v from t where k = 'k-9'
+----
+9 k-9 9
+
+query TTI
+select part, k, v from t order by k
+----
+0 k-0 0
+1 k-1 1
+2 k-2 2
+3 k-3 3
+4 k-4 4
+5 k-5 5
+6 k-6 6
+7 k-7 7
+8 k-8 8
+9 k-9 9
+
+statement ok
+DROP TABLE t;
+
+statement ok
+set datafusion.execution.parquet.cache_metadata = false;

diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 07330a9a73..da162b741b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. [...]
 | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. [...]
 | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files [...]
+| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. [...]
 | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes [...]
 | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes [...]
 | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" [...]