zhuqi-lucas commented on issue #7363: URL: https://github.com/apache/arrow-rs/issues/7363#issuecomment-2861995127
I tried it now, and the test results show some regression; the 3-page cache has better performance:

```diff
commit 3321f73128022e2868af91a7fe87799a2a0b06ad
Author: zhuqi-lucas <821684...@qq.com>
Date:   Thu May 8 14:42:27 2025 +0800

    Add dynamic page cache based on target batch size

diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs
index 92e585756..4cfec5552 100644
--- a/parquet/src/arrow/async_reader/arrow_reader.rs
+++ b/parquet/src/arrow/async_reader/arrow_reader.rs
@@ -19,7 +19,6 @@ use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::{Mutex, MutexGuard};
 use std::{collections::VecDeque, sync::Arc};
-
 use arrow_array::ArrayRef;
 use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
@@ -220,53 +219,95 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader {
 
 struct CachedPage {
     dict: Option<(usize, Page)>, // page offset -> page
-    data: Option<(usize, Page)>, // page offset -> page
+    data: VecDeque<(usize, Page)>, // page offset -> page, use dynamic pages according to the batch size
+    total_data_rows: usize,
 }
 
 struct PredicatePageCacheInner {
     pages: HashMap<usize, CachedPage>, // col_id (Parquet's leaf column index) -> CachedPage
+    /// How many rows the reader is currently asking for per batch
+    batch_size: usize,
 }
 
 impl PredicatePageCacheInner {
     pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option<Page> {
         self.pages.get(&col_id).and_then(|pages| {
-            pages
-                .dict
-                .iter()
-                .chain(pages.data.iter())
-                .find(|(page_offset, _)| *page_offset == offset)
-                .map(|(_, page)| page.clone())
+
+            if let Some((off, page)) = &pages.dict {
+                if *off == offset {
+                    return Some(page.clone());
+                }
+            }
+
+            pages.data.iter().find(|(off, _)| *off == offset).map(|(_, page)| page.clone())
         })
     }
 
-    /// Insert a page into the cache.
-    /// Inserting a page will override the existing page, if any.
-    /// This is because we only need to cache 2 pages per column, see below.
+    /// Insert or refresh a page in the per-column cache.
+    ///
+    /// This cache maintains:
+    /// 1. **One dictionary page** (`PageType::DICTIONARY_PAGE`), replacing any prior dict page.
+    /// 2. **A dynamic set of data pages**, each tracked by its file offset and row count,
+    ///    stored in a FIFO `VecDeque<(offset, Page)>`.
+    ///
+    /// As we process batches across multiple pages, we must keep every page
+    /// needed to emit one full batch. After inserting a new data page, we:
+    /// 1. Increase `total_data_rows` by that page’s `num_values()`.
+    /// 2. **Evict the oldest page** only while:
+    ///    - Removing it still leaves **at least** `batch_size` rows in cache, **and**
+    ///    - More than **one** data page remains.
+    ///
+    /// This policy ensures:
+    /// - **Complete coverage**: all pages required to serve one batch stay cached.
+    /// - **Memory boundedness**: total cached rows ≥ `batch_size`, never far above it.
+    /// - **At least one page retained**: even if a single page exceeds `batch_size`.
+    ///
+    /// # Parameters
+    /// - `col_id`: Parquet leaf column index, used as the cache key.
+    /// - `offset`: File offset for this page (unique identifier).
+    /// - `page`: The decoded `Page` to insert or refresh.
     pub(crate) fn insert_page(&mut self, col_id: usize, offset: usize, page: Page) {
         let is_dict = page.page_type() == PageType::DICTIONARY_PAGE;
+        let rows_in_page = page.num_values() as usize;
 
-        let cached_pages = self.pages.entry(col_id);
-        match cached_pages {
-            Entry::Occupied(mut entry) => {
+        match self.pages.entry(col_id) {
+            Entry::Occupied(mut occ) => {
+                let cp = occ.get_mut();
                 if is_dict {
-                    entry.get_mut().dict = Some((offset, page));
+                    // refresh dictionary page
+                    cp.dict = Some((offset, page));
                 } else {
-                    entry.get_mut().data = Some((offset, page));
+                    // add new data page
+                    cp.data.push_back((offset, page.clone()));
+                    cp.total_data_rows += rows_in_page;
+
+                    // evict only while the oldest page is not needed to cover batch_size
+                    while cp.data.len() > 1 {
+                        // look at the front (oldest) page’s row count
+                        let &(_, ref oldest) = cp.data.front().unwrap();
+                        let oldest_rows = oldest.num_values() as usize;
+                        // if removing it still leaves enough rows for one batch, pop it
+                        if cp.total_data_rows - oldest_rows >= self.batch_size {
+                            cp.data.pop_front();
+                            cp.total_data_rows -= oldest_rows;
+                        } else {
+                            break;
+                        }
+                    }
                 }
             }
-            Entry::Vacant(entry) => {
-                let cached_page = if is_dict {
-                    CachedPage {
-                        dict: Some((offset, page)),
-                        data: None,
-                    }
+
+            Entry::Vacant(vac) => {
+                // first insertion for this column
+                let mut data = VecDeque::new();
+                let dict = if is_dict {
+                    Some((offset, page))
                 } else {
-                    CachedPage {
-                        dict: None,
-                        data: Some((offset, page)),
-                    }
+                    data.push_back((offset, page.clone()));
+                    None
                 };
-                entry.insert(cached_page);
+                let total_data_rows = if is_dict { 0 } else { rows_in_page };
+                vac.insert(CachedPage { dict, data, total_data_rows });
             }
         }
     }
@@ -328,10 +369,11 @@ pub(crate) struct PredicatePageCache {
 }
 
 impl PredicatePageCache {
-    pub(crate) fn new(capacity: usize) -> Self {
+    pub(crate) fn new(capacity: usize, batch_size: usize) -> Self {
         Self {
             inner: Mutex::new(PredicatePageCacheInner {
                 pages: HashMap::with_capacity(capacity),
+                batch_size
             }),
         }
     }
@@ -513,7 +555,7 @@ mod tests {
     fn test_predicate_page_cache_basic_operations() {
         use super::*;
 
-        let cache = PredicatePageCache::new(2);
+        let cache = PredicatePageCache::new(2, 8192);
         let page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
         let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200);
 
@@ -538,7 +580,7 @@ mod tests {
     fn test_predicate_page_cache_replacement() {
         use super::*;
 
-        let cache = PredicatePageCache::new(2);
+        let cache = PredicatePageCache::new(2, 8192);
         let data_page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
         let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200);
 
@@ -556,7 +598,7 @@ mod tests {
     fn test_predicate_page_cache_multiple_columns() {
         use super::*;
 
-        let cache = PredicatePageCache::new(2);
+        let cache = PredicatePageCache::new(2, 8192);
        let page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
        let page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200);

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index a034e927f..3f2936826 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -599,6 +599,7 @@ where
                 self.metadata.as_ref(),
                 offset_index,
                 projection_to_cache,
+                batch_size
             );
 
             let mut selection =
@@ -902,6 +903,7 @@ impl<'a> InMemoryRowGroup<'a> {
         metadata: &'a ParquetMetaData,
         offset_index: Option<&'a [OffsetIndexMetaData]>,
         projection_to_cache: Option<ProjectionMask>,
+        batch_size: usize,
     ) -> Self {
         let rg_metadata = metadata.row_group(row_group_idx);
         let to_cache_column_cnt = projection_to_cache
@@ -920,7 +922,7 @@ impl<'a> InMemoryRowGroup<'a> {
             row_count: rg_metadata.num_rows() as usize,
             metadata,
             row_group_idx,
-            cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt)),
+            cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt, batch_size)),
             projection_to_cache,
         }
     }
```
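To make the eviction behavior easier to reason about outside the reader, here is a minimal standalone sketch of the same policy. The `DataPageWindow` struct, its `insert` helper, and the concrete page sizes are hypothetical and only illustrate the idea from the patch: data pages sit in a FIFO `VecDeque`, and the oldest page is dropped only while the remaining rows still cover one `batch_size`.

```rust
// Standalone model of the data-page eviction policy; a page is reduced to
// (file offset, row count). Names here are illustrative, not from the patch.
use std::collections::VecDeque;

struct DataPageWindow {
    pages: VecDeque<(usize, usize)>, // (offset, rows_in_page)
    total_rows: usize,
    batch_size: usize,
}

impl DataPageWindow {
    fn new(batch_size: usize) -> Self {
        Self { pages: VecDeque::new(), total_rows: 0, batch_size }
    }

    /// Mirrors the data-page branch of `insert_page`: push the new page,
    /// then pop old pages while the remainder still covers one batch.
    fn insert(&mut self, offset: usize, rows: usize) {
        self.pages.push_back((offset, rows));
        self.total_rows += rows;
        while self.pages.len() > 1 {
            let oldest_rows = self.pages.front().unwrap().1;
            if self.total_rows - oldest_rows >= self.batch_size {
                self.pages.pop_front();
                self.total_rows -= oldest_rows;
            } else {
                break;
            }
        }
    }
}

fn main() {
    // batch_size = 8192, pages of 5000 rows each
    let mut window = DataPageWindow::new(8192);
    for (i, offset) in [0usize, 6000, 12000, 18000].iter().enumerate() {
        window.insert(*offset, 5000);
        println!(
            "after page {}: cached offsets = {:?}, cached rows = {}",
            i,
            window.pages.iter().map(|(o, _)| *o).collect::<Vec<_>>(),
            window.total_rows
        );
    }
    // With 5000-row pages and batch_size 8192, the window keeps two pages
    // (10000 rows) after each insert, never dropping below one full batch.
}
```

With these numbers the cache always holds at least `batch_size` rows, which is exactly the property the doc comment describes, but it also means more than the previous fixed number of pages can stay resident per column, which may explain part of the regression I am seeing.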