This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e9ea12b138 Implement Push Parquet Decoder (#7997)
e9ea12b138 is described below

commit e9ea12b138f8fa11c6046e94cb2b55b2830a0d7c
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 29 08:52:15 2025 -0400

    Implement Push Parquet Decoder (#7997)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/8000
    
    - closes  https://github.com/apache/arrow-rs/issues/7983
    
    
    # Rationale for this change
    
    This PR is the first part of separating IO and decode operations in the
    rust parquet decoder.
    
    Decoupling IO and CPU enables several important use cases:
    1. Different IO patterns (e.g. not buffering the entire row group at once)
    2. Different IO APIs (e.g. io_uring, OpenDAL, etc.)
    3. Deliberate prefetching within a file
    4. Avoid code duplication between the `ParquetRecordBatchStreamBuilder`
    and `ParquetRecordBatchReaderBuilder`
    
    
    # What changes are included in this PR?
    
    1. Add new `ParquetDecoderBuilder` and `ParquetDecoder`, and tests
    
    It is effectively an explicit version of the state machine used in the
    existing async reader (where the state machine is encoded in Rust
    `async` / `await` structures).
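
    The intended usage is a pull-style loop driven by the caller. As a rough
    sketch (adapted from the doc example in the new `push_decoder` module
    below; `fetch` and `handle_batch` are placeholder helpers, not APIs added
    by this PR):

    ```rust
    // A minimal sketch: `file_length`, `parquet_metadata` (Arc<ParquetMetaData>),
    // `fetch` (IO helper returning Bytes for a byte range) and `handle_batch`
    // are assumed to be provided by the caller.
    let mut decoder =
        ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata)
            .unwrap()
            .with_batch_size(1024)
            .build()
            .unwrap();

    loop {
        match decoder.try_decode().unwrap() {
            // The decoder needs more bytes: do the IO however you like,
            // then push the results back in and call try_decode again
            DecodeResult::NeedsData(ranges) => {
                let data: Vec<_> = ranges.iter().map(|r| fetch(r)).collect();
                decoder.push_ranges(ranges, data).unwrap();
            }
            // A decoded RecordBatch is ready
            DecodeResult::Data(batch) => handle_batch(batch),
            // All selected row groups have been decoded
            DecodeResult::Finished => break,
        }
    }
    ```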
    
    
    
    # Are these changes tested?
    Yes -- there are extensive tests for the new code
    
    Note that this PR actually adds a **3rd** path for control flow (even as I
    claim this will remove duplication!). In follow-on PRs I will convert the
    existing readers to use this new pattern, following the same sequence I
    used for the metadata decoder:
    - https://github.com/apache/arrow-rs/pull/8080
    - https://github.com/apache/arrow-rs/pull/8340
    
    Here is a preview of a PR that consolidates the async reader to use the
    push decoder internally (and removes one duplicate):
    - https://github.com/apache/arrow-rs/pull/8159
    
    - closes https://github.com/apache/arrow-rs/pull/8022
    
    # Are there any user-facing changes?
    
    Yes, a new API, but no changes to the existing APIs
    
    ---------
    
    Co-authored-by: Matthijs Brobbel <[email protected]>
    Co-authored-by: Adrian Garcia Badaracco 
<[email protected]>
---
 parquet/src/arrow/arrow_reader/mod.rs              |   16 +
 parquet/src/arrow/arrow_reader/read_plan.rs        |    5 +-
 parquet/src/arrow/arrow_reader/selection.rs        |    1 -
 parquet/src/arrow/async_reader/mod.rs              |  250 +----
 parquet/src/arrow/in_memory_row_group.rs           |  307 ++++++
 parquet/src/arrow/mod.rs                           |    6 +
 parquet/src/arrow/push_decoder/mod.rs              | 1151 ++++++++++++++++++++
 .../src/arrow/push_decoder/reader_builder/data.rs  |  233 ++++
 .../arrow/push_decoder/reader_builder/filter.rs    |  143 +++
 .../src/arrow/push_decoder/reader_builder/mod.rs   |  659 +++++++++++
 parquet/src/arrow/push_decoder/remaining.rs        |  118 ++
 parquet/src/file/metadata/mod.rs                   |    1 -
 parquet/src/lib.rs                                 |   16 +-
 parquet/src/util/push_buffers.rs                   |   25 +
 14 files changed, 2689 insertions(+), 242 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 4e91685519..1cc7673a57 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -58,6 +58,7 @@ pub mod statistics;
 ///
 /// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
 /// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
+/// * decoder API: [`ParquetDecoderBuilder::new`]
 ///
 /// # Features
 /// * Projection pushdown: [`Self::with_projection`]
@@ -93,6 +94,7 @@ pub mod statistics;
 /// Millisecond Latency] Arrow blog post.
 ///
 /// [`ParquetRecordBatchStreamBuilder::new`]: 
crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
+/// [`ParquetDecoderBuilder::new`]: 
crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
 /// [Apache Arrow]: https://arrow.apache.org/
 /// [`StatisticsConverter`]: statistics::StatisticsConverter
 /// [Querying Parquet with Millisecond Latency]: 
https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
@@ -992,12 +994,26 @@ impl<T: ChunkReader + 'static> PageIterator for 
ReaderPageIterator<T> {}
 
 /// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
 /// read from a parquet data source
+///
+/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
+/// the buffered state (DataPages, etc) necessary to decode the parquet data 
into
+/// Arrow arrays.
 pub struct ParquetRecordBatchReader {
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
     read_plan: ReadPlan,
 }
 
+impl Debug for ParquetRecordBatchReader {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchReader")
+            .field("array_reader", &"...")
+            .field("schema", &self.schema)
+            .field("read_plan", &self.read_plan)
+            .finish()
+    }
+}
+
 impl Iterator for ParquetRecordBatchReader {
     type Item = Result<RecordBatch, ArrowError>;
 
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs 
b/parquet/src/arrow/arrow_reader/read_plan.rs
index 754fcd339c..2210f47df2 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter;
 use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
@@ -51,7 +51,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the current selection, if any
-    #[cfg(feature = "async")]
     pub fn selection(&self) -> Option<&RowSelection> {
         self.selection.as_ref()
     }
@@ -76,7 +75,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the number of rows selected, or `None` if all rows are 
selected.
-    #[cfg(feature = "async")]
     pub fn num_rows_selected(&self) -> Option<usize> {
         self.selection.as_ref().map(|s| s.row_count())
     }
@@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder {
 /// A plan reading specific rows from a Parquet Row Group.
 ///
 /// See [`ReadPlanBuilder`] to create `ReadPlan`s
+#[derive(Debug)]
 pub struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
diff --git a/parquet/src/arrow/arrow_reader/selection.rs 
b/parquet/src/arrow/arrow_reader/selection.rs
index adbbff1ca2..1eb7c85d1d 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -447,7 +447,6 @@ impl RowSelection {
     /// Expands the selection to align with batch boundaries.
     /// This is needed when using cached array readers to ensure that
     /// the cached data covers full batches.
-    #[cfg(feature = "async")]
     pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, 
total_rows: usize) -> Self {
         if batch_size == 0 {
             return self.clone();
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 27f90e3d7b..9b81e8e569 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -29,7 +29,7 @@ use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
@@ -38,10 +38,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, 
AsyncSeekExt};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
-use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{
-    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
-};
 use crate::arrow::arrow_reader::{
     ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
     RowFilter, RowSelection,
@@ -51,11 +47,8 @@ use crate::basic::{BloomFilterAlgorithm, 
BloomFilterCompression, BloomFilterHash
 use crate::bloom_filter::{
     SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
 };
-use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 
 mod metadata;
 pub use metadata::*;
@@ -63,8 +56,11 @@ pub use metadata::*;
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, 
RowGroupCache};
 use crate::arrow::arrow_reader::ReadPlanBuilder;
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
 use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
@@ -571,6 +567,8 @@ struct ReaderFactory<T> {
     metrics: ArrowReaderMetrics,
 
     /// Maximum size of the predicate cache
+    ///
+    /// See [`RowGroupCache`] for details.
     max_predicate_cache_size: usize,
 }
 
@@ -967,23 +965,16 @@ where
     }
 }
 
-/// An in-memory collection of column chunks
-struct InMemoryRowGroup<'a> {
-    offset_index: Option<&'a [OffsetIndexMetaData]>,
-    /// Column chunks for this row group
-    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
-    row_count: usize,
-    row_group_idx: usize,
-    metadata: &'a ParquetMetaData,
-}
-
+// Note this implementation is not with the rest of the InMemoryRowGroup
+// implementation because it relies on several async traits and types
+// that are only available when the "async" feature is enabled.
 impl InMemoryRowGroup<'_> {
     /// Fetches any additional column data specified in `projection` that is 
not already
     /// present in `self.column_chunks`.
     ///
     /// If `selection` is provided, only the pages required for the selection
     /// are fetched. Otherwise, all pages are fetched.
-    async fn fetch<T: AsyncFileReader + Send>(
+    pub(crate) async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
         projection: &ProjectionMask,
@@ -991,221 +982,18 @@ impl InMemoryRowGroup<'_> {
         batch_size: usize,
         cache_mask: Option<&ProjectionMask>,
     ) -> Result<()> {
-        let metadata = self.metadata.row_group(self.row_group_idx);
-        if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
-            let expanded_selection =
-                selection.expand_to_batch_boundaries(batch_size, 
self.row_count);
-            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
-            // `RowSelection`
-            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
-
-            let fetch_ranges = self
-                .column_chunks
-                .iter()
-                .zip(metadata.columns())
-                .enumerate()
-                .filter(|&(idx, (chunk, _chunk_meta))| {
-                    chunk.is_none() && projection.leaf_included(idx)
-                })
-                .flat_map(|(idx, (_chunk, chunk_meta))| {
-                    // If the first page does not start at the beginning of 
the column,
-                    // then we need to also fetch a dictionary page.
-                    let mut ranges: Vec<Range<u64>> = vec![];
-                    let (start, _len) = chunk_meta.byte_range();
-                    match offset_index[idx].page_locations.first() {
-                        Some(first) if first.offset as u64 != start => {
-                            ranges.push(start..first.offset as u64);
-                        }
-                        _ => (),
-                    }
-
-                    // Expand selection to batch boundaries only for cached 
columns
-                    let use_expanded = cache_mask.map(|m| 
m.leaf_included(idx)).unwrap_or(false);
-                    if use_expanded {
-                        ranges.extend(
-                            
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
-                        );
-                    } else {
-                        
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
-                    }
-                    page_start_offsets.push(ranges.iter().map(|range| 
range.start).collect());
-
-                    ranges
-                })
-                .collect();
-
-            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
-            let mut page_start_offsets = page_start_offsets.into_iter();
-
-            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-                if chunk.is_some() || !projection.leaf_included(idx) {
-                    continue;
-                }
-
-                if let Some(offsets) = page_start_offsets.next() {
-                    let mut chunks = Vec::with_capacity(offsets.len());
-                    for _ in 0..offsets.len() {
-                        chunks.push(chunk_data.next().unwrap());
-                    }
-
-                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
-                        length: metadata.column(idx).byte_range().1 as usize,
-                        data: offsets
-                            .into_iter()
-                            .map(|x| x as usize)
-                            .zip(chunks.into_iter())
-                            .collect(),
-                    }))
-                }
-            }
-        } else {
-            let fetch_ranges = self
-                .column_chunks
-                .iter()
-                .enumerate()
-                .filter(|&(idx, chunk)| chunk.is_none() && 
projection.leaf_included(idx))
-                .map(|(idx, _chunk)| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start..(start + length)
-                })
-                .collect();
-
-            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
-            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-                if chunk.is_some() || !projection.leaf_included(idx) {
-                    continue;
-                }
-
-                if let Some(data) = chunk_data.next() {
-                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
-                        offset: metadata.column(idx).byte_range().0 as usize,
-                        data,
-                    }));
-                }
-            }
-        }
-
+        // Figure out what ranges to fetch
+        let FetchRanges {
+            ranges,
+            page_start_offsets,
+        } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
+        // do the actual fetch
+        let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
+        // update our in memory buffers (self.column_chunks) with the fetched 
data
+        self.fill_column_chunks(projection, page_start_offsets, chunk_data);
         Ok(())
     }
 }
-
-impl RowGroups for InMemoryRowGroup<'_> {
-    fn num_rows(&self) -> usize {
-        self.row_count
-    }
-
-    /// Return chunks for column i
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        match &self.column_chunks[i] {
-            None => Err(ParquetError::General(format!(
-                "Invalid column index {i}, column was not fetched"
-            ))),
-            Some(data) => {
-                let page_locations = self
-                    .offset_index
-                    // filter out empty offset indexes (old versions specified 
Some(vec![]) when no present)
-                    .filter(|index| !index.is_empty())
-                    .map(|index| index[i].page_locations.clone());
-                let column_chunk_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
-                let page_reader = SerializedPageReader::new(
-                    data.clone(),
-                    column_chunk_metadata,
-                    self.row_count,
-                    page_locations,
-                )?;
-                let page_reader = page_reader.add_crypto_context(
-                    self.row_group_idx,
-                    i,
-                    self.metadata,
-                    column_chunk_metadata,
-                )?;
-
-                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
-
-                Ok(Box::new(ColumnChunkIterator {
-                    reader: Some(Ok(page_reader)),
-                }))
-            }
-        }
-    }
-}
-
-/// An in-memory column chunk
-#[derive(Clone)]
-enum ColumnChunkData {
-    /// Column chunk data representing only a subset of data pages
-    Sparse {
-        /// Length of the full column chunk
-        length: usize,
-        /// Subset of data pages included in this sparse chunk.
-        ///
-        /// Each element is a tuple of (page offset within file, page data).
-        /// Each entry is a complete page and the list is ordered by offset.
-        data: Vec<(usize, Bytes)>,
-    },
-    /// Full column chunk and the offset within the original file
-    Dense { offset: usize, data: Bytes },
-}
-
-impl ColumnChunkData {
-    /// Return the data for this column chunk at the given offset
-    fn get(&self, start: u64) -> Result<Bytes> {
-        match &self {
-            ColumnChunkData::Sparse { data, .. } => data
-                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.clone())
-                .map_err(|_| {
-                    ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
-                    ))
-                }),
-            ColumnChunkData::Dense { offset, data } => {
-                let start = start as usize - *offset;
-                Ok(data.slice(start..))
-            }
-        }
-    }
-}
-
-impl Length for ColumnChunkData {
-    /// Return the total length of the full column chunk
-    fn len(&self) -> u64 {
-        match &self {
-            ColumnChunkData::Sparse { length, .. } => *length as u64,
-            ColumnChunkData::Dense { data, .. } => data.len() as u64,
-        }
-    }
-}
-
-impl ChunkReader for ColumnChunkData {
-    type T = bytes::buf::Reader<Bytes>;
-
-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        Ok(self.get(start)?.reader())
-    }
-
-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        Ok(self.get(start)?.slice(..length))
-    }
-}
-
-/// Implements [`PageIterator`] for a single column chunk, yielding a single 
[`PageReader`]
-struct ColumnChunkIterator {
-    reader: Option<Result<Box<dyn PageReader>>>,
-}
-
-impl Iterator for ColumnChunkIterator {
-    type Item = Result<Box<dyn PageReader>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.reader.take()
-    }
-}
-
-impl PageIterator for ColumnChunkIterator {}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/arrow/in_memory_row_group.rs 
b/parquet/src/arrow/in_memory_row_group.rs
new file mode 100644
index 0000000000..34e46cd34e
--- /dev/null
+++ b/parquet/src/arrow/in_memory_row_group.rs
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::RowGroups;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use bytes::{Buf, Bytes};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// An in-memory collection of column chunks
+#[derive(Debug)]
+pub(crate) struct InMemoryRowGroup<'a> {
+    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
+    /// Column chunks for this row group
+    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    pub(crate) row_count: usize,
+    pub(crate) row_group_idx: usize,
+    pub(crate) metadata: &'a ParquetMetaData,
+}
+
+/// What ranges to fetch for the columns in this row group
+#[derive(Debug)]
+pub(crate) struct FetchRanges {
+    /// The byte ranges to fetch
+    pub(crate) ranges: Vec<Range<u64>>,
+    /// If `Some`, the start offsets of each page for each column chunk
+    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl InMemoryRowGroup<'_> {
+    /// Returns the byte ranges to fetch for the columns specified in
+    /// `projection` and `selection`.
+    ///
+    /// `cache_mask` indicates which columns, if any, are being cached by
+    /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache).
+    /// The `selection` for Cached columns is expanded to batch boundaries to 
simplify
+    /// accounting for what data is cached.
+    pub(crate) fn fetch_ranges(
+        &self,
+        projection: &ProjectionMask,
+        selection: Option<&RowSelection>,
+        batch_size: usize,
+        cache_mask: Option<&ProjectionMask>,
+    ) -> FetchRanges {
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
+            let expanded_selection =
+                selection.expand_to_batch_boundaries(batch_size, 
self.row_count);
+
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch
+            // pages required for the `RowSelection`
+            // Consider preallocating outer vec: 
https://github.com/apache/arrow-rs/issues/8667
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
+
+            let ranges = self
+                .column_chunks
+                .iter()
+                .zip(metadata.columns())
+                .enumerate()
+                .filter(|&(idx, (chunk, _chunk_meta))| {
+                    chunk.is_none() && projection.leaf_included(idx)
+                })
+                .flat_map(|(idx, (_chunk, chunk_meta))| {
+                    // If the first page does not start at the beginning of 
the column,
+                    // then we need to also fetch a dictionary page.
+                    let mut ranges: Vec<Range<u64>> = vec![];
+                    let (start, _len) = chunk_meta.byte_range();
+                    match offset_index[idx].page_locations.first() {
+                        Some(first) if first.offset as u64 != start => {
+                            ranges.push(start..first.offset as u64);
+                        }
+                        _ => (),
+                    }
+
+                    // Expand selection to batch boundaries if needed for 
caching
+                    // (see doc comment for this function for details on 
`cache_mask`)
+                    let use_expanded = cache_mask.map(|m| 
m.leaf_included(idx)).unwrap_or(false);
+                    if use_expanded {
+                        ranges.extend(
+                            
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
+                        );
+                    } else {
+                        
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+                    }
+                    page_start_offsets.push(ranges.iter().map(|range| 
range.start).collect());
+
+                    ranges
+                })
+                .collect();
+            FetchRanges {
+                ranges,
+                page_start_offsets: Some(page_start_offsets),
+            }
+        } else {
+            let ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .filter(|&(idx, chunk)| chunk.is_none() && 
projection.leaf_included(idx))
+                .map(|(idx, _chunk)| {
+                    let column = metadata.column(idx);
+                    let (start, length) = column.byte_range();
+                    start..(start + length)
+                })
+                .collect();
+            FetchRanges {
+                ranges,
+                page_start_offsets: None,
+            }
+        }
+    }
+
+    /// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
+    ///
+    /// This function **must** be called with the data from the ranges 
returned by
+    /// `fetch_ranges` and the corresponding `page_start_offsets`, with the exact same `projection` and `selection`.
+    pub(crate) fn fill_column_chunks<I>(
+        &mut self,
+        projection: &ProjectionMask,
+        page_start_offsets: Option<Vec<Vec<u64>>>,
+        chunk_data: I,
+    ) where
+        I: IntoIterator<Item = Bytes>,
+    {
+        let mut chunk_data = chunk_data.into_iter();
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some(page_start_offsets) = page_start_offsets {
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets = page_start_offsets.into_iter();
+
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+                        length: metadata.column(idx).byte_range().1 as usize,
+                        data: offsets
+                            .into_iter()
+                            .map(|x| x as usize)
+                            .zip(chunks.into_iter())
+                            .collect(),
+                    }))
+                }
+            }
+        } else {
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
+                        offset: metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    }));
+                }
+            }
+        }
+    }
+}
+
+impl RowGroups for InMemoryRowGroup<'_> {
+    fn num_rows(&self) -> usize {
+        self.row_count
+    }
+
+    /// Return chunks for column i
+    fn column_chunks(&self, i: usize) -> crate::errors::Result<Box<dyn 
PageIterator>> {
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {i}, column was not fetched"
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .offset_index
+                    // filter out empty offset indexes (old versions specified Some(vec![]) when none present)
+                    .filter(|index| !index.is_empty())
+                    .map(|index| index[i].page_locations.clone());
+                let column_chunk_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
+                let page_reader = SerializedPageReader::new(
+                    data.clone(),
+                    column_chunk_metadata,
+                    self.row_count,
+                    page_locations,
+                )?;
+                let page_reader = page_reader.add_crypto_context(
+                    self.row_group_idx,
+                    i,
+                    self.metadata,
+                    column_chunk_metadata,
+                )?;
+
+                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
+    }
+}
+
+/// An in-memory column chunk.
+/// This allows us to hold either dense column chunks or sparse column chunks 
and easily
+/// access them by offset.
+#[derive(Clone, Debug)]
+pub(crate) enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages.
+    /// For example if a row selection (possibly caused by a filter in a 
query) causes us to read only
+    /// a subset of the rows in the column.
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        /// Subset of data pages included in this sparse chunk.
+        ///
+        /// Each element is a tuple of (page offset within file, page data).
+        /// Each entry is a complete page and the list is ordered by offset.
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk and the offset within the original file
+    Dense { offset: usize, data: Bytes },
+}
+
+impl ColumnChunkData {
+    /// Return the data for this column chunk at the given offset
+    fn get(&self, start: u64) -> crate::errors::Result<Bytes> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.clone())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {start}"
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                Ok(data.slice(start..))
+            }
+        }
+    }
+}
+
+impl Length for ColumnChunkData {
+    /// Return the total length of the full column chunk
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
+        }
+    }
+}
+
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64) -> crate::errors::Result<Self::T> {
+        Ok(self.get(start)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> 
crate::errors::Result<Bytes> {
+        Ok(self.get(start)?.slice(..length))
+    }
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single 
[`PageReader`]
+struct ColumnChunkIterator {
+    reader: Option<crate::errors::Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+    type Item = crate::errors::Result<Box<dyn PageReader>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.reader.take()
+    }
+}
+
+impl PageIterator for ColumnChunkIterator {}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 94e3590651..34aac8b08a 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -190,9 +190,15 @@ pub mod async_reader;
 #[cfg(feature = "async")]
 pub mod async_writer;
 
+pub mod push_decoder;
+
+mod in_memory_row_group;
 mod record_reader;
+
 experimental!(mod schema);
 
+use std::fmt::Debug;
+
 pub use self::arrow_writer::ArrowWriter;
 #[cfg(feature = "async")]
 pub use self::async_reader::ParquetRecordBatchStreamBuilder;
diff --git a/parquet/src/arrow/push_decoder/mod.rs 
b/parquet/src/arrow/push_decoder/mod.rs
new file mode 100644
index 0000000000..4b932fb080
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -0,0 +1,1151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding; exit the loop
+///                 break;
+///             }
+///         }
+///     }
+/// ```
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+
+/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] 
for
+/// more options that can be configured.
+impl ParquetPushDecoderBuilder {
+    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder 
for the given file.
+    ///
+    /// See [`ParquetMetadataDecoder`] for a decoder that can read the metadata from a Parquet file.
+    ///
+    /// [`ParquetMetadataDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+    ///
+    /// See example on [`ParquetPushDecoderBuilder`]
+    pub fn try_new_decoder(
+        file_len: u64,
+        parquet_metadata: Arc<ParquetMetaData>,
+    ) -> Result<Self, ParquetError> {
+        Self::try_new_decoder_with_options(
+            file_len,
+            parquet_metadata,
+            ArrowReaderOptions::default(),
+        )
+    }
+
+    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder 
for the given file
+    /// with the given reader options.
+    ///
+    /// This is similar to [`Self::try_new_decoder`] but allows configuring
+    /// options such as Arrow schema
+    pub fn try_new_decoder_with_options(
+        file_len: u64,
+        parquet_metadata: Arc<ParquetMetaData>,
+        arrow_reader_options: ArrowReaderOptions,
+    ) -> Result<Self, ParquetError> {
+        let arrow_reader_metadata =
+            ArrowReaderMetadata::try_new(parquet_metadata, 
arrow_reader_options)?;
+        Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
+    }
+
+    /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
+    ///
+    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata 
from
+    /// the Parquet metadata and reader options.
+    pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: 
ArrowReaderMetadata) -> Self {
+        Self::new_builder(file_len, arrow_reader_metadata)
+    }
+
+    /// Create a [`ParquetPushDecoder`] with the configured options
+    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
+        let Self {
+            input: file_len,
+            metadata: parquet_metadata,
+            schema: _,
+            fields,
+            batch_size,
+            row_groups,
+            projection,
+            filter,
+            selection,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+        } = self;
+
+        // If no row groups were specified, read all of them
+        let row_groups =
+            row_groups.unwrap_or_else(|| 
(0..parquet_metadata.num_row_groups()).collect());
+
+        // Prepare to build RowGroup readers
+        let buffers = PushBuffers::new(file_len);
+        let row_group_reader_builder = RowGroupReaderBuilder::new(
+            batch_size,
+            projection,
+            Arc::clone(&parquet_metadata),
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            buffers,
+        );
+
+        // Initialize the decoder with the configured options
+        let remaining_row_groups = RemainingRowGroups::new(
+            parquet_metadata,
+            row_groups,
+            selection,
+            row_group_reader_builder,
+        );
+
+        Ok(ParquetPushDecoder {
+            state: ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups: Box::new(remaining_row_groups),
+            },
+        })
+    }
+}
+
+/// A push based Parquet Decoder
+///
+/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use 
the decoder.
+///
+/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data 
without an
+/// underlying reader for performing IO, and thus offers fine grained control
+/// over how data is fetched and decoded.
+///
+/// When more data is needed to make progress, instead of reading data directly
+/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges
+/// are needed. Once the caller provides the requested ranges via
+/// [`Self::push_ranges`], the caller can try to decode again by calling
+/// [`Self::try_decode`].
+///
+/// The decoder's internal state tracks what has been already decoded and what
+/// is needed next.
+#[derive(Debug)]
+pub struct ParquetPushDecoder {
+    /// The inner state.
+    ///
+    /// This state is consumed on every transition and a new state is produced
+    /// so the Rust compiler can ensure that the state is always valid and
+    /// transitions are not missed.
+    state: ParquetDecoderState,
+}
+
+impl ParquetPushDecoder {
+    /// Attempt to decode the next batch of data, or return what data is needed
+    ///
+    /// The decoder communicates the next state with a [`DecodeResult`]
+    ///
+    /// See full example in [`ParquetPushDecoderBuilder`]
+    ///
+    /// ```no_run
+    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
+    /// use parquet::DecodeResult;
+    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
+    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: 
Vec<std::ops::Range<u64>>) { unimplemented!() }
+    /// let mut decoder = get_decoder();
+    /// loop {
+    ///    match decoder.try_decode().unwrap() {
+    ///       DecodeResult::NeedsData(ranges) => {
+    ///         // The decoder needs more data. Fetch the data for the given 
ranges
+    ///         // call decoder.push_ranges(ranges, data) and call again
+    ///         push_data(&mut decoder, ranges);
+    ///       }
+    ///       DecodeResult::Data(batch) => {
+    ///         // Successfully decoded the next batch of data
+    ///         println!("Got batch with {} rows", batch.num_rows());
+    ///       }
+    ///       DecodeResult::Finished => {
+    ///         // The decoder has finished decoding all data
+    ///         break;
+    ///       }
+    ///    }
+    /// }
+    ///```
+    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, 
ParquetError> {
+        let current_state = std::mem::replace(&mut self.state, 
ParquetDecoderState::Finished);
+        let (new_state, decode_result) = current_state.try_transition()?;
+        self.state = new_state;
+        Ok(decode_result)
+    }
+
+    /// Push data into the decoder for processing
+    ///
+    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing 
a
+    /// single range of data.
+    ///
+    /// Note this can be the entire file or just a part of it. If it is part 
of the file,
+    /// the ranges should correspond to the data ranges requested by the 
decoder.
+    ///
+    /// See example in [`ParquetPushDecoderBuilder`]
+    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), 
ParquetError> {
+        self.push_ranges(vec![range], vec![data])
+    }
+
+    /// Push data into the decoder for processing
+    ///
+    /// This should correspond to the data ranges requested by the decoder
+    pub fn push_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+        data: Vec<Bytes>,
+    ) -> Result<(), ParquetError> {
+        let current_state = std::mem::replace(&mut self.state, 
ParquetDecoderState::Finished);
+        self.state = current_state.push_data(ranges, data)?;
+        Ok(())
+    }
+
+    /// Returns the total number of buffered bytes in the decoder
+    ///
+    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to the
+    /// decoder but not yet consumed.
+    ///
+    /// Note that this does not include any overhead of the internal data
+    /// structures and that since [`Bytes`] are ref counted memory, this may 
not
+    /// reflect additional memory usage.
+    ///
+    /// This can be used to monitor memory usage of the decoder.
+    pub fn buffered_bytes(&self) -> u64 {
+        self.state.buffered_bytes()
+    }
+}
+
+/// Internal state machine for the [`ParquetPushDecoder`]
+#[derive(Debug)]
+enum ParquetDecoderState {
+    /// Waiting for data needed to decode the next RowGroup
+    ReadingRowGroup {
+        remaining_row_groups: Box<RemainingRowGroups>,
+    },
+    /// The decoder is actively decoding a RowGroup
+    DecodingRowGroup {
+        /// Current active reader
+        record_batch_reader: Box<ParquetRecordBatchReader>,
+        remaining_row_groups: Box<RemainingRowGroups>,
+    },
+    /// The decoder has finished processing all data
+    Finished,
+}
+
+impl ParquetDecoderState {
+    /// Current state --> next state + output
+    ///
+    /// This function is called to check if the decoder has any RecordBatches
+    /// and [`Self::push_data`] is called when new data is available.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>), 
ParquetError> {
+        match self {
+            Self::ReadingRowGroup {
+                mut remaining_row_groups,
+            } => {
+                match remaining_row_groups.try_next_reader()? {
+                    // If we have a next reader, we can transition to decoding 
it
+                    DecodeResult::Data(record_batch_reader) => {
+                        // Transition to decoding the row group
+                        Self::DecodingRowGroup {
+                            record_batch_reader: Box::new(record_batch_reader),
+                            remaining_row_groups,
+                        }
+                        .try_transition()
+                    }
+                    // If more data is needed, return the required ranges and
+                    // stay in the ReadingRowGroup state
+                    DecodeResult::NeedsData(ranges) => {
+                        Ok((
+                            Self::ReadingRowGroup {
+                                remaining_row_groups,
+                            },
+                            DecodeResult::NeedsData(ranges),
+                        ))
+                    }
+                    DecodeResult::Finished => {
+                        // No more row groups to read, we are finished
+                        Ok((Self::Finished, DecodeResult::Finished))
+                    }
+                }
+            }
+            Self::DecodingRowGroup {
+                mut record_batch_reader,
+                remaining_row_groups,
+            } => {
+                // Decode the next record batch
+                match record_batch_reader.next() {
+                    Some(Ok(batch)) => {
+                        // Successfully decoded a batch, return it
+                        Ok((
+                            Self::DecodingRowGroup {
+                                record_batch_reader,
+                                remaining_row_groups,
+                            },
+                            DecodeResult::Data(batch),
+                        ))
+                    }
+                    None => {
+                        // No more batches in this row group, move to the next 
row group
+                        // or finish if there are no more row groups
+                        Self::ReadingRowGroup {
+                            remaining_row_groups,
+                        }
+                        .try_transition()
+                    }
+                    Some(Err(e)) => Err(ParquetError::from(e)), // some error 
occurred while decoding
+                }
+            }
+            Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
+        }
+    }
+
+    /// Push data, and transition state if needed
+    ///
+    /// This should correspond to the data ranges requested by the decoder
+    pub fn push_data(
+        self,
+        ranges: Vec<Range<u64>>,
+        data: Vec<Bytes>,
+    ) -> Result<Self, ParquetError> {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                mut remaining_row_groups,
+            } => {
+                // Push data to the RowGroupReaderBuilder
+                remaining_row_groups.push_data(ranges, data);
+                Ok(ParquetDecoderState::ReadingRowGroup {
+                    remaining_row_groups,
+                })
+            }
+            // it is ok to get data before we asked for it
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader,
+                mut remaining_row_groups,
+            } => {
+                remaining_row_groups.push_data(ranges, data);
+                Ok(ParquetDecoderState::DecodingRowGroup {
+                    record_batch_reader,
+                    remaining_row_groups,
+                })
+            }
+            ParquetDecoderState::Finished => Err(ParquetError::General(
+                "Cannot push data to a finished decoder".to_string(),
+            )),
+        }
+    }
+
+    /// How many bytes are currently buffered in the decoder?
+    fn buffered_bytes(&self) -> u64 {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.buffered_bytes(),
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader: _,
+                remaining_row_groups,
+            } => remaining_row_groups.buffered_bytes(),
+            ParquetDecoderState::Finished => 0,
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::DecodeResult;
+    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, 
RowSelection, RowSelector};
+    use crate::arrow::push_decoder::{ParquetPushDecoder, 
ParquetPushDecoderBuilder};
+    use crate::arrow::{ArrowWriter, ProjectionMask};
+    use crate::errors::ParquetError;
+    use crate::file::metadata::ParquetMetaDataPushDecoder;
+    use crate::file::properties::WriterProperties;
+    use arrow::compute::kernels::cmp::{gt, lt};
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::Int64Type;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
+    use arrow_select::concat::concat_batches;
+    use bytes::Bytes;
+    use std::fmt::Debug;
+    use std::ops::Range;
+    use std::sync::{Arc, LazyLock};
+
+    /// Test decoder struct size (as they are copied around on each 
transition, they
+    /// should not grow too large)
+    #[test]
+    fn test_decoder_size() {
+        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
+    }
+
+    /// Decode the entire file at once, simulating a scenario where all data is
+    /// available in memory
+    #[test]
+    fn test_decoder_all_data() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        let results = vec![
+            // first row group should be decoded without needing more data
+            expect_data(decoder.try_decode()),
+            // second row group should be decoded without needing more data
+            expect_data(decoder.try_decode()),
+        ];
+        expect_finished(decoder.try_decode());
+
+        let all_output = concat_batches(&TEST_BATCH.schema(), 
&results).unwrap();
+        // Check that the output matches the input batch
+        assert_eq!(all_output, *TEST_BATCH);
+    }
+
+    /// Decode the entire file incrementally, simulating a scenario where data 
is
+    /// fetched as needed
+    #[test]
+    fn test_decoder_incremental() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        let mut results = vec![];
+
+        // First row group, expect a single request
+        let ranges = expect_needs_data(decoder.try_decode());
+        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - 
r.start).sum();
+        push_ranges_to_decoder(&mut decoder, ranges);
+        // The decoder should currently only store the data it needs to decode 
the first row group
+        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+        results.push(expect_data(decoder.try_decode()));
+        // the decoder should have consumed the data for the first row group 
and freed it
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        // Second row group,
+        let ranges = expect_needs_data(decoder.try_decode());
+        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - 
r.start).sum();
+        push_ranges_to_decoder(&mut decoder, ranges);
+        // The decoder should currently only store the data it needs to decode 
the second row group
+        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+        results.push(expect_data(decoder.try_decode()));
+        // the decoder should have consumed the data for the second row group 
and freed it
+        assert_eq!(decoder.buffered_bytes(), 0);
+        expect_finished(decoder.try_decode());
+
+        // Check that the output matches the input batch
+        let all_output = concat_batches(&TEST_BATCH.schema(), 
&results).unwrap();
+        assert_eq!(all_output, *TEST_BATCH);
+    }
+
+    /// Decode the entire file incrementally, simulating partial reads
+    #[test]
+    fn test_decoder_partial() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        // First row group, expect a single request for all data needed to 
read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(0, 200);
+        assert_eq!(batch1, expected1);
+
+        // Second row group, this time provide the data in two steps
+        let ranges = expect_needs_data(decoder.try_decode());
+        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+        assert!(!ranges1.is_empty());
+        assert!(!ranges2.is_empty());
+        // push first half to simulate partial read
+        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+
+        // still expect more data
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        // push empty ranges should be a no-op
+        push_ranges_to_decoder(&mut decoder, vec![]);
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 200);
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode multiple columns "a" and "b", expect that the decoder requests
+    /// only a single request per row group
+    #[test]
+    fn test_decoder_selection_does_one_request() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        let mut decoder = builder
+            .with_projection(
+                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
+            )
+            .build()
+            .unwrap();
+
+        // First row group, expect a single request for all data needed to read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
+        assert_eq!(batch1, expected1);
+
+        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode with a filter that requires multiple requests, but only provide part
+    /// of the data needed for the filter at a time simulating partial reads.
+    #[test]
+    fn test_decoder_single_filter_partial() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        // Values in column "a" range 0..399
+        // First filter: "a" > 250  (nothing in Row Group 0, both data pages in Row Group 1)
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // a > 250
+        let row_filter_a = ArrowPredicateFn::new(
+            // claim to use both a and b so we get two range requests for the filter pages
+            ProjectionMask::columns(&schema_descr, ["a", "b"]),
+            |batch: RecordBatch| {
+                let scalar_250 = Int64Array::new_scalar(250);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_250)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(
+                // read only column "a" to test that filter pages are reused
+                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
+            )
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .build()
+            .unwrap();
+
+        // First row group, evaluating filters
+        let ranges = expect_needs_data(decoder.try_decode());
+        // only provide half the ranges
+        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+        assert!(!ranges1.is_empty());
+        assert!(!ranges2.is_empty());
+        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+        // still expect more data
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());
+
+        // Since no rows in the first row group pass the filters, there is no
+        // additional requests to read data pages for "b" here
+
+        // Second row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
+    #[test]
+    fn test_decoder_single_filter_and_row_selection() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        // Values in column "a" range 0..399
+        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // a > 250
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            |batch: RecordBatch| {
+                let scalar_250 = Int64Array::new_scalar(250);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_250)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(
+                // read only column "a" to test that filter pages are reused
+                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
+            )
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(200),   // skip first row group
+                RowSelector::select(100), // first 100 rows of second row group
+                RowSelector::skip(100),
+            ]))
+            .build()
+            .unwrap();
+
+        // expect the first row group to be filtered out (no filter is evaluated due to row selection)
+
+        // First row group, first filter (a > 250)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // Second row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
+        assert_eq!(batch, expected);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode with multiple filters that require multiple requests
+    #[test]
+    fn test_decoder_multi_filters() {
+        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        // Values in column "a" range 0..399
+        // Values in column "b" range 400..799
+        // First filter: "a" > 175  (last data page in Row Group 0)
+        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // a > 175
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            |batch: RecordBatch| {
+                let scalar_175 = Int64Array::new_scalar(175);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_175)
+            },
+        );
+
+        // b < 625
+        let row_filter_b = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            |batch: RecordBatch| {
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                lt(column, &scalar_625)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(
+                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
+            )
+            .with_row_filter(RowFilter::new(vec![
+                Box::new(row_filter_a),
+                Box::new(row_filter_b),
+            ]))
+            .build()
+            .unwrap();
+
+        // First row group, first filter (a > 175)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // first row group, second filter (b < 625)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // first row group, data pages for "c"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first batch to be decoded: rows 176..199, column "c"
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
+        assert_eq!(batch1, expected1);
+
+        // Second row group, first filter (a > 175)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // Second row group, second filter (b < 625)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // Second row group, data pages for "c"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the second batch to be decoded: rows 200..224, column "c"
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode with a filter that uses a column that is also projected, and expect
+    /// that the filter pages are reused (don't refetch them)
+    #[test]
+    fn test_decoder_reuses_filter_pages() {
+        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        // Values in column "a" range 0..399
+        // First filter: "a" > 250  (nothing in Row Group 0, last data page in Row Group 1)
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // a > 250
+        let row_filter_a = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["a"]),
+            |batch: RecordBatch| {
+                let scalar_250 = Int64Array::new_scalar(250);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_250)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(
+                // read only column "a" to test that filter pages are reused
+                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
+            )
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .build()
+            .unwrap();
+
+        // First row group, first filter (a > 250)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first row group to be filtered out (no rows match)
+
+        // Second row group, first filter (a > 250)
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect that the second row group is decoded: rows 251..399, column "a"
+        // Note that the filter pages for "a" should be reused and no additional data
+        // should be requested
+        let batch = expect_data(decoder.try_decode());
+        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
+        assert_eq!(batch, expected);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    #[test]
+    fn test_decoder_empty_filters() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // only read column "c", but with empty filters
+        let mut decoder = builder
+            .with_projection(
+                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
+            )
+            .with_row_filter(RowFilter::new(vec![
+                // empty filters should be ignored
+            ]))
+            .build()
+            .unwrap();
+
+        // First row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first batch to be decoded: rows 0..199, column "c"
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
+        assert_eq!(batch1, expected1);
+
+        // Second row group,
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the second batch to be decoded: rows 200..399, column "c"
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
+
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    #[test]
+    fn test_decoder_offset_limit() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        // skip entire first row group (200 rows) and first 25 rows of second row group
+        .with_offset(225)
+        // and limit to 20 rows
+        .with_limit(20)
+        .build()
+        .unwrap();
+
+        // First row group should be skipped,
+
+        // Second row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first and only batch to be decoded
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(225, 20);
+        assert_eq!(batch1, expected1);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    #[test]
+    fn test_decoder_row_group_selection() {
+        // take only the second row group
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .with_row_groups(vec![1])
+        .build()
+        .unwrap();
+
+        // First row group should be skipped,
+
+        // Second row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first and only batch to be decoded
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(200, 200);
+        assert_eq!(batch1, expected1);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    #[test]
+    fn test_decoder_row_selection() {
+        // skip the first row group and the first 25 rows of the second, then take 20 rows
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .with_row_selection(RowSelection::from(vec![
+            RowSelector::skip(225),  // skip first row group and 25 rows of second
+            RowSelector::select(20), // take 20 rows
+        ]))
+        .build()
+        .unwrap();
+
+        // First row group should be skipped,
+
+        // Second row group
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        // expect the first and only batch to be decoded
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(225, 20);
+        assert_eq!(batch1, expected1);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
+    ///
+    /// Note c is a different type (so the data page sizes will be different)
+    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+            if i % 2 == 0 {
+                format!("string_{i}")
+            } else {
+                format!("A string larger than 12 bytes and thus not inlined 
{i}")
+            }
+        })));
+
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    });
+
+    /// Create a parquet file in memory for testing.
+    ///
+    /// See [`TEST_BATCH`] for the data in the file.
+    ///
+    /// Each column is written in 4 data pages, each with 100 rows, across 2
+    /// row groups. Each column in each row group has two data pages.
+    ///
+    /// The data is split across row groups like this
+    ///
+    /// Column |   Values                | Data Page | Row Group
+    /// -------|------------------------|-----------|-----------
+    /// a      | 0..99                  | 1         | 0
+    /// a      | 100..199               | 2         | 0
+    /// a      | 200..299               | 1         | 1
+    /// a      | 300..399               | 2         | 1
+    ///
+    /// b      | 400..499               | 1         | 0
+    /// b      | 500..599               | 2         | 0
+    /// b      | 600..699               | 1         | 1
+    /// b      | 700..799               | 2         | 1
+    ///
+    /// c      | "string_0".."string_99"        | 1         | 0
+    /// c      | "string_100".."string_199"     | 2         | 0
+    /// c      | "string_200".."string_299"     | 1         | 1
+    /// c      | "string_300".."string_399"     | 2         | 1
+    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+        let input_batch = &TEST_BATCH;
+        let mut output = Vec::new();
+
+        let writer_options = WriterProperties::builder()
+            .set_max_row_group_size(200)
+            .set_data_page_row_count_limit(100)
+            .build();
+        let mut writer =
+            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
+
+        // since the limits are only enforced on batch boundaries, write the input
+        // batch in chunks of 50
+        let mut row_remain = input_batch.num_rows();
+        while row_remain > 0 {
+            let chunk_size = row_remain.min(50);
+            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
+            writer.write(&chunk).unwrap();
+            row_remain -= chunk_size;
+        }
+        writer.close().unwrap();
+        Bytes::from(output)
+    });
+
+    /// Return the length of [`TEST_FILE_DATA`], in bytes
+    fn test_file_len() -> u64 {
+        TEST_FILE_DATA.len() as u64
+    }
+
+    /// Return a range that covers the entire [`TEST_FILE_DATA`]
+    fn test_file_range() -> Range<u64> {
+        0..test_file_len()
+    }
+
+    /// Return a slice of the test file data from the given range
+    pub fn test_file_slice(range: Range<u64>) -> Bytes {
+        let start: usize = range.start.try_into().unwrap();
+        let end: usize = range.end.try_into().unwrap();
+        TEST_FILE_DATA.slice(start..end)
+    }
+
+    /// return the metadata for the test file
+    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
+        let metadata = metadata_decoder.try_decode().unwrap();
+        let DecodeResult::Data(metadata) = metadata else {
+            panic!("Expected metadata to be decoded successfully");
+        };
+        Arc::new(metadata)
+    }
+
+    /// Push the given ranges to the metadata decoder, simulating reading from a file
+    fn push_ranges_to_metadata_decoder(
+        metadata_decoder: &mut ParquetMetaDataPushDecoder,
+        ranges: Vec<Range<u64>>,
+    ) {
+        let data = ranges
+            .iter()
+            .map(|range| test_file_slice(range.clone()))
+            .collect::<Vec<_>>();
+        metadata_decoder.push_ranges(ranges, data).unwrap();
+    }
+
+    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
+        let data = ranges
+            .iter()
+            .map(|range| test_file_slice(range.clone()))
+            .collect::<Vec<_>>();
+        decoder.push_ranges(ranges, data).unwrap();
+    }
+
+    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
+    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
+        match result.expect("Expected Ok(DecodeResult::Data(T))") {
+            DecodeResult::Data(data) => data,
+            result => panic!("Expected DecodeResult::Data, got {result:?}"),
+        }
+    }
+
+    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
+    fn expect_needs_data<T: Debug>(
+        result: Result<DecodeResult<T>, ParquetError>,
+    ) -> Vec<Range<u64>> {
+        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
+            DecodeResult::NeedsData(ranges) => ranges,
+            result => panic!("Expected DecodeResult::NeedsData, got 
{result:?}"),
+        }
+    }
+
+    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
+        match result.expect("Expected Ok(DecodeResult::Finished)") {
+            DecodeResult::Finished => {}
+            result => panic!("Expected DecodeResult::Finished, got 
{result:?}"),
+        }
+    }
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs
new file mode 100644
index 0000000000..6fbc2090b0
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`DataRequest`] tracks and holds data needed to construct InMemoryRowGroups
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRowGroup};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::ChunkReader;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Contains in-progress state to construct InMemoryRowGroups
+///
+/// See [`DataRequestBuilder`] for creating new requests
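+///
+/// Lifecycle (sketch): a request is created by [`DataRequestBuilder::build`];
+/// callers ask [`DataRequest::needed_ranges`] which byte ranges are still
+/// missing from the buffers, push those bytes, and once nothing is missing,
+/// convert the request into an `InMemoryRowGroup` via
+/// [`DataRequest::try_into_in_memory_row_group`].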
+#[derive(Debug)]
+pub(super) struct DataRequest {
+    /// Any previously read column chunk data
+    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    /// The ranges of data that are needed next
+    ranges: Vec<Range<u64>>,
+    /// Optional page start offsets for each requested range. This is used
+    /// to create the relevant InMemoryRowGroup
+    page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl DataRequest {
+    /// Return what ranges are still needed to satisfy this request. Returns an empty vec
+    /// if all ranges are satisfied
+    pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec<Range<u64>> {
+        self.ranges
+            .iter()
+            .filter(|&range| !buffers.has_range(range))
+            .cloned()
+            .collect()
+    }
+
+    /// Returns the chunks from the buffers that satisfy this request
+    fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
+        self.ranges
+            .iter()
+            .map(|range| {
+                let length: usize = (range.end - range.start)
+                    .try_into()
+                    .expect("overflow for offset");
+                // should have all the data due to the check above
+                buffers.get_bytes(range.start, length).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Internal Error missing data for range {range:?} in 
buffers: {e}",
+                    ))
+                })
+            })
+            .collect()
+    }
+
+    /// Create a new InMemoryRowGroup, and fill it with provided data
+    ///
+    /// Assumes that all needed data is present in the buffers
+    /// and clears any explicitly requested ranges
+    pub fn try_into_in_memory_row_group<'a>(
+        self,
+        row_group_idx: usize,
+        row_count: usize,
+        parquet_metadata: &'a ParquetMetaData,
+        projection: &ProjectionMask,
+        buffers: &mut PushBuffers,
+    ) -> Result<InMemoryRowGroup<'a>, ParquetError> {
+        let chunks = self.get_chunks(buffers)?;
+
+        let Self {
+            column_chunks,
+            ranges,
+            page_start_offsets,
+        } = self;
+
+        // Create an InMemoryRowGroup to hold the column chunks, this is a
+        // temporary structure used to tell the ArrowReaders what pages are
+        // needed for decoding
+        let mut in_memory_row_group = InMemoryRowGroup {
+            row_count,
+            column_chunks,
+            offset_index: get_offset_index(parquet_metadata, row_group_idx),
+            row_group_idx,
+            metadata: parquet_metadata,
+        };
+
+        in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);
+
+        // Clear the ranges that were explicitly requested
+        buffers.clear_ranges(&ranges);
+
+        Ok(in_memory_row_group)
+    }
+}
+
+/// Builder for [`DataRequest`]
+pub(super) struct DataRequestBuilder<'a> {
+    /// The row group index
+    row_group_idx: usize,
+    /// The number of rows in the row group
+    row_count: usize,
+    /// The batch size to read
+    batch_size: usize,
+    /// The parquet metadata
+    parquet_metadata: &'a ParquetMetaData,
+    /// The projection mask (which columns to read)
+    projection: &'a ProjectionMask,
+    /// Optional row selection to apply
+    selection: Option<&'a RowSelection>,
+    /// Optional projection mask if using
+    /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache)
+    /// for caching decoded columns.
+    cache_projection: Option<&'a ProjectionMask>,
+    /// Any previously read column chunks
+    column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+}
+
+impl<'a> DataRequestBuilder<'a> {
+    pub(super) fn new(
+        row_group_idx: usize,
+        row_count: usize,
+        batch_size: usize,
+        parquet_metadata: &'a ParquetMetaData,
+        projection: &'a ProjectionMask,
+    ) -> Self {
+        Self {
+            row_group_idx,
+            row_count,
+            batch_size,
+            parquet_metadata,
+            projection,
+            selection: None,
+            cache_projection: None,
+            column_chunks: None,
+        }
+    }
+
+    /// Set an optional row selection to apply
+    pub(super) fn with_selection(mut self, selection: Option<&'a RowSelection>) -> Self {
+        self.selection = selection;
+        self
+    }
+
+    /// set columns to cache, if any
+    pub(super) fn with_cache_projection(
+        mut self,
+        cache_projection: Option<&'a ProjectionMask>,
+    ) -> Self {
+        self.cache_projection = cache_projection;
+        self
+    }
+
+    /// Provide any previously read column chunks
+    pub(super) fn with_column_chunks(
+        mut self,
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+    ) -> Self {
+        self.column_chunks = column_chunks;
+        self
+    }
+
+    pub(crate) fn build(self) -> DataRequest {
+        let Self {
+            row_group_idx,
+            row_count,
+            batch_size,
+            parquet_metadata,
+            projection,
+            selection,
+            cache_projection,
+            column_chunks,
+        } = self;
+
+        let row_group_meta_data = parquet_metadata.row_group(row_group_idx);
+
+        // If no previously read column chunks are provided, create a new location to hold them
+        let column_chunks =
+            column_chunks.unwrap_or_else(|| vec![None; row_group_meta_data.columns().len()]);
+
+        // Create an InMemoryRowGroup to hold the column chunks, this is a
+        // temporary structure used to tell the ArrowReaders what pages are
+        // needed for decoding
+        let row_group = InMemoryRowGroup {
+            row_count,
+            column_chunks,
+            offset_index: get_offset_index(parquet_metadata, row_group_idx),
+            row_group_idx,
+            metadata: parquet_metadata,
+        };
+
+        let FetchRanges {
+            ranges,
+            page_start_offsets,
+        } = row_group.fetch_ranges(projection, selection, batch_size, cache_projection);
+
+        DataRequest {
+            // Save any previously read column chunks
+            column_chunks: row_group.column_chunks,
+            ranges,
+            page_start_offsets,
+        }
+    }
+}
+
+fn get_offset_index(
+    parquet_metadata: &ParquetMetaData,
+    row_group_idx: usize,
+) -> Option<&[OffsetIndexMetaData]> {
+    parquet_metadata
+        .offset_index()
+        // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
+        .filter(|index| !index.is_empty())
+        .map(|x| x[row_group_idx].as_slice())
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
new file mode 100644
index 0000000000..4a3c38e959
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates.
+///
+/// The `FilterInfo` owns the [`RowFilter`] being evaluated and tracks the current
+/// predicate to evaluate.
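+///
+/// Typical use (sketch): evaluate the predicate returned by [`Self::current`] /
+/// [`Self::current_mut`], then call [`Self::advance`]; repeat until
+/// [`AdvanceResult::Done`] hands back the [`RowFilter`] and [`CacheInfo`].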
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// RowFilter is owned by `FilterInfo` because its predicates may be mutated as part
+    /// of evaluation. Specifically, [`ArrowPredicate`] requires &mut self for
+    /// evaluation.
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache.
+    /// Normally these are the columns that filters may look at such that
+    /// if we have a filter like `(a + 10 > 5) AND (a + b = 0)` we cache `a` to
+    /// avoid re-reading it between evaluating `a + 10 > 5` and `a + b = 0`.
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// Advanced to the next predicate
+    Continue(FilterInfo),
+    /// No more predicates; returns the row filter and cache info
+    Done(RowFilter, CacheInfo),
+}
+
+impl FilterInfo {
+    /// Create a new FilterInfo
+    pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self {
+        Self {
+            filter,
+            next_predicate: NonZeroUsize::new(1).expect("1 is always non-zero"),
+            cache_info,
+        }
+    }
+
+    /// Advance to the next predicate
+    ///
+    /// Returns
+    /// * [`AdvanceResult::Continue`] returning the `FilterInfo` if there are
+    ///   more predicates to evaluate.
+    /// * [`AdvanceResult::Done`] with the inner [`RowFilter`] and [`CacheInfo`]
+    ///   if there are no more predicates
+    pub(super) fn advance(mut self) -> AdvanceResult {
+        if self.next_predicate.get() >= self.filter.predicates.len() {
+            AdvanceResult::Done(self.filter, self.cache_info)
+        } else {
+            self.next_predicate = self
+                .next_predicate
+                .checked_add(1)
+                .expect("no usize overflow");
+            AdvanceResult::Continue(self)
+        }
+    }
+
+    /// Return a mutable reference to the current predicate
+    pub(super) fn current_mut(&mut self) -> &mut dyn ArrowPredicate {
+        self.filter
+            .predicates
+            .get_mut(self.next_predicate.get() - 1)
+            // advance ensures next_predicate is always in bounds
+            .unwrap()
+            .as_mut()
+    }
+
+    /// Return the current predicate to evaluate
+    pub(super) fn current(&self) -> &dyn ArrowPredicate {
+        self.filter
+            .predicates
+            .get(self.next_predicate.get() - 1)
+            // advance ensures next_predicate is always in bounds
+            .unwrap()
+            .as_ref()
+    }
+
+    /// Return a reference to the cache projection
+    pub(super) fn cache_projection(&self) -> &ProjectionMask {
+        &self.cache_info.cache_projection
+    }
+
+    /// Return a cache builder to save the results of predicate evaluation
+    pub(super) fn cache_builder(&self) -> CacheOptionsBuilder<'_> {
+        self.cache_info.builder()
+    }
+
+    /// Returns the inner filter, consuming this FilterInfo
+    pub(super) fn into_filter(self) -> RowFilter {
+        self.filter
+    }
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
new file mode 100644
index 0000000000..be9070ae8b
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
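+///
+/// Roughly, states transition as follows (see `try_transition` for the exact
+/// rules): `Start` moves to `Filters` when there are predicates to evaluate,
+/// otherwise to `StartData`; each predicate cycles through `Filters` ->
+/// `WaitingOnFilterData` before either returning to `Filters` for the next
+/// predicate or moving on to `StartData`; finally `StartData` ->
+/// `WaitingOnData` -> `Finished`. The `WaitingOn*` states are re-entered
+/// until all requested byte ranges have been pushed.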
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Knows what data to actually read, after all predicates have been evaluated
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
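+///
+/// Intended flow (sketch): [`Self::next_row_group`] starts a row group, then
+/// [`Self::try_build`] is called repeatedly, with the caller pushing any
+/// requested byte ranges via [`Self::push_data`], until it yields a reader or
+/// reports that the row group is finished.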
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache to use
+    ///
+    /// See [`RowGroupCache`] for details.
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning;
+    /// it is a bug if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
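+    ///
+    /// Data may be pushed incrementally: a pending request can be satisfied
+    /// across several pushes, and any still-missing ranges are requested
+    /// again on the next call to [`Self::try_build`].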
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader will error next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this `RowGroupReaderBuilder`.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
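+    ///
+    /// A sketch of the expected calling pattern for a single row group
+    /// (illustrative only; fetching the bytes for the requested ranges is up
+    /// to the caller, and `fetch` below is a hypothetical helper):
+    ///
+    /// ```ignore
+    /// builder.next_row_group(row_group_idx, row_count, selection)?;
+    /// loop {
+    ///     match builder.try_build()? {
+    ///         DecodeResult::NeedsData(ranges) => {
+    ///             let data = fetch(&ranges); // caller-provided IO
+    ///             builder.push_data(ranges, data);
+    ///         }
+    ///         DecodeResult::Data(reader) => {
+    ///             // `reader` yields the RecordBatches for this row group
+    ///         }
+    ///         DecodeResult::Finished => break,
+    ///     }
+    /// }
+    /// ```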
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            let current_state = self.take_state()?;
+            // Try to transition the decoder.
+            match self.try_transition(current_state)? {
+                // Either produced a batch reader, needed input, or finished
+                NextState {
+                    next_state,
+                    result: Some(result),
+                } => {
+                    // put back the next state
+                    self.state = Some(next_state);
+                    return Ok(result);
+                }
+                // completed one internal state, maybe can proceed further
+                NextState {
+                    next_state,
+                    result: None,
+                } => {
+                    // continue processing
+                    self.state = Some(next_state);
+                }
+            }
+        }
+    }
+
+    /// Current state --> next state + optional output
+    ///
+    /// This is the main state transition function for the row group reader
+    /// and encodes the row group decoding state machine.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(
+        &mut self,
+        current_state: RowGroupDecoderState,
+    ) -> Result<NextState, ParquetError> {
+        let result = match current_state {
+            RowGroupDecoderState::Start { row_group_info } => {
+                let column_chunks = None; // no prior column chunks
+
+                let Some(filter) = self.filter.take() else {
+                    // no filter, start trying to read data immediately
+                    return Ok(NextState::again(RowGroupDecoderState::StartData {
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+                // no predicates in filter, so start reading immediately
+                if filter.predicates.is_empty() {
+                    return Ok(NextState::again(RowGroupDecoderState::StartData {
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+
+                // we have predicates to evaluate
+                let cache_projection =
+                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
+
+                let cache_info = CacheInfo::new(
+                    cache_projection,
+                    Arc::new(Mutex::new(RowGroupCache::new(
+                        self.batch_size,
+                        self.max_predicate_cache_size,
+                    ))),
+                );
+
+                let filter_info = FilterInfo::new(filter, cache_info);
+                NextState::again(RowGroupDecoderState::Filters {
+                    row_group_info,
+                    filter_info,
+                    column_chunks,
+                })
+            }
+            // need to evaluate filters
+            RowGroupDecoderState::Filters {
+                row_group_info,
+                column_chunks,
+                filter_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // If nothing is selected, we are done with this row group
+                if !plan_builder.selects_any() {
+                    // ruled out entire row group
+                    self.filter = Some(filter_info.into_filter());
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Make a request for the data needed to evaluate the current predicate
+                let predicate = filter_info.current();
+
+                // need to fetch pages the column needs for decoding, figure
+                // that out based on the current selection and projection
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    predicate.projection(), // use the predicate's projection
+                )
+                .with_selection(plan_builder.selection())
+                // Fetch predicate columns; expand selection only for cached predicate columns
+                .with_cache_projection(Some(filter_info.cache_projection()))
+                .with_column_chunks(column_chunks)
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
+                    row_group_info,
+                    filter_info,
+                    data_request,
+                })
+            }
+            RowGroupDecoderState::WaitingOnFilterData {
+                row_group_info,
+                data_request,
+                mut filter_info,
+            } => {
+                // figure out what ranges we still need
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnFilterData {
+                            row_group_info,
+                            filter_info,
+                            data_request,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to evaluate the predicate
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    mut plan_builder,
+                } = row_group_info;
+
+                let predicate = filter_info.current();
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    predicate.projection(),
+                    &mut self.buffers,
+                )?;
+
+                let cache_options = filter_info.cache_builder().producer();
+
+                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+                    .with_cache_options(Some(&cache_options))
+                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+
+                plan_builder =
+                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                // Take back the column chunks that were read
+                let column_chunks = Some(row_group.column_chunks);
+
+                // advance to the next predicate, if any
+                match filter_info.advance() {
+                    AdvanceResult::Continue(filter_info) => {
+                        NextState::again(RowGroupDecoderState::Filters {
+                            row_group_info,
+                            column_chunks,
+                            filter_info,
+                        })
+                    }
+                    // done with predicates, proceed to reading data
+                    AdvanceResult::Done(filter, cache_info) => {
+                        // remember we need to put back the filter
+                        assert!(self.filter.is_none());
+                        self.filter = Some(filter);
+                        NextState::again(RowGroupDecoderState::StartData {
+                            row_group_info,
+                            column_chunks,
+                            cache_info: Some(cache_info),
+                        })
+                    }
+                }
+            }
+            RowGroupDecoderState::StartData {
+                row_group_info,
+                column_chunks,
+                cache_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // Compute the number of rows in the selection before applying limit and offset
+                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                if rows_before == 0 {
+                    // ruled out entire row group
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Apply any limit and offset
+                let plan_builder = plan_builder
+                    .limited(row_count)
+                    .with_offset(self.offset)
+                    .with_limit(self.limit)
+                    .build_limited();
+
+                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                // Update running offset and limit for after the current row group is read
+                if let Some(offset) = &mut self.offset {
+                    // Reduction is either because of offset or limit; as limit is applied
+                    // after offset has been "exhausted", we can just use saturating sub here
+                    *offset = offset.saturating_sub(rows_before - rows_after)
+                }
+
+                if rows_after == 0 {
+                    // no rows left after applying limit/offset
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                if let Some(limit) = &mut self.limit {
+                    *limit -= rows_after;
+                }
+
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    &self.projection,
+                )
+                .with_selection(plan_builder.selection())
+                .with_column_chunks(column_chunks)
+                // Final projection fetch shouldn't expand selection for cache
+                // so don't call with_cache_projection here
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnData {
+                    row_group_info,
+                    data_request,
+                    cache_info,
+                })
+            }
+            // Waiting on data to proceed with reading the output
+            RowGroupDecoderState::WaitingOnData {
+                row_group_info,
+                data_request,
+                cache_info,
+            } => {
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnData {
+                            row_group_info,
+                            data_request,
+                            cache_info,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to proceed
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    &self.projection,
+                    &mut self.buffers,
+                )?;
+
+                let plan = plan_builder.build();
+
+                // if we have any cached results, connect them up
+                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics);
+                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
+                    let cache_options = cache_info.builder().consumer();
+                    array_reader_builder
+                        .with_cache_options(Some(&cache_options))
+                        .build_array_reader(self.fields.as_deref(), &self.projection)
+                } else {
+                    array_reader_builder
+                        .build_array_reader(self.fields.as_deref(), &self.projection)
+                }?;
+
+                let reader = ParquetRecordBatchReader::new(array_reader, plan);
+                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
+            }
+            RowGroupDecoderState::Finished => {
+                // nothing left to read
+                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
+            }
+        };
+        Ok(result)
+    }
+
+    /// Which columns should be cached?
+    ///
+    /// Returns the columns that are used by the filters *and* then used in the
+    /// final projection, excluding any nested columns.
+    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
+        let meta = self.metadata.row_group(row_group_idx);
+        match self.compute_cache_projection_inner(filter) {
+            Some(projection) => projection,
+            None => ProjectionMask::none(meta.columns().len()),
+        }
+    }
+
+    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
+        let mut cache_projection = filter.predicates.first()?.projection().clone();
+        for predicate in filter.predicates.iter() {
+            cache_projection.union(predicate.projection());
+        }
+        cache_projection.intersect(&self.projection);
+        self.exclude_nested_columns_from_cache(&cache_projection)
+    }
+
+    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
+    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
+        let schema = self.metadata.file_metadata().schema_descr();
+        let num_leaves = schema.num_columns();
+
+        // Count how many leaves each root column has
+        let num_roots = schema.root_schema().get_fields().len();
+        let mut root_leaf_counts = vec![0usize; num_roots];
+        for leaf_idx in 0..num_leaves {
+            let root_idx = schema.get_column_root_idx(leaf_idx);
+            root_leaf_counts[root_idx] += 1;
+        }
+
+        // Keep only leaves whose root has exactly one leaf (non-nested)
+        let mut included_leaves = Vec::new();
+        for leaf_idx in 0..num_leaves {
+            if mask.leaf_included(leaf_idx) {
+                let root_idx = schema.get_column_root_idx(leaf_idx);
+                if root_leaf_counts[root_idx] == 1 {
+                    included_leaves.push(leaf_idx);
+                }
+            }
+        }
+
+        if included_leaves.is_empty() {
+            None
+        } else {
+            Some(ProjectionMask::leaves(schema, included_leaves))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    #[test]
+    // Verify that the size of RowGroupDecoderState does not grow too large
+    fn test_structure_size() {
+        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
+    }
+}
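
To make the cache projection rule above concrete: a leaf column is cached only if it
is read by at least one filter predicate, is also part of the final output projection,
and belongs to a root column with exactly one parquet leaf (i.e. is not nested). A
minimal sketch of that intersection (the column layout and leaf indices below are
hypothetical, purely for illustration):

    use parquet::arrow::ProjectionMask;
    use parquet::schema::types::SchemaDescriptor;

    // Hypothetical layout: root `a` = leaf 0, root `b` (a nested struct) = leaves 1
    // and 2, root `d` = leaf 3. The filter predicates read {a, b}; the final output
    // projection is {a, d}.
    fn cache_projection_sketch(schema: &SchemaDescriptor) -> ProjectionMask {
        // union of all predicate projections
        let mut cache_projection = ProjectionMask::leaves(schema, [0, 1, 2]);
        // keep only columns that also appear in the final projection
        cache_projection.intersect(&ProjectionMask::leaves(schema, [0, 3]));
        // exclude_nested_columns_from_cache then drops leaves 1 and 2 because their
        // root `b` spans multiple leaves, so only leaf 0 (`a`) ends up cached
        cache_projection
    }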
diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs
new file mode 100644
index 0000000000..4613fda087
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
+use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use bytes::Bytes;
+use std::collections::VecDeque;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// State machine that tracks which high level chunks (row groups) of
+/// Parquet data remain to be read.
+///
+/// The unit is currently a row group, but the author aspires to extend the
+/// pattern to data boundaries other than row groups in the future.
+#[derive(Debug)]
+pub(crate) struct RemainingRowGroups {
+    /// The underlying Parquet metadata
+    parquet_metadata: Arc<ParquetMetaData>,
+
+    /// The row groups that have not yet been read
+    row_groups: VecDeque<usize>,
+
+    /// Remaining selection to apply to the next row groups
+    selection: Option<RowSelection>,
+
+    /// State for building the reader for the current row group
+    row_group_reader_builder: RowGroupReaderBuilder,
+}
+
+impl RemainingRowGroups {
+    pub fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        row_group_reader_builder: RowGroupReaderBuilder,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection,
+            row_group_reader_builder,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.row_group_reader_builder.push_data(ranges, buffers);
+    }
+
+    /// Return the total number of bytes buffered so far
+    pub fn buffered_bytes(&self) -> u64 {
+        self.row_group_reader_builder.buffered_bytes()
+    }
+
+    /// Returns a [`ParquetRecordBatchReader`] suitable for reading the next
+    /// group of rows from the Parquet data, or the list of data ranges still
+    /// needed to proceed.
+    pub fn try_next_reader(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            // Are we ready yet to start reading?
+            let result: DecodeResult<ParquetRecordBatchReader> =
+                self.row_group_reader_builder.try_build()?;
+            match result {
+                DecodeResult::Finished => {
+                    // The current row group is done (for example, it was completely
+                    // filtered out), so fall through and proceed to the next row group
+                }
+                DecodeResult::NeedsData(ranges) => {
+                    // need more data to proceed
+                    return Ok(DecodeResult::NeedsData(ranges));
+                }
+                DecodeResult::Data(batch_reader) => {
+                    // ready to read the row group
+                    return Ok(DecodeResult::Data(batch_reader));
+                }
+            }
+
+            // No current reader, proceed to the next row group if any
+            let row_group_idx = match self.row_groups.pop_front() {
+                None => return Ok(DecodeResult::Finished),
+                Some(idx) => idx,
+            };
+
+            let row_count: usize = self
+                .parquet_metadata
+                .row_group(row_group_idx)
+                .num_rows()
+                .try_into()
+                .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;
+
+            let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
+            self.row_group_reader_builder
+                .next_row_group(row_group_idx, row_count, selection)?;
+            // the next iteration will try to build the reader for the new row group
+        }
+    }
+}
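
The intended call pattern for RemainingRowGroups is a pull loop: ask for the next
reader, and whenever the decoder reports NeedsData, fetch the requested byte ranges
and push them back in before retrying. A minimal sketch of that loop, written as
crate-internal code since RemainingRowGroups is pub(crate); fetch_ranges is a
hypothetical stand-in for whatever IO the caller actually performs:

    use std::ops::Range;
    use bytes::Bytes;
    use crate::DecodeResult;
    use crate::errors::ParquetError;

    /// Hypothetical IO helper: read the requested byte ranges from storage
    fn fetch_ranges(ranges: &[Range<u64>]) -> Vec<Bytes> {
        unimplemented!()
    }

    fn drain(remaining: &mut RemainingRowGroups) -> Result<(), ParquetError> {
        loop {
            match remaining.try_next_reader()? {
                // more bytes are needed before the next reader can be built
                DecodeResult::NeedsData(ranges) => {
                    let buffers = fetch_ranges(&ranges);
                    remaining.push_data(ranges, buffers);
                }
                // a reader for the next row group is ready; drain its batches
                DecodeResult::Data(reader) => {
                    for batch in reader {
                        let _batch = batch.expect("error decoding batch");
                    }
                }
                // all row groups have been read
                DecodeResult::Finished => return Ok(()),
            }
        }
    }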
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 7022bd61c4..1c8f3e9c69 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -42,7 +42,6 @@
 //! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
 //! * [`ParquetMetaDataWriter`] for writing.
 //!
-//!
 //! # Examples
 //!
 //! Please see [`external_metadata.rs`]
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index f8457fac23..98106a2c10 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -55,25 +55,28 @@
 //! ## Reading and Writing Arrow (`arrow` feature)
 //!
 //! The [`arrow`] module supports reading and writing Parquet data to/from
-//! Arrow `RecordBatch`es. Using Arrow is simple and performant, and allows workloads
+//! Arrow [`RecordBatch`]es. Using Arrow is simple and performant, and allows workloads
 //! to leverage the wide range of data transforms provided by the [arrow] crate, and by the
 //! ecosystem of [Arrow] compatible systems.
 //!
 //! Most users will use [`ArrowWriter`] for writing and [`ParquetRecordBatchReaderBuilder`] for
-//! reading.
+//! reading from synchronous IO sources such as files or in-memory buffers.
 //!
-//! Lower level APIs include [`ArrowColumnWriter`] for writing using multiple
-//! threads, and [`RowFilter`] to apply filters during decode.
+//! Lower level APIs include:
+//! * [`ParquetPushDecoder`] for fine grained control over interleaving of IO and CPU
+//! * [`ArrowColumnWriter`] for writing using multiple threads
+//! * [`RowFilter`] to apply filters during decode
 //!
 //! [`ArrowWriter`]: arrow::arrow_writer::ArrowWriter
 //! [`ParquetRecordBatchReaderBuilder`]: arrow::arrow_reader::ParquetRecordBatchReaderBuilder
+//! [`ParquetPushDecoder`]: arrow::push_decoder::ParquetPushDecoder
 //! [`ArrowColumnWriter`]: arrow::arrow_writer::ArrowColumnWriter
 //! [`RowFilter`]: arrow::arrow_reader::RowFilter
 //!
-//! ## `async` Reading and Writing Arrow (`async` feature)
+//! ## `async` Reading and Writing Arrow (`arrow` feature + `async` feature)
 //!
 //! The [`async_reader`] and [`async_writer`] modules provide async APIs to
-//! read and write `RecordBatch`es  asynchronously.
+//! read and write [`RecordBatch`]es  asynchronously.
 //!
 //! Most users will use [`AsyncArrowWriter`] for writing and [`ParquetRecordBatchStreamBuilder`]
 //! for reading. When the `object_store` feature is enabled, [`ParquetObjectReader`]
@@ -104,6 +107,7 @@
 //!
 //! [arrow]: https://docs.rs/arrow/latest/arrow/index.html
 //! [Arrow]: https://arrow.apache.org/
+//! [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/array/struct.RecordBatch.html
 //! [CSV]: https://en.wikipedia.org/wiki/Comma-separated_values
 //! [Dremel]: https://research.google/pubs/pub36632/
 //! [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index b30f91a81b..0475d48768 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -129,6 +129,31 @@ impl PushBuffers {
         self.offset = offset;
         self
     }
+
+    /// Return the total size in bytes of all buffered ranges
+    #[cfg(feature = "arrow")]
+    pub fn buffered_bytes(&self) -> u64 {
+        self.ranges.iter().map(|r| r.end - r.start).sum()
+    }
+
+    /// Clear any range (and its corresponding buffer) that exactly matches one of the ranges_to_clear
+    #[cfg(feature = "arrow")]
+    pub fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
+        let mut new_ranges = Vec::new();
+        let mut new_buffers = Vec::new();
+
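+        // Keep every (range, buffer) pair whose range does not exactly match one of the ranges to clear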
+        for (range, buffer) in self.iter() {
+            if !ranges_to_clear
+                .iter()
+                .any(|r| r.start == range.start && r.end == range.end)
+            {
+                new_ranges.push(range.clone());
+                new_buffers.push(buffer.clone());
+            }
+        }
+        self.ranges = new_ranges;
+        self.buffers = new_buffers;
+    }
 }
 
 impl Length for PushBuffers {
