zhuqi-lucas commented on issue #7363:
URL: https://github.com/apache/arrow-rs/issues/7363#issuecomment-2861995127

   I tried it now, and it seems some regression from testing result, the 3 page 
caches have better performance:
   
   ```rust
   commit 3321f73128022e2868af91a7fe87799a2a0b06ad
   Author: zhuqi-lucas <821684...@qq.com>
   Date:   Thu May 8 14:42:27 2025 +0800
   
       Add dynamic page cache based on target batch size
   
   diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs 
b/parquet/src/arrow/async_reader/arrow_reader.rs
   index 92e585756..4cfec5552 100644
   --- a/parquet/src/arrow/async_reader/arrow_reader.rs
   +++ b/parquet/src/arrow/async_reader/arrow_reader.rs
   @@ -19,7 +19,6 @@ use std::collections::hash_map::Entry;
    use std::collections::HashMap;
    use std::sync::{Mutex, MutexGuard};
    use std::{collections::VecDeque, sync::Arc};
   -
    use arrow_array::ArrayRef;
    use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
    use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
   @@ -220,53 +219,95 @@ impl RecordBatchReader for 
FilteredParquetRecordBatchReader {
   
    struct CachedPage {
        dict: Option<(usize, Page)>, // page offset -> page
   -    data: Option<(usize, Page)>, // page offset -> page
   +    data: VecDeque<(usize, Page)>, // page offset -> page, use dynamic 
pages according to the batch size
   +    total_data_rows:   usize,
    }
   
    struct PredicatePageCacheInner {
        pages: HashMap<usize, CachedPage>, // col_id (Parquet's leaf column 
index) -> CachedPage
   +    /// How many rows the reader is currently asking for per batch
   +    batch_size: usize,
    }
   
    impl PredicatePageCacheInner {
        pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> 
Option<Page> {
            self.pages.get(&col_id).and_then(|pages| {
   -            pages
   -                .dict
   -                .iter()
   -                .chain(pages.data.iter())
   -                .find(|(page_offset, _)| *page_offset == offset)
   -                .map(|(_, page)| page.clone())
   +
   +            if let Some((off, page)) = &pages.dict {
   +                if *off == offset {
   +                    return Some(page.clone());
   +                }
   +            }
   +
   +            pages.data.iter().find(|(off, _)| *off == offset).map(|(_, 
page)| page.clone())
            })
        }
   
   -    /// Insert a page into the cache.
   -    /// Inserting a page will override the existing page, if any.
   -    /// This is because we only need to cache 2 pages per column, see below.
   +    /// Insert or refresh a page in the per-column cache.
   +    ///
   +    /// This cache maintains:
   +    /// 1. **One dictionary page** (`PageType::DICTIONARY_PAGE`), replacing 
any prior dict page.
   +    /// 2. **A dynamic set of data pages**, each tracked by its file offset 
and row count,
   +    ///    stored in a FIFO `VecDeque<(offset, Page)>`.
   +    ///
   +    /// As you process batches across multiple pages, we must keep every 
page
   +    /// needed to emit one full batch. After inserting a new data page, we:
   +    /// 1. Increase `total_data_rows` by that page’s `num_values()`.
   +    /// 2. **Evict the oldest page** only while:
   +    ///    - Removing it still leaves **at least** `batch_size` rows in 
cache, **and**
   +    ///    - More than **one** data page remains.
   +    ///
   +    /// This policy ensures:
   +    /// - **Complete coverage**: all pages required to serve one batch stay 
cached.
   +    /// - **Memory boundedness**: total cached rows ≥ `batch_size`, never 
far above it.
   +    /// - **At least one page retained**: even if a single page exceeds 
`batch_size`.
   +    ///
   +    /// # Parameters
   +    /// - `col_id`: Parquet leaf column index, used as the cache key.
   +    /// - `offset`: File offset for this page (unique identifier).
   +    /// - `page`:   The decoded `Page` to insert or refresh.
        pub(crate) fn insert_page(&mut self, col_id: usize, offset: usize, 
page: Page) {
            let is_dict = page.page_type() == PageType::DICTIONARY_PAGE;
   +        let rows_in_page = page.num_values() as usize;
   
   -        let cached_pages = self.pages.entry(col_id);
   -        match cached_pages {
   -            Entry::Occupied(mut entry) => {
   +        match self.pages.entry(col_id) {
   +            Entry::Occupied(mut occ) => {
   +                let cp = occ.get_mut();
                    if is_dict {
   -                    entry.get_mut().dict = Some((offset, page));
   +                    // refresh dictionary page
   +                    cp.dict = Some((offset, page));
                    } else {
   -                    entry.get_mut().data = Some((offset, page));
   +                    // add new data page
   +                    cp.data.push_back((offset, page.clone()));
   +                    cp.total_data_rows += rows_in_page;
   +
   +                    // evict only while the oldest page is not needed to 
cover batch_size
   +                    while cp.data.len() > 1 {
   +                        // look at the front (oldest) page’s row count
   +                        let &(_, ref oldest) = cp.data.front().unwrap();
   +                        let oldest_rows = oldest.num_values() as usize;
   +                        // if removing it still leaves enough rows for one 
batch, pop it
   +                        if cp.total_data_rows - oldest_rows >= 
self.batch_size {
   +                            cp.data.pop_front();
   +                            cp.total_data_rows -= oldest_rows;
   +                        } else {
   +                            break;
   +                        }
   +                    }
                    }
                }
   -            Entry::Vacant(entry) => {
   -                let cached_page = if is_dict {
   -                    CachedPage {
   -                        dict: Some((offset, page)),
   -                        data: None,
   -                    }
   +
   +            Entry::Vacant(vac) => {
   +                // first insertion for this column
   +                let mut data = VecDeque::new();
   +                let dict = if is_dict {
   +                    Some((offset, page))
                    } else {
   -                    CachedPage {
   -                        dict: None,
   -                        data: Some((offset, page)),
   -                    }
   +                    data.push_back((offset, page.clone()));
   +                    None
                    };
   -                entry.insert(cached_page);
   +                let total_data_rows = if is_dict { 0 } else { rows_in_page 
};
   +                vac.insert(CachedPage { dict, data, total_data_rows });
                }
            }
        }
   @@ -328,10 +369,11 @@ pub(crate) struct PredicatePageCache {
    }
   
    impl PredicatePageCache {
   -    pub(crate) fn new(capacity: usize) -> Self {
   +    pub(crate) fn new(capacity: usize, batch_size: usize) -> Self {
            Self {
                inner: Mutex::new(PredicatePageCacheInner {
                    pages: HashMap::with_capacity(capacity),
   +                batch_size
                }),
            }
        }
   @@ -513,7 +555,7 @@ mod tests {
        fn test_predicate_page_cache_basic_operations() {
            use super::*;
   
   -        let cache = PredicatePageCache::new(2);
   +        let cache = PredicatePageCache::new(2, 8192);
            let page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
            let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200);
   
   @@ -538,7 +580,7 @@ mod tests {
        fn test_predicate_page_cache_replacement() {
            use super::*;
   
   -        let cache = PredicatePageCache::new(2);
   +        let cache = PredicatePageCache::new(2, 8192);
            let data_page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
            let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200);
   
   @@ -556,7 +598,7 @@ mod tests {
        fn test_predicate_page_cache_multiple_columns() {
            use super::*;
   
   -        let cache = PredicatePageCache::new(2);
   +        let cache = PredicatePageCache::new(2, 8192);
            let page1 = Page::dummy_page(PageType::DATA_PAGE, 100);
            let page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200);
   
   diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
   index a034e927f..3f2936826 100644
   --- a/parquet/src/arrow/async_reader/mod.rs
   +++ b/parquet/src/arrow/async_reader/mod.rs
   @@ -599,6 +599,7 @@ where
                self.metadata.as_ref(),
                offset_index,
                projection_to_cache,
   +            batch_size
            );
   
            let mut selection =
   @@ -902,6 +903,7 @@ impl<'a> InMemoryRowGroup<'a> {
            metadata: &'a ParquetMetaData,
            offset_index: Option<&'a [OffsetIndexMetaData]>,
            projection_to_cache: Option<ProjectionMask>,
   +        batch_size: usize,
        ) -> Self {
            let rg_metadata = metadata.row_group(row_group_idx);
            let to_cache_column_cnt = projection_to_cache
   @@ -920,7 +922,7 @@ impl<'a> InMemoryRowGroup<'a> {
                row_count: rg_metadata.num_rows() as usize,
                metadata,
                row_group_idx,
   -            cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt)),
   +            cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt, 
batch_size)),
                projection_to_cache,
            }
        }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to