alamb commented on code in PR #7997:
URL: https://github.com/apache/arrow-rs/pull/7997#discussion_r2445900868


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -967,245 +963,32 @@ where
     }
 }
 
-/// An in-memory collection of column chunks
-struct InMemoryRowGroup<'a> {
-    offset_index: Option<&'a [OffsetIndexMetaData]>,
-    /// Column chunks for this row group
-    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
-    row_count: usize,
-    row_group_idx: usize,
-    metadata: &'a ParquetMetaData,
-}
-
 impl InMemoryRowGroup<'_> {

Review Comment:
   Moved!



##########
parquet/src/arrow/in_memory_row_group.rs:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::RowGroups;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use bytes::{Buf, Bytes};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// An in-memory collection of column chunks
+#[derive(Debug)]
+pub(crate) struct InMemoryRowGroup<'a> {
+    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
+    /// Column chunks for this row group
+    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    pub(crate) row_count: usize,
+    pub(crate) row_group_idx: usize,
+    pub(crate) metadata: &'a ParquetMetaData,
+}
+
+/// What ranges to fetch for the columns in this row group
+#[derive(Debug)]
+pub(crate) struct FetchRanges {
+    /// The byte ranges to fetch
+    pub(crate) ranges: Vec<Range<u64>>,
+    /// If `Some`, the start offsets of each page for each column chunk
+    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl InMemoryRowGroup<'_> {
+    /// Returns the byte ranges to fetch for the columns specified in
+    /// `projection` and `selection`.
+    pub(crate) fn fetch_ranges(
+        &self,
+        projection: &ProjectionMask,
+        selection: Option<&RowSelection>,
+        batch_size: usize,
+        cache_mask: Option<&ProjectionMask>,
+    ) -> FetchRanges {
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
+            let expanded_selection =
+                selection.expand_to_batch_boundaries(batch_size, 
self.row_count);
+
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

Review Comment:
   - Filed https://github.com/apache/arrow-rs/issues/8667 to track this



##########
parquet/src/arrow/in_memory_row_group.rs:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::RowGroups;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use bytes::{Buf, Bytes};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// An in-memory collection of column chunks
+#[derive(Debug)]
+pub(crate) struct InMemoryRowGroup<'a> {
+    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
+    /// Column chunks for this row group
+    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    pub(crate) row_count: usize,
+    pub(crate) row_group_idx: usize,
+    pub(crate) metadata: &'a ParquetMetaData,
+}
+
+/// What ranges to fetch for the columns in this row group
+#[derive(Debug)]
+pub(crate) struct FetchRanges {
+    /// The byte ranges to fetch
+    pub(crate) ranges: Vec<Range<u64>>,
+    /// If `Some`, the start offsets of each page for each column chunk
+    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl InMemoryRowGroup<'_> {
+    /// Returns the byte ranges to fetch for the columns specified in
+    /// `projection` and `selection`.
+    pub(crate) fn fetch_ranges(
+        &self,
+        projection: &ProjectionMask,
+        selection: Option<&RowSelection>,
+        batch_size: usize,
+        cache_mask: Option<&ProjectionMask>,
+    ) -> FetchRanges {
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
+            let expanded_selection =
+                selection.expand_to_batch_boundaries(batch_size, 
self.row_count);
+
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
+
+            let ranges = self
+                .column_chunks
+                .iter()
+                .zip(metadata.columns())
+                .enumerate()
+                .filter(|&(idx, (chunk, _chunk_meta))| {
+                    chunk.is_none() && projection.leaf_included(idx)
+                })
+                .flat_map(|(idx, (_chunk, chunk_meta))| {
+                    // If the first page does not start at the beginning of 
the column,
+                    // then we need to also fetch a dictionary page.
+                    let mut ranges: Vec<Range<u64>> = vec![];
+                    let (start, _len) = chunk_meta.byte_range();
+                    match offset_index[idx].page_locations.first() {
+                        Some(first) if first.offset as u64 != start => {
+                            ranges.push(start..first.offset as u64);
+                        }
+                        _ => (),
+                    }
+
+                    // Expand selection to batch boundaries only for cached 
columns

Review Comment:
   Good call -- I added some additional comments with context about the 
predicate cache and links



##########
parquet/src/util/push_buffers.rs:
##########
@@ -129,6 +129,31 @@ impl PushBuffers {
         self.offset = offset;
         self
     }
+

Review Comment:
   I will file a ticket to track



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// advanced to the next predicate
+    Continue(FilterInfo),
+    /// no more predicates returns the row filter and cache info
+    Done(RowFilter, CacheInfo),
+}
+
+impl FilterInfo {
+    /// Create a new FilterInfo
+    pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self {
+        Self {
+            filter,
+            next_predicate: NonZeroUsize::new(1).expect("1 is always 
non-zero"),
+            cache_info,
+        }
+    }
+
+    /// Advance to the next predicate, returning either the updated FilterInfo
+    /// or the completed RowFilter if there are no more predicates
+    pub(super) fn advance(mut self) -> AdvanceResult {
+        if self.next_predicate.get() >= self.filter.predicates.len() {
+            AdvanceResult::Done(self.filter, self.cache_info)
+        } else {
+            self.next_predicate = self
+                .next_predicate
+                .checked_add(1)
+                .expect("no usize overflow");
+            AdvanceResult::Continue(self)
+        }
+    }
+
+    /// Return the current predicate to evaluate, mutablely
+    /// Panics if done() is true

Review Comment:
   Good catch. I think `done` is left over from a previous implementation. 
Since there is no way to change `next_predicate` except via `Self::advance()`, I 
think the unwrap is ok. I will remove this comment and add a clarification next to the unwrap.



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,

Review Comment:
   I tried to clarify this in comments 



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {

Review Comment:
   Yes indeed, excellent call -- captured in 
   - https://github.com/apache/arrow-rs/issues/8668



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding exit the loop
+///                 break;
+///             }
+///         }
+///     }
+/// ```
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+
+/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] 
for
+/// more options that can be configured.
+impl ParquetPushDecoderBuilder {

Review Comment:
   If you mean alongside `ArrowReaderBuilder` in `arrow_reader/mod.rs`, it is 
because I felt that file was already pretty big, as it has both generic options 
and specific options for the sync and async readers.
   
   
https://github.com/apache/arrow-rs/blob/b9c2bf73e792e7cb849f0bd453059ceef45b0b74/parquet/src/arrow/arrow_reader/mod.rs#L710-L709



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth

Review Comment:
   The core driver is that ArrowPredicate takes `&mut self`: 
https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/trait.ArrowPredicate.html
   
   ```rust
   pub trait ArrowPredicate: Send + 'static {
       // Required methods
       fn projection(&self) -> &ProjectionMask;
       fn evaluate(
           &mut self,
           batch: RecordBatch,
       ) -> Result<BooleanArray, ArrowError>;
   }
   ```
   
   This means that to evaluate the predicate there must be exactly one owner, 
and to get the lifetimes to work out I needed to pass ownership around like 
this. I will clarify this in the comments.



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning;
+    /// it is a bug if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader will return an error the next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            let current_state = self.take_state()?;
+            match self.try_transition(current_state)? {
+                NextState {
+                    next_state,
+                    result: Some(result),
+                } => {
+                    // put back the next state
+                    self.state = Some(next_state);
+                    return Ok(result);
+                }
+                NextState {
+                    next_state,
+                    result: None,
+                } => {
+                    // continue processing
+                    self.state = Some(next_state);
+                }
+            }
+        }
+    }
+
+    /// Current state --> next state + optional output
+    ///
+    /// This is the main state transition function for the row group reader
+    /// and encodes the row group decoding state machine.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(
+        &mut self,
+        current_state: RowGroupDecoderState,
+    ) -> Result<NextState, ParquetError> {
+        let result = match current_state {
+            RowGroupDecoderState::Start { row_group_info } => {
+                let column_chunks = None; // no prior column chunks
+
+                let Some(filter) = self.filter.take() else {
+                    // no filter, start trying to read data immediately
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+                // no predicates in filter, so start reading immediately
+                if filter.predicates.is_empty() {
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+
+                // we have predicates to evaluate
+                let cache_projection =
+                    
self.compute_cache_projection(row_group_info.row_group_idx, &filter);
+
+                let cache_info = CacheInfo::new(
+                    cache_projection,
+                    Arc::new(Mutex::new(RowGroupCache::new(
+                        self.batch_size,
+                        self.max_predicate_cache_size,
+                    ))),
+                );
+
+                let filter_info = FilterInfo::new(filter, cache_info);
+                NextState::again(RowGroupDecoderState::Filters {
+                    row_group_info,
+                    filter_info,
+                    column_chunks,
+                })
+            }
+            // need to evaluate filters
+            RowGroupDecoderState::Filters {
+                row_group_info,
+                column_chunks,
+                filter_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // If nothing is selected, we are done with this row group
+                if !plan_builder.selects_any() {
+                    // ruled out entire row group
+                    self.filter = Some(filter_info.into_filter());
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Make a request for the data needed to evaluate the current 
predicate
+                let predicate = filter_info.current();
+
+                // need to fetch pages the column needs for decoding, figure
+                // that out based on the current selection and projection
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    predicate.projection(), // use the predicate's projection
+                )
+                .with_selection(plan_builder.selection())
+                // Fetch predicate columns; expand selection only for cached 
predicate columns
+                .with_cache_projection(Some(filter_info.cache_projection()))
+                .with_column_chunks(column_chunks)
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
+                    row_group_info,
+                    filter_info,
+                    data_request,
+                })
+            }
+            RowGroupDecoderState::WaitingOnFilterData {
+                row_group_info,
+                data_request,
+                mut filter_info,
+            } => {
+                // figure out what ranges we still need
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnFilterData {
+                            row_group_info,
+                            filter_info,
+                            data_request,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to evaluate the 
predicate
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    mut plan_builder,
+                } = row_group_info;
+
+                let predicate = filter_info.current();
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    predicate.projection(),
+                    &mut self.buffers,
+                )?;
+
+                let cache_options = filter_info.cache_builder().producer();
+
+                let array_reader = ArrayReaderBuilder::new(&row_group, 
&self.metrics)
+                    .with_cache_options(Some(&cache_options))
+                    .build_array_reader(self.fields.as_deref(), 
predicate.projection())?;
+
+                plan_builder =
+                    plan_builder.with_predicate(array_reader, 
filter_info.current_mut())?;
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                // Take back the column chunks that were read
+                let column_chunks = Some(row_group.column_chunks);
+
+                // advance to the next predicate, if any
+                match filter_info.advance() {
+                    AdvanceResult::Continue(filter_info) => {
+                        NextState::again(RowGroupDecoderState::Filters {
+                            row_group_info,
+                            column_chunks,
+                            filter_info,
+                        })
+                    }
+                    // done with predicates, proceed to reading data
+                    AdvanceResult::Done(filter, cache_info) => {
+                        // remember we need to put back the filter
+                        assert!(self.filter.is_none());
+                        self.filter = Some(filter);
+                        NextState::again(RowGroupDecoderState::StartData {
+                            row_group_info,
+                            column_chunks,
+                            cache_info: Some(cache_info),
+                        })
+                    }
+                }
+            }
+            RowGroupDecoderState::StartData {
+                row_group_info,
+                column_chunks,
+                cache_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // Compute the number of rows in the selection before applying 
limit and offset
+                let rows_before = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                if rows_before == 0 {
+                    // ruled out entire row group
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Apply any limit and offset
+                let plan_builder = plan_builder
+                    .limited(row_count)
+                    .with_offset(self.offset)
+                    .with_limit(self.limit)
+                    .build_limited();
+
+                let rows_after = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                // Update running offset and limit for after the current row 
group is read
+                if let Some(offset) = &mut self.offset {
+                    // Reduction is either because of offset or limit, as 
limit is applied
+                    // after offset has been "exhausted" can just use 
saturating sub here
+                    *offset = offset.saturating_sub(rows_before - rows_after)
+                }
+
+                if rows_after == 0 {
+                    // no rows left after applying limit/offset
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                if let Some(limit) = &mut self.limit {
+                    *limit -= rows_after;
+                }
+
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    &self.projection,
+                )
+                .with_selection(plan_builder.selection())
+                .with_column_chunks(column_chunks)
+                // Final projection fetch shouldn't expand selection for cache
+                // so don't call with_cache_projection here
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnData {
+                    row_group_info,
+                    data_request,
+                    cache_info,
+                })
+            }
+            // Waiting on data to proceed with reading the output
+            RowGroupDecoderState::WaitingOnData {
+                row_group_info,
+                data_request,
+                cache_info,
+            } => {
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnData {
+                            row_group_info,
+                            data_request,
+                            cache_info,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to proceed
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    &self.projection,
+                    &mut self.buffers,
+                )?;
+
+                let plan = plan_builder.build();
+
+                // if we have any cached results, connect them up
+                let array_reader_builder = ArrayReaderBuilder::new(&row_group, 
&self.metrics);
+                let array_reader = if let Some(cache_info) = 
cache_info.as_ref() {
+                    let cache_options = cache_info.builder().consumer();
+                    array_reader_builder
+                        .with_cache_options(Some(&cache_options))
+                        .build_array_reader(self.fields.as_deref(), 
&self.projection)
+                } else {
+                    array_reader_builder
+                        .build_array_reader(self.fields.as_deref(), 
&self.projection)
+                }?;
+
+                let reader = ParquetRecordBatchReader::new(array_reader, plan);
+                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Data(reader))
+            }
+            RowGroupDecoderState::Finished => {
+                // nothing left to read
+                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Finished)
+            }
+        };
+        Ok(result)
+    }
+
+    /// Which columns should be cached?
+    ///
+    /// Returns the columns that are used by the filters *and* then used in the
+    /// final projection, excluding any nested columns.
+    fn compute_cache_projection(&self, row_group_idx: usize, filter: 
&RowFilter) -> ProjectionMask {
+        let meta = self.metadata.row_group(row_group_idx);
+        match self.compute_cache_projection_inner(filter) {
+            Some(projection) => projection,
+            None => ProjectionMask::none(meta.columns().len()),
+        }
+    }
+
+    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> 
Option<ProjectionMask> {
+        let mut cache_projection = 
filter.predicates.first()?.projection().clone();
+        for predicate in filter.predicates.iter() {
+            cache_projection.union(predicate.projection());
+        }
+        cache_projection.intersect(&self.projection);
+        self.exclude_nested_columns_from_cache(&cache_projection)
+    }
+
+    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)

