alamb commented on code in PR #16971:
URL: https://github.com/apache/datafusion/pull/16971#discussion_r2243780405
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -157,9 +158,79 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
}
}
+/// Collected file embedded metadata cache.
+/// The metadata for a file is invalidated when the file size or last modification time has
+/// changed.
+#[derive(Default)]
+pub struct DefaultFilesMetadataCache {
+    metadata: DashMap<Path, (ObjectMeta, Arc<FileMetadata>)>,
+}
+
+
+impl CacheAccessor<Path, Arc<FileMetadata>> for DefaultFilesMetadataCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, _k: &Path) -> Option<Arc<FileMetadata>> {
+        panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra")
+    }
+
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<FileMetadata>> {
+        self.metadata
+            .get(k)
+            .map(|s| {
+                let (extra, metadata) = s.value();
+                if extra.size != e.size || extra.last_modified != e.last_modified {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    fn put(&self, _key: &Path, _value: Arc<FileMetadata>) -> Option<Arc<FileMetadata>> {
+        panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra")
Review Comment:
🤔 A panic like that is unfortunate -- maybe we should change the API so this function can return an error (in a follow-on PR).
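For illustration, a fallible shape could look roughly like the sketch below; `TryCacheAccessor` and `CacheError` are hypothetical names for this example, not types from this PR or from `datafusion_common`:
```rust
/// Hypothetical error type standing in for a real `DataFusionError`.
#[derive(Debug)]
pub struct CacheError(pub String);

/// Sketch of a fallible accessor: calling `try_put` without the required
/// extra returns an error instead of panicking.
pub trait TryCacheAccessor<K, V> {
    type Extra;

    fn try_put(&self, _key: &K, _value: V) -> Result<Option<V>, CacheError> {
        Err(CacheError(
            "put without extra is not supported; use try_put_with_extra".to_string(),
        ))
    }

    fn try_put_with_extra(
        &self,
        key: &K,
        value: V,
        extra: &Self::Extra,
    ) -> Result<Option<V>, CacheError>;
}
```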
##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
}))
}
}
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
Review Comment:
👍
##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -750,3 +750,122 @@ drop table int96_from_spark;
statement ok
set datafusion.execution.parquet.coerce_int96 = ns;
+
+
+### Tests for metadata caching
+
+# Create temporary data
+query I
+COPY (
+  SELECT 'k-' || i as k, i as v
+  FROM generate_series(1, 20000) t(i)
+  ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata.parquet'
+OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
+----
+20000
+
+# Enable the cache
+statement ok
+set datafusion.execution.parquet.cache_metadata = true;
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/cache_metadata.parquet';
+
+query TI
+select * from t where k = 'k-1000' or k = 'k-9999' order by k
+----
+k-1000 1000
+k-9999 9999
+
+query IT
+select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v
+----
+1 k-1
+2 k-2
+9999 k-9999
+10000 k-10000
+
+# Updating the file should invalidate the cache. Otherwise, the following queries would fail
+# (e.g., with "Arrow: Parquet argument error: External: incomplete frame").
+query I
Review Comment:
amazing
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -232,4 +303,52 @@ mod tests {
meta.clone()
);
}
+
+    #[test]
Review Comment:
😍
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -86,6 +114,10 @@ pub struct CacheManagerConfig {
    /// location.
    /// Default is disable.
    pub list_files_cache: Option<ListFilesCache>,
+    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
+    /// data file (e.g., Parquet footer and page metadata).
+    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
+    pub file_metadata_cache: Option<FileMetadataCache>,
Review Comment:
If it is already an `Option`, why do we need a `DefaultFilesMetadataCache`? 🤔 Couldn't we just leave it as `None`?
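For reference, the fallback the doc comment describes would look roughly like the sketch below; the type aliases and method names are simplified stand-ins for illustration, not the exact code in this PR:
```rust
use std::sync::Arc;

// Simplified stand-ins for the cache types discussed above.
#[derive(Default)]
pub struct DefaultFilesMetadataCache;
pub type FileMetadataCache = Arc<DefaultFilesMetadataCache>;

pub struct CacheManagerConfig {
    pub file_metadata_cache: Option<FileMetadataCache>,
}

pub struct CacheManager {
    file_metadata_cache: FileMetadataCache,
}

impl CacheManager {
    /// If the user supplied no cache, fall back to the default implementation so
    /// downstream readers never have to handle a `None` case themselves.
    pub fn new(config: &CacheManagerConfig) -> Self {
        let file_metadata_cache = config
            .file_metadata_cache
            .clone()
            .unwrap_or_else(|| Arc::new(DefaultFilesMetadataCache));
        Self {
            file_metadata_cache,
        }
    }
}
```
The question above is whether that fallback is worth it, or whether leaving the field as `None` (and skipping the caching reader entirely) is simpler.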
##########
datafusion/common/src/config.rs:
##########
@@ -549,6 +549,12 @@ config_namespace! {
        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true
+        /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
+        /// (footer and page metadata). Enabling it can offer substantial performance improvements
+        /// for repeated queries over large files. By default, the cache is automatically
+        /// invalidated when the underlying file is modified.
+        pub cache_metadata: bool, default = false
Review Comment:
Eventually, I think it would be better to make this a size setting, `metadata_cache_size`, since that can represent both disabled (size `0`) and a memory cap.
We can do this in a follow-on PR.
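As a rough illustration of that follow-on idea (names like `SizeCappedCache` and `cap_bytes` are invented here, not proposed API):
```rust
use std::collections::HashMap;

/// Illustrative size-capped cache: a cap of 0 behaves like "disabled",
/// any other value bounds the total bytes of cached metadata.
pub struct SizeCappedCache {
    cap_bytes: usize,
    used_bytes: usize,
    entries: HashMap<String, Vec<u8>>,
}

impl SizeCappedCache {
    pub fn new(cap_bytes: usize) -> Self {
        Self {
            cap_bytes,
            used_bytes: 0,
            entries: HashMap::new(),
        }
    }

    /// Insert only if the entry fits under the cap; a real cache would evict
    /// older entries instead of simply refusing.
    pub fn put(&mut self, key: String, value: Vec<u8>) -> bool {
        if let Some(old) = self.entries.remove(&key) {
            self.used_bytes -= old.len();
        }
        let size = value.len();
        if self.cap_bytes == 0 || self.used_bytes + size > self.cap_bytes {
            return false;
        }
        self.used_bytes += size;
        self.entries.insert(key, value);
        true
    }
}
```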
##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
}))
}
}
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
+/// encrypted), even if not required by the current query, to ensure it is always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: FileMetadataCache,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and
+/// then updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: FileMetadataCache,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {
Review Comment:
It is impressive that you worked out this API dance -- it is something I really don't like about the current API of the parquet reader.
BTW, I am working on improving it (no changes needed or suggested here, I am just self-promoting):
- https://github.com/apache/arrow-rs/issues/7983
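For readers following along, the body elided above presumably follows the usual check-then-populate pattern; a simplified, synchronous sketch with hypothetical types (not the PR's async code):
```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the real ParquetMetaData.
type Metadata = Vec<u8>;

pub struct MetadataCache {
    entries: Mutex<HashMap<String, Arc<Metadata>>>,
}

impl MetadataCache {
    /// Return the cached metadata for `path` if present; otherwise run `load`,
    /// store the result, and return it so later readers hit the cache.
    pub fn get_or_load(
        &self,
        path: &str,
        load: impl FnOnce() -> Metadata,
    ) -> Arc<Metadata> {
        if let Some(found) = self.entries.lock().unwrap().get(path) {
            return Arc::clone(found);
        }
        let loaded = Arc::new(load());
        self.entries
            .lock()
            .unwrap()
            .insert(path.to_string(), Arc::clone(&loaded));
        loaded
    }
}
```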
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -32,6 +34,13 @@ pub type FileStatisticsCache =
pub type ListFilesCache =
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
+/// Represents generic file-embedded metadata.
+pub type FileMetadata = dyn Any + Send + Sync;
Review Comment:
I realize you are just following along with the pattern that is already in this file, which is good.
However, I recommend we make this a trait, partly to improve the documentation and partly because I think we are going to need to add a method like `memory_size()` to report the memory consumed by the object:
```rust
pub trait FileMetadata: Any + Send + Sync {
    ...
}
```
I would recommend the same for the `FileMetadataCache` below.
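To make the suggestion concrete, a fuller sketch might look like the following; the `memory_size`/`as_any` method names and the toy impl are assumptions for illustration only:
```rust
use std::any::Any;
use std::sync::Arc;

/// Sketch of the suggested trait: cached metadata objects expose how much
/// memory they hold so a future size-capped cache can account for them.
pub trait FileMetadata: Any + Send + Sync {
    /// Approximate number of bytes this metadata keeps in memory.
    fn memory_size(&self) -> usize;

    /// Allow downcasting back to the concrete type (e.g. `ParquetMetaData`).
    fn as_any(&self) -> &dyn Any;
}

/// Toy implementation holding a raw footer buffer, for illustration only.
struct FooterBytes(Vec<u8>);

impl FileMetadata for FooterBytes {
    fn memory_size(&self) -> usize {
        self.0.len()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn total_cached_bytes(entries: &[Arc<dyn FileMetadata>]) -> usize {
    // Summing per-entry sizes is what a memory cap would be enforced against.
    entries.iter().map(|m| m.memory_size()).sum()
}
```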
##########
docs/source/user-guide/configs.md:
##########
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
| datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
+| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. |
Review Comment:
I do think it would be a problem if we wanted to turn this feature on by default, which I do. However, I don't think we need to make any changes in this particular PR.