Review Comment:
   Short answer is I don't know -- this just preserves the existing semantics 
   
   
https://github.com/apache/arrow-rs/blob/b9c2bf73e792e7cb849f0bd453059ceef45b0b74/parquet/src/arrow/async_reader/mod.rs#L728-L727
   
   Maybe @XiangpengHao can explain in more detail



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+///  # let file_bytes = {
+///  #   let mut buffer = vec![];
+///  #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+///  #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+///  #   writer.write(&batch).unwrap();
+///  #   writer.close().unwrap();
+///  #   Bytes::from(buffer)
+///  # };
+///  # // mimic IO by returning a function that returns the bytes for a given 
range
+///  # let get_range = |range: &Range<u64>| -> Bytes {
+///  #    let start = range.start as usize;
+///  #     let end = range.end as usize;
+///  #    file_bytes.slice(start..end)
+///  # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding; exit the loop
+///                 break;
+///             }
+///         }
+///     }
+/// ```
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+
+/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] 
for
+/// more options that can be configured.
+impl ParquetPushDecoderBuilder {
+    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder 
for the given file.
+    ///
+    /// See [`ParquetMetadataDecoder`] for a builder that can read the 
metadata from a Parquet file.
+    ///
+    /// [`ParquetMetadataDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+    ///
+    /// See example on [`ParquetPushDecoderBuilder`]
+    pub fn try_new_decoder(
+        file_len: u64,
+        parquet_metadata: Arc<ParquetMetaData>,
+    ) -> Result<Self, ParquetError> {
+        Self::try_new_decoder_with_options(
+            file_len,
+            parquet_metadata,
+            ArrowReaderOptions::default(),
+        )
+    }
+
+    /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder 
for the given file
+    /// with the given reader options.
+    ///
+    /// This is similar to [`Self::try_new_decoder`] but allows configuring
+    /// options such as Arrow schema
+    pub fn try_new_decoder_with_options(
+        file_len: u64,
+        parquet_metadata: Arc<ParquetMetaData>,
+        arrow_reader_options: ArrowReaderOptions,
+    ) -> Result<Self, ParquetError> {
+        let arrow_reader_metadata =
+            ArrowReaderMetadata::try_new(parquet_metadata, 
arrow_reader_options)?;
+        Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
+    }
+
+    /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
+    ///
+    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata 
from
+    /// the Parquet metadata and reader options.
+    pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: 
ArrowReaderMetadata) -> Self {
+        Self::new_builder(file_len, arrow_reader_metadata)
+    }
+
+    /// Create a [`ParquetPushDecoder`] with the configured options
+    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
+        let Self {
+            input: file_len,
+            metadata: parquet_metadata,
+            schema: _,
+            fields,
+            batch_size,
+            row_groups,
+            projection,
+            filter,
+            selection,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+        } = self;
+
+        // If no row groups were specified, read all of them
+        let row_groups =
+            row_groups.unwrap_or_else(|| 
(0..parquet_metadata.num_row_groups()).collect());
+
+        // Prepare to build RowGroup readers
+        let buffers = PushBuffers::new(file_len);
+        let row_group_reader_builder = RowGroupReaderBuilder::new(
+            batch_size,
+            projection,
+            Arc::clone(&parquet_metadata),
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            buffers,
+        );
+
+        // Initialize the decoder with the configured options
+        let remaining_row_groups = RemainingRowGroups::new(
+            parquet_metadata,
+            row_groups,
+            selection,
+            row_group_reader_builder,
+        );
+
+        Ok(ParquetPushDecoder {
+            state: ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups: Box::new(remaining_row_groups),
+            },
+        })
+    }
+}
+
+/// A push based Parquet Decoder
+///
+/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use 
the decoder.
+///
+/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data 
without an
+/// underlying reader for performing IO, and thus offers fine grained control
+/// over how data is fetched and decoded.
+///
+/// When more data is needed to make progress, instead of reading data directly
+/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges
+/// are needed. Once the caller provides the requested ranges via
+/// [`Self::push_ranges`], they try to decode again by calling
+/// [`Self::try_decode`].
+///
+/// The decoder's internal state tracks what has been already decoded and what
+/// is needed next.
+#[derive(Debug)]
+pub struct ParquetPushDecoder {
+    /// The inner state.
+    ///
+    /// This state is consumed on every transition and a new state is produced
+    /// so the Rust compiler can ensure that the state is always valid and
+    /// transitions are not missed.
+    state: ParquetDecoderState,
+}
+
+impl ParquetPushDecoder {
+    /// Attempt to decode the next batch of data, or return what data is needed
+    ///
+    /// The decoder communicates the next state with a [`DecodeResult`]
+    ///
+    /// See full example in [`ParquetPushDecoderBuilder`]
+    ///
+    /// ```no_run
+    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
+    /// use parquet::DecodeResult;
+    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
+    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: 
Vec<std::ops::Range<u64>>) { unimplemented!() }
+    /// let mut decoder = get_decoder();
+    /// loop {
+    ///    match decoder.try_decode().unwrap() {
+    ///       DecodeResult::NeedsData(ranges) => {
+    ///         // The decoder needs more data. Fetch the data for the given 
ranges
+    ///         // call decoder.push_ranges(ranges, data) and call again
+    ///         push_data(&mut decoder, ranges);
+    ///       }
+    ///       DecodeResult::Data(batch) => {
+    ///         // Successfully decoded the next batch of data
+    ///         println!("Got batch with {} rows", batch.num_rows());
+    ///       }
+    ///       DecodeResult::Finished => {
+    ///         // The decoder has finished decoding all data
+    ///         break;
+    ///       }
+    ///    }
+    /// }
+    ///```
+    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, 
ParquetError> {
+        let current_state = std::mem::replace(&mut self.state, 
ParquetDecoderState::Finished);
+        let (new_state, decode_result) = current_state.try_transition()?;
+        self.state = new_state;
+        Ok(decode_result)
+    }
+
+    /// Push data into the decoder for processing
+    ///
+    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing 
a
+    /// single range of data.
+    ///
+    /// Note this can be the entire file or just a part of it. If it is part 
of the file,
+    /// the ranges should correspond to the data ranges requested by the 
decoder.
+    ///
+    /// See example in [`ParquetPushDecoderBuilder`]
+    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), 
ParquetError> {
+        self.push_ranges(vec![range], vec![data])
+    }
+
+    /// Push data into the decoder for processing
+    ///
+    /// This should correspond to the data ranges requested by the decoder
+    pub fn push_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+        data: Vec<Bytes>,
+    ) -> Result<(), ParquetError> {
+        let current_state = std::mem::replace(&mut self.state, 
ParquetDecoderState::Finished);
+        self.state = current_state.push_data(ranges, data)?;
+        Ok(())
+    }
+
+    /// Returns the total number of buffered bytes in the decoder
+    ///
+    /// This is the sum of the size of all [`Bytes`] that has been pushed to 
the
+    /// decoder but not yet consumed.
+    ///
+    /// Note that this does not include any overhead of the internal data
+    /// structures and that since [`Bytes`] are ref counted memory, this may 
not
+    /// reflect additional memory usage.
+    ///
+    /// This can be used to monitor memory usage of the decoder.
+    pub fn buffered_bytes(&self) -> u64 {
+        self.state.buffered_bytes()
+    }
+}
+
+/// Internal state machine for the [`ParquetPushDecoder`]
+#[derive(Debug)]
+enum ParquetDecoderState {
+    /// Waiting for data needed to decode the next RowGroup
+    ReadingRowGroup {
+        remaining_row_groups: Box<RemainingRowGroups>,
+    },
+    /// The decoder is actively decoding a RowGroup
+    DecodingRowGroup {
+        /// Current active reader
+        record_batch_reader: Box<ParquetRecordBatchReader>,
+        remaining_row_groups: Box<RemainingRowGroups>,
+    },
+    /// The decoder has finished processing all data
+    Finished,
+}
+
+impl ParquetDecoderState {
+    /// Current state --> next state + output
+    ///
+    /// This function is called to check if the decoder has any RecordBatches
+    /// and [`Self::push_data`] is called when new data is available.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>), 
ParquetError> {
+        match self {
+            Self::ReadingRowGroup {
+                mut remaining_row_groups,
+            } => {
+                match remaining_row_groups.try_next_reader()? {
+                    // If we have a next reader, we can transition to decoding 
it
+                    DecodeResult::Data(record_batch_reader) => {
+                        // Transition to decoding the row group
+                        Self::DecodingRowGroup {
+                            record_batch_reader: Box::new(record_batch_reader),
+                            remaining_row_groups,
+                        }
+                        .try_transition()
+                    }
+                    // If more data is needed to build the next reader, request it
+                    DecodeResult::NeedsData(ranges) => {
+                        // If we need more data, we return the ranges needed 
and stay in Reading
+                        // RowGroup state
+                        Ok((
+                            Self::ReadingRowGroup {
+                                remaining_row_groups,
+                            },
+                            DecodeResult::NeedsData(ranges),
+                        ))
+                    }
+                    DecodeResult::Finished => {
+                        // No more row groups to read, we are finished
+                        Ok((Self::Finished, DecodeResult::Finished))
+                    }
+                }
+            }
+            Self::DecodingRowGroup {
+                mut record_batch_reader,
+                remaining_row_groups,
+            } => {
+                // Decode the next record batch
+                match record_batch_reader.next() {
+                    Some(Ok(batch)) => {
+                        // Successfully decoded a batch, return it
+                        Ok((
+                            Self::DecodingRowGroup {
+                                record_batch_reader,
+                                remaining_row_groups,
+                            },
+                            DecodeResult::Data(batch),
+                        ))
+                    }
+                    None => {
+                        // No more batches in this row group, move to the next 
row group
+                        // or finish if there are no more row groups
+                        Self::ReadingRowGroup {
+                            remaining_row_groups,
+                        }
+                        .try_transition()
+                    }
+                    Some(Err(e)) => Err(ParquetError::from(e)), // some error 
occurred while decoding
+                }
+            }
+            Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
+        }
+    }
+
+    /// Push data, and transition state if needed
+    ///
+    /// This should correspond to the data ranges requested by the decoder
+    pub fn push_data(
+        self,
+        ranges: Vec<Range<u64>>,
+        data: Vec<Bytes>,
+    ) -> Result<Self, ParquetError> {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                mut remaining_row_groups,
+            } => {
+                // Push data to the RowGroupReaderBuilder
+                remaining_row_groups.push_data(ranges, data);
+                Ok(ParquetDecoderState::ReadingRowGroup {
+                    remaining_row_groups,
+                })
+            }
+            // it is ok to get data before we asked for it
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader,
+                mut remaining_row_groups,
+            } => {
+                remaining_row_groups.push_data(ranges, data);
+                Ok(ParquetDecoderState::DecodingRowGroup {
+                    record_batch_reader,
+                    remaining_row_groups,
+                })
+            }
+            ParquetDecoderState::Finished => Err(ParquetError::General(
+                "Cannot push data to a finished decoder".to_string(),
+            )),
+        }
+    }
+
+    /// How many bytes are currently buffered in the decoder?
+    fn buffered_bytes(&self) -> u64 {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.buffered_bytes(),
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader: _,
+                remaining_row_groups,
+            } => remaining_row_groups.buffered_bytes(),
+            ParquetDecoderState::Finished => 0,
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::DecodeResult;
+    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, 
RowSelection, RowSelector};
+    use crate::arrow::push_decoder::{ParquetPushDecoder, 
ParquetPushDecoderBuilder};
+    use crate::arrow::{ArrowWriter, ProjectionMask};
+    use crate::errors::ParquetError;
+    use crate::file::metadata::ParquetMetaDataPushDecoder;
+    use crate::file::properties::WriterProperties;
+    use arrow::compute::kernels::cmp::{gt, lt};
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::Int64Type;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
+    use arrow_select::concat::concat_batches;
+    use bytes::Bytes;
+    use std::fmt::Debug;
+    use std::ops::Range;
+    use std::sync::{Arc, LazyLock};
+
+    /// Test decoder struct size (as they are copied around on each 
transition, they
+    /// should not grow too large)
+    #[test]
+    fn test_decoder_size() {
+        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
+    }
+
+    /// Decode the entire file at once, simulating a scenario where all data is
+    /// available in memory
+    #[test]
+    fn test_decoder_all_data() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        let results = vec![
+            // first row group should be decoded without needing more data
+            expect_data(decoder.try_decode()),
+            // second row group should be decoded without needing more data
+            expect_data(decoder.try_decode()),
+        ];
+        expect_finished(decoder.try_decode());
+
+        let all_output = concat_batches(&TEST_BATCH.schema(), 
&results).unwrap();
+        // Check that the output matches the input batch
+        assert_eq!(all_output, *TEST_BATCH);
+    }
+
+    /// Decode the entire file incrementally, simulating a scenario where data 
is
+    /// fetched as needed
+    #[test]
+    fn test_decoder_incremental() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        let mut results = vec![];
+
+        // First row group, expect a single request
+        let ranges = expect_needs_data(decoder.try_decode());
+        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - 
r.start).sum();
+        push_ranges_to_decoder(&mut decoder, ranges);
+        // The decoder should currently only store the data it needs to decode 
the first row group
+        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+        results.push(expect_data(decoder.try_decode()));
+        // the decoder should have consumed the data for the first row group 
and freed it
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        // Second row group,
+        let ranges = expect_needs_data(decoder.try_decode());
+        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - 
r.start).sum();
+        push_ranges_to_decoder(&mut decoder, ranges);
+        // The decoder should currently only store the data it needs to decode 
the second row group
+        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+        results.push(expect_data(decoder.try_decode()));
+        // the decoder should have consumed the data for the second row group 
and freed it
+        assert_eq!(decoder.buffered_bytes(), 0);
+        expect_finished(decoder.try_decode());
+
+        // Check that the output matches the input batch
+        let all_output = concat_batches(&TEST_BATCH.schema(), 
&results).unwrap();
+        assert_eq!(all_output, *TEST_BATCH);
+    }
+
+    /// Decode the entire file incrementally,  simulating partial reads
+    #[test]
+    fn test_decoder_partial() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        // First row group, expect a single request for all data needed to 
read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(0, 200);
+        assert_eq!(batch1, expected1);
+
+        // Second row group, this time provide the data in two steps
+        let ranges = expect_needs_data(decoder.try_decode());
+        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+        assert!(!ranges1.is_empty());
+        assert!(!ranges2.is_empty());
+        // push first half to simulate partial read
+        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+
+        // still expect more data
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        // push empty ranges should be a no-op
+        push_ranges_to_decoder(&mut decoder, vec![]);
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 200);
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode multiple columns "a" and "b", expect that the decoder requests
+    /// only a single request per row group
+    #[test]
+    fn test_decoder_selection_does_one_request() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        let mut decoder = builder
+            .with_projection(
+                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read 
"a", "b"
+            )
+            .build()
+            .unwrap();
+
+        // First row group, expect a single request for all data needed to 
read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch1 = expect_data(decoder.try_decode());
+        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
+        assert_eq!(batch1, expected1);
+
+        // Second row group, similarly expect a single request for all data 
needed to read "a" and "b"
+        let ranges = expect_needs_data(decoder.try_decode());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let batch2 = expect_data(decoder.try_decode());
+        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
+        assert_eq!(batch2, expected2);
+
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Decode with a filter that requires multiple requests, but only provide 
part
+    /// of the data needed for the filter at a time simulating partial reads.
+    #[test]
+    fn test_decoder_single_filter_partial() {
+        let builder = ParquetPushDecoderBuilder::try_new_decoder(
+            test_file_len(),
+            test_file_parquet_metadata(),
+        )
+        .unwrap();
+
+        // Values in column "a" range 0..399
+        // First filter: "a" > 250  (nothing in Row Group 0, both data pages 
in Row Group 1)
+        let schema_descr = 
builder.metadata().file_metadata().schema_descr_ptr();
+
+        // a > 250
+        let row_filter_a = ArrowPredicateFn::new(
+            // claim to use both a and b so we get two ranges requests for the 
filter pages
+            ProjectionMask::columns(&schema_descr, ["a", "b"]),
+            |batch: RecordBatch| {
+                let scalar_250 = Int64Array::new_scalar(250);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                gt(column, &scalar_250)
+            },
+        );
+
+        let mut decoder = builder
+            .with_projection(
+                // read only column "a" to test that filter pages are reused
+                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
+            )
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+            .build()
+            .unwrap();
+
+        // First row group,
+        let ranges = expect_needs_data(decoder.try_decode());
+        // only provide half the ranges
+        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+        assert!(!ranges1.is_empty());
+        assert!(!ranges2.is_empty());
+        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+        // still expect more data
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        let ranges = expect_needs_data(decoder.try_decode());
+        assert_eq!(ranges, ranges2); // should be the remaining ranges
+        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());
+
+        // expect the first row group to be filtered out (no rows match)

Review Comment:
   I think the idea here is that the predicate filters out all the rows in Row
Group 0, so the decoder needs to fetch the data pages for `a` to evaluate the
predicate (`a > 250`) but no additional pages for `b`. I updated the comments
to try to clarify what is going on a bit better:
   
   ```rust
           // Since no rows in the first row group pass the filters, there are no
           // additional requests to read data pages for "b" here
   ```



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing and must be put back before returning;
+    /// it is a bug if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// Take the current state, leaving `None` in its place.
+    ///
+    /// Returns an error if the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning; otherwise the reader errors the next time it is called.
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {

Review Comment:
   It isn't called concurrently. `try_transition` can change the state as
decoding proceeds. If the decoder already has enough data to potentially move
on to the next state, it begins doing so immediately via this loop -- I will
make this clearer in the comments.



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache

Review Comment:
   Added a link to `RowGroupCache`.



##########
parquet/src/arrow/in_memory_row_group.rs:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::RowGroups;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use bytes::{Buf, Bytes};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// An in-memory collection of column chunks
+#[derive(Debug)]
+pub(crate) struct InMemoryRowGroup<'a> {
+    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
+    /// Column chunks for this row group
+    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    pub(crate) row_count: usize,
+    pub(crate) row_group_idx: usize,
+    pub(crate) metadata: &'a ParquetMetaData,
+}
+
+/// What ranges to fetch for the columns in this row group
+#[derive(Debug)]
+pub(crate) struct FetchRanges {
+    /// The byte ranges to fetch
+    pub(crate) ranges: Vec<Range<u64>>,
+    /// If `Some`, the start offsets of each page for each column chunk
+    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl InMemoryRowGroup<'_> {
+    /// Returns the byte ranges to fetch for the columns specified in
+    /// `projection` and `selection`.
+    pub(crate) fn fetch_ranges(
+        &self,
+        projection: &ProjectionMask,
+        selection: Option<&RowSelection>,
+        batch_size: usize,
+        cache_mask: Option<&ProjectionMask>,
+    ) -> FetchRanges {
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some((selection, offset_index)) = 
selection.zip(self.offset_index) {
+            let expanded_selection =
+                selection.expand_to_batch_boundaries(batch_size, 
self.row_count);
+
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
+
+            let ranges = self
+                .column_chunks
+                .iter()
+                .zip(metadata.columns())
+                .enumerate()
+                .filter(|&(idx, (chunk, _chunk_meta))| {
+                    chunk.is_none() && projection.leaf_included(idx)
+                })
+                .flat_map(|(idx, (_chunk, chunk_meta))| {
+                    // If the first page does not start at the beginning of 
the column,
+                    // then we need to also fetch a dictionary page.
+                    let mut ranges: Vec<Range<u64>> = vec![];
+                    let (start, _len) = chunk_meta.byte_range();
+                    match offset_index[idx].page_locations.first() {
+                        Some(first) if first.offset as u64 != start => {
+                            ranges.push(start..first.offset as u64);
+                        }
+                        _ => (),
+                    }
+
+                    // Expand selection to batch boundaries only for cached 
columns
+                    let use_expanded = cache_mask.map(|m| 
m.leaf_included(idx)).unwrap_or(false);
+                    if use_expanded {
+                        ranges.extend(
+                            
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
+                        );
+                    } else {
+                        
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+                    }
+                    page_start_offsets.push(ranges.iter().map(|range| 
range.start).collect());
+
+                    ranges
+                })
+                .collect();
+            FetchRanges {
+                ranges,
+                page_start_offsets: Some(page_start_offsets),
+            }
+        } else {
+            let ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .filter(|&(idx, chunk)| chunk.is_none() && 
projection.leaf_included(idx))
+                .map(|(idx, _chunk)| {
+                    let column = metadata.column(idx);
+                    let (start, length) = column.byte_range();
+                    start..(start + length)
+                })
+                .collect();
+            FetchRanges {
+                ranges,
+                page_start_offsets: None,
+            }
+        }
+    }
+
+    /// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
+    ///
+    /// This function **must** be called with the data from the ranges 
returned by
+    /// `fetch_ranges` and the corresponding page_start_offsets, with the 
exact same and `selection`.
+    pub(crate) fn fill_column_chunks<I>(
+        &mut self,
+        projection: &ProjectionMask,
+        page_start_offsets: Option<Vec<Vec<u64>>>,
+        chunk_data: I,
+    ) where
+        I: IntoIterator<Item = Bytes>,
+    {
+        let mut chunk_data = chunk_data.into_iter();
+        let metadata = self.metadata.row_group(self.row_group_idx);
+        if let Some(page_start_offsets) = page_start_offsets {
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets = page_start_offsets.into_iter();
+
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+                        length: metadata.column(idx).byte_range().1 as usize,
+                        data: offsets
+                            .into_iter()
+                            .map(|x| x as usize)
+                            .zip(chunks.into_iter())
+                            .collect(),
+                    }))
+                }
+            }
+        } else {
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
+                        offset: metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    }));
+                }
+            }
+        }
+    }
+}
+
+impl RowGroups for InMemoryRowGroup<'_> {
+    fn num_rows(&self) -> usize {
+        self.row_count
+    }
+
+    /// Return chunks for column i
+    fn column_chunks(&self, i: usize) -> crate::errors::Result<Box<dyn 
PageIterator>> {
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {i}, column was not fetched"
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .offset_index
+                    // filter out empty offset indexes (old versions specified 
Some(vec![]) when no present)
+                    .filter(|index| !index.is_empty())
+                    .map(|index| index[i].page_locations.clone());
+                let column_chunk_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
+                let page_reader = SerializedPageReader::new(
+                    data.clone(),
+                    column_chunk_metadata,
+                    self.row_count,
+                    page_locations,
+                )?;
+                let page_reader = page_reader.add_crypto_context(
+                    self.row_group_idx,
+                    i,
+                    self.metadata,
+                    column_chunk_metadata,
+                )?;
+
+                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
+    }
+}
+
+/// An in-memory column chunk
+#[derive(Clone, Debug)]
+pub(crate) enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        /// Subset of data pages included in this sparse chunk.
+        ///
+        /// Each element is a tuple of (page offset within file, page data).
+        /// Each entry is a complete page and the list is ordered by offset.
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk and the offset within the original file
+    Dense { offset: usize, data: Bytes },

Review Comment:
   Thanks -- this is just giving a name to what was already happening in 
https://github.com/apache/arrow-rs/blob/b9c2bf73e792e7cb849f0bd453059ceef45b0b74/parquet/src/arrow/async_reader/mod.rs#L989-L1083
   
   (but it was held in local variables)



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning
+    /// it is a bug error if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if there the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader error next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            let current_state = self.take_state()?;
+            match self.try_transition(current_state)? {
+                NextState {
+                    next_state,
+                    result: Some(result),
+                } => {
+                    // put back the next state
+                    self.state = Some(next_state);
+                    return Ok(result);
+                }
+                NextState {
+                    next_state,
+                    result: None,
+                } => {
+                    // continue processing
+                    self.state = Some(next_state);
+                }
+            }
+        }
+    }
+
+    /// Current state --> next state + optional output
+    ///
+    /// This is the main state transition function for the row group reader
+    /// and encodes the row group decoding state machine.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(
+        &mut self,
+        current_state: RowGroupDecoderState,
+    ) -> Result<NextState, ParquetError> {
+        let result = match current_state {
+            RowGroupDecoderState::Start { row_group_info } => {
+                let column_chunks = None; // no prior column chunks
+
+                let Some(filter) = self.filter.take() else {
+                    // no filter, start trying to read data immediately
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+                // no predicates in filter, so start reading immediately
+                if filter.predicates.is_empty() {
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };

Review Comment:
   Since it happens for each row group (i.e. `row_group_info` changes on each 
row group), I don't think it could be in `RowGroupReaderBuilder::new()`.
   
   We could possibly inline it in `RowGroupReaderBuilder::next_row_group()` 🤔  
   
https://github.com/apache/arrow-rs/blob/10aae0fc6bc8020f77d584a5fa3cc0c5da605211/parquet/src/arrow/push_decoder/reader_builder/mod.rs#L244-L243
   
   But I am not sure how much better that would be, as it just moves the code 
around.



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding; exit the loop
+///                 break;
+///             }

Review Comment:
   I am pleased that it also mirrors the Metadata decoder one: 
https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataPushDecoder.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to