adriangb commented on code in PR #7997:
URL: https://github.com/apache/arrow-rs/pull/7997#discussion_r2444873624


##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// advanced to the next predicate

Review Comment:
   ```suggestion
       /// Advanced to the next predicate
   ```



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,

Review Comment:
   Seeing now that we just maintain a mutable index into RowFilter (i.e. don't 
mutate `RowFilter` itself I'm guessing)



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// advanced to the next predicate
+    Continue(FilterInfo),
+    /// no more predicates returns the row filter and cache info

Review Comment:
   ```suggestion
       /// No more predicates returns the row filter and cache info
   ```



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth

Review Comment:
   Having some familiarity with what is happening in the levels above this I 
believe it needs to be mutable because the mask of one filter is fed into the 
next / it's a stack of filters we pop from. I will try to confirm as I read 
through the rest of the code but a comment explaining why / what mutable state 
is used for would be helpful.



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// advanced to the next predicate
+    Continue(FilterInfo),
+    /// no more predicates returns the row filter and cache info
+    Done(RowFilter, CacheInfo),
+}
+
+impl FilterInfo {
+    /// Create a new FilterInfo
+    pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self {
+        Self {
+            filter,
+            next_predicate: NonZeroUsize::new(1).expect("1 is always 
non-zero"),
+            cache_info,
+        }
+    }
+
+    /// Advance to the next predicate, returning either the updated FilterInfo
+    /// or the completed RowFilter if there are no more predicates
+    pub(super) fn advance(mut self) -> AdvanceResult {
+        if self.next_predicate.get() >= self.filter.predicates.len() {
+            AdvanceResult::Done(self.filter, self.cache_info)
+        } else {
+            self.next_predicate = self
+                .next_predicate
+                .checked_add(1)
+                .expect("no usize overflow");
+            AdvanceResult::Continue(self)
+        }
+    }
+
+    /// Return the current predicate to evaluate, mutablely

Review Comment:
   ```suggestion
       /// Return the current predicate to evaluate as a mutable reference
   ```



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache

Review Comment:
   ```suggestion
       /// The columns to cache in the predicate cache.
       /// Normally these are the columns that filters may look at such that
       /// if we have a filter like `(a + 10 > 5) AND (a + b = 0)` we cache `a` 
to avoid re-reading it between evaluating `a + 10 > 5` and `a + b = 0`.
   ```
   
   ?



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning
+    /// it is a bug error if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if there the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader error next time it is called

Review Comment:
   ```suggestion
       /// before returning, otherwise the reader will error next time it is 
called
   ```



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning
+    /// it is a bug error if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if there the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader error next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {

Review Comment:
   Is this called concurrently while something else is pushing bytes? It's not 
clear to me how the state changes between loop iterations



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {

Review Comment:
   I'm guessing if I wanted to "look ahead" we'd add a method along the lines 
of `try_peek()`? It'd be cool if it returned some structure that allowed fine 
grained control of the peeking:
   
   ```rust
   let max_ranges = 32;
   let max_bytes = 1024 * 1024 * 32;
   let mut current_bytes = 0;
   let mut ranges = Vec::new();
   let mut peek = decoder.peek()
   loop {
       match peek.next() {
           PeekResult::Range(range) => {
               ranges.push(range);
               current_bytes += range.end - range.start;
               if ranges.len() > max_ranges { break }
               if current_bytes > max_bytes { break }
           PeekResult::End { break }
       }
   }
   ```
   



##########
parquet/src/arrow/push_decoder/reader_builder/filter.rs:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates
+///
+/// This is somewhat more complicated than one might expect because the
+/// RowFilter must be owned by the FilterInfo so that predicates can
+/// be evaluated (requires mutable access).
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+    /// The predicates to evaluate, in order
+    ///
+    /// These must be owned by FilterInfo because they may be mutated as part 
of
+    /// evaluation so there is a bunch of complexity of handing them back and 
forth
+    filter: RowFilter,
+    /// The next filter to be evaluated
+    next_predicate: NonZeroUsize,
+    /// Stores previously computed filter results
+    cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+    /// The columns to cache in the predicate cache
+    cache_projection: ProjectionMask,
+    row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+    pub(super) fn new(
+        cache_projection: ProjectionMask,
+        row_group_cache: Arc<Mutex<RowGroupCache>>,
+    ) -> Self {
+        Self {
+            cache_projection,
+            row_group_cache,
+        }
+    }
+
+    pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+        CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+    }
+}
+
+pub(super) enum AdvanceResult {
+    /// advanced to the next predicate
+    Continue(FilterInfo),
+    /// no more predicates returns the row filter and cache info
+    Done(RowFilter, CacheInfo),
+}
+
+impl FilterInfo {
+    /// Create a new FilterInfo
+    pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self {
+        Self {
+            filter,
+            next_predicate: NonZeroUsize::new(1).expect("1 is always 
non-zero"),
+            cache_info,
+        }
+    }
+
+    /// Advance to the next predicate, returning either the updated FilterInfo
+    /// or the completed RowFilter if there are no more predicates
+    pub(super) fn advance(mut self) -> AdvanceResult {
+        if self.next_predicate.get() >= self.filter.predicates.len() {
+            AdvanceResult::Done(self.filter, self.cache_info)
+        } else {
+            self.next_predicate = self
+                .next_predicate
+                .checked_add(1)
+                .expect("no usize overflow");
+            AdvanceResult::Continue(self)
+        }
+    }
+
+    /// Return the current predicate to evaluate, mutablely
+    /// Panics if done() is true

Review Comment:
   I don't see any `done()` method. Would returning an `Option<>` here be 
better than panicking?



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache

Review Comment:
   A link to the main module or struct for the predicate cache? As someone who 
knows it exists but not much more I'd be interested in seeing an overview of 
how it works. In particular the question that comes to mind is if it's used 
only to cache between predicate evaluations or if it's also then used for the 
projection (e.g. `select c1 from t where c1 ...` does c1 get cached between the 
filter and projection).



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning
+    /// it is a bug error if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if there the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader error next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            let current_state = self.take_state()?;
+            match self.try_transition(current_state)? {
+                NextState {
+                    next_state,
+                    result: Some(result),
+                } => {
+                    // put back the next state
+                    self.state = Some(next_state);
+                    return Ok(result);
+                }
+                NextState {
+                    next_state,
+                    result: None,
+                } => {
+                    // continue processing
+                    self.state = Some(next_state);
+                }
+            }
+        }
+    }
+
+    /// Current state --> next state + optional output
+    ///
+    /// This is the main state transition function for the row group reader
+    /// and encodes the row group decoding state machine.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(
+        &mut self,
+        current_state: RowGroupDecoderState,
+    ) -> Result<NextState, ParquetError> {
+        let result = match current_state {
+            RowGroupDecoderState::Start { row_group_info } => {
+                let column_chunks = None; // no prior column chunks
+
+                let Some(filter) = self.filter.take() else {
+                    // no filter, start trying to read data immediately
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+                // no predicates in filter, so start reading immediately
+                if filter.predicates.is_empty() {
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };

Review Comment:
   Could we collapse these two states in `RowGroupReaderBuilder::new()`?



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding exit the loop
+///                 break;
+///             }

Review Comment:
   This is a very nice high level API!



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -0,0 +1,1150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use 
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet 
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// #   let mut buffer = vec![];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode 
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+///     ParquetPushDecoderBuilder::try_new_decoder(file_length, 
parquet_metadata)
+///       .unwrap()
+///       // Optionally configure the decoder, e.g. batch size
+///       .with_batch_size(1024)
+///       // Build the decoder
+///       .build()
+///       .unwrap();
+///
+///     // In a loop, ask the decoder what it needs next, and provide it with 
the required data
+///     loop {
+///         match decoder.try_decode().unwrap() {
+///             DecodeResult::NeedsData(ranges) => {
+///                 // The decoder needs more data. Fetch the data for the 
given ranges
+///                 let data = ranges.iter().map(|r| 
get_range(r)).collect::<Vec<_>>();
+///                 // Push the data to the decoder
+///                 decoder.push_ranges(ranges, data).unwrap();
+///                 // After pushing the data, we can try to decode again on 
the next iteration
+///             }
+///             DecodeResult::Data(batch) => {
+///                 // Successfully decoded a batch of data
+///                 assert!(batch.num_rows() > 0);
+///             }
+///             DecodeResult::Finished => {
+///                 // The decoder has finished decoding exit the loop
+///                 break;
+///             }
+///         }
+///     }
+/// ```
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+
+/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] 
for
+/// more options that can be configured.
+impl ParquetPushDecoderBuilder {

Review Comment:
   Any particular reason for not putting the impl in the same file as the 
struct?



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+    row_group_idx: usize,
+    row_count: usize,
+    plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+    Start {
+        row_group_info: RowGroupInfo,
+    },
+    /// Planning filters, but haven't yet requested data to evaluate them
+    Filters {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from prior filters
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        filter_info: FilterInfo,
+    },
+    /// Needs data to evaluate current filter
+    WaitingOnFilterData {
+        row_group_info: RowGroupInfo,
+        filter_info: FilterInfo,
+        data_request: DataRequest,
+    },
+    /// Know what data to actually read, after all predicates
+    StartData {
+        row_group_info: RowGroupInfo,
+        /// Any previously read column chunk data from the filtering phase
+        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Needs data to proceed with reading the output
+    WaitingOnData {
+        row_group_info: RowGroupInfo,
+        data_request: DataRequest,
+        /// Any cached filter results
+        cache_info: Option<CacheInfo>,
+    },
+    /// Finished (or not yet started) reading this group
+    Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+    next_state: RowGroupDecoderState,
+    /// result to return, if any
+    ///
+    /// * `Some`: the processing should stop and return the result
+    /// * `None`: processing should continue
+    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+    /// The next state with no result.
+    ///
+    /// This indicates processing should continue
+    fn again(next_state: RowGroupDecoderState) -> Self {
+        Self {
+            next_state,
+            result: None,
+        }
+    }
+
+    /// Create a NextState with a result that should be returned
+    fn result(
+        next_state: RowGroupDecoderState,
+        result: DecodeResult<ParquetRecordBatchReader>,
+    ) -> Self {
+        Self {
+            next_state,
+            result: Some(result),
+        }
+    }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+    /// The output batch size
+    batch_size: usize,
+
+    /// What columns to project (produce in each output batch)
+    projection: ProjectionMask,
+
+    /// The Parquet file metadata
+    metadata: Arc<ParquetMetaData>,
+
+    /// Top level parquet schema and arrow schema mapping
+    fields: Option<Arc<ParquetField>>,
+
+    /// Optional filter
+    filter: Option<RowFilter>,
+
+    /// Limit to apply to remaining row groups (decremented as rows are read)
+    limit: Option<usize>,
+
+    /// Offset to apply to remaining row groups (decremented as rows are read)
+    offset: Option<usize>,
+
+    /// The size in bytes of the predicate cache
+    max_predicate_cache_size: usize,
+
+    /// The metrics collector
+    metrics: ArrowReaderMetrics,
+
+    /// Current state of the decoder.
+    ///
+    /// It is taken when processing, and must be put back before returning
+    /// it is a bug error if it is not put back after transitioning states.
+    state: Option<RowGroupDecoderState>,
+
+    /// The underlying data store
+    buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+    /// Create a new RowGroupReaderBuilder
+    #[expect(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        batch_size: usize,
+        projection: ProjectionMask,
+        metadata: Arc<ParquetMetaData>,
+        fields: Option<Arc<ParquetField>>,
+        filter: Option<RowFilter>,
+        limit: Option<usize>,
+        offset: Option<usize>,
+        metrics: ArrowReaderMetrics,
+        max_predicate_cache_size: usize,
+        buffers: PushBuffers,
+    ) -> Self {
+        Self {
+            batch_size,
+            projection,
+            metadata,
+            fields,
+            filter,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+            state: Some(RowGroupDecoderState::Finished),
+            buffers,
+        }
+    }
+
+    /// Push new data buffers that can be used to satisfy pending requests
+    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        self.buffers.push_ranges(ranges, buffers);
+    }
+
+    /// Returns the total number of buffered bytes available
+    pub fn buffered_bytes(&self) -> u64 {
+        self.buffers.buffered_bytes()
+    }
+
+    /// take the current state, leaving None in its place.
+    ///
+    /// Returns an error if there the state wasn't put back after the previous
+    /// call to [`Self::take_state`].
+    ///
+    /// Any code that calls this method must ensure that the state is put back
+    /// before returning, otherwise the reader error next time it is called
+    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+        self.state.take().ok_or_else(|| {
+            ParquetError::General(String::from(
+                "Internal Error: RowGroupReader in invalid state",
+            ))
+        })
+    }
+
+    /// Setup this reader to read the next row group
+    pub(crate) fn next_row_group(
+        &mut self,
+        row_group_idx: usize,
+        row_count: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<(), ParquetError> {
+        let state = self.take_state()?;
+        if !matches!(state, RowGroupDecoderState::Finished) {
+            return Err(ParquetError::General(format!(
+                "Internal Error: next_row_group called while still reading a 
row group. Expected Finished state, got {state:?}"
+            )));
+        }
+        let plan_builder = 
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+        let row_group_info = RowGroupInfo {
+            row_group_idx,
+            row_count,
+            plan_builder,
+        };
+
+        self.state = Some(RowGroupDecoderState::Start { row_group_info });
+        Ok(())
+    }
+
+    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    ///
+    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+    /// ranges of data that are needed to proceed.
+    ///
+    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+    /// `DecodeResult::Data`.
+    pub(crate) fn try_build(
+        &mut self,
+    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        loop {
+            let current_state = self.take_state()?;
+            match self.try_transition(current_state)? {
+                NextState {
+                    next_state,
+                    result: Some(result),
+                } => {
+                    // put back the next state
+                    self.state = Some(next_state);
+                    return Ok(result);
+                }
+                NextState {
+                    next_state,
+                    result: None,
+                } => {
+                    // continue processing
+                    self.state = Some(next_state);
+                }
+            }
+        }
+    }
+
+    /// Current state --> next state + optional output
+    ///
+    /// This is the main state transition function for the row group reader
+    /// and encodes the row group decoding state machine.
+    ///
+    /// # Notes
+    ///
+    /// This structure is used to reduce the indentation level of the main loop
+    /// in try_build
+    fn try_transition(
+        &mut self,
+        current_state: RowGroupDecoderState,
+    ) -> Result<NextState, ParquetError> {
+        let result = match current_state {
+            RowGroupDecoderState::Start { row_group_info } => {
+                let column_chunks = None; // no prior column chunks
+
+                let Some(filter) = self.filter.take() else {
+                    // no filter, start trying to read data immediately
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+                // no predicates in filter, so start reading immediately
+                if filter.predicates.is_empty() {
+                    return Ok(NextState::again(RowGroupDecoderState::StartData 
{
+                        row_group_info,
+                        column_chunks,
+                        cache_info: None,
+                    }));
+                };
+
+                // we have predicates to evaluate
+                let cache_projection =
+                    
self.compute_cache_projection(row_group_info.row_group_idx, &filter);
+
+                let cache_info = CacheInfo::new(
+                    cache_projection,
+                    Arc::new(Mutex::new(RowGroupCache::new(
+                        self.batch_size,
+                        self.max_predicate_cache_size,
+                    ))),
+                );
+
+                let filter_info = FilterInfo::new(filter, cache_info);
+                NextState::again(RowGroupDecoderState::Filters {
+                    row_group_info,
+                    filter_info,
+                    column_chunks,
+                })
+            }
+            // need to evaluate filters
+            RowGroupDecoderState::Filters {
+                row_group_info,
+                column_chunks,
+                filter_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // If nothing is selected, we are done with this row group
+                if !plan_builder.selects_any() {
+                    // ruled out entire row group
+                    self.filter = Some(filter_info.into_filter());
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Make a request for the data needed to evaluate the current 
predicate
+                let predicate = filter_info.current();
+
+                // need to fetch pages the column needs for decoding, figure
+                // that out based on the current selection and projection
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    predicate.projection(), // use the predicate's projection
+                )
+                .with_selection(plan_builder.selection())
+                // Fetch predicate columns; expand selection only for cached 
predicate columns
+                .with_cache_projection(Some(filter_info.cache_projection()))
+                .with_column_chunks(column_chunks)
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
+                    row_group_info,
+                    filter_info,
+                    data_request,
+                })
+            }
+            RowGroupDecoderState::WaitingOnFilterData {
+                row_group_info,
+                data_request,
+                mut filter_info,
+            } => {
+                // figure out what ranges we still need
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnFilterData {
+                            row_group_info,
+                            filter_info,
+                            data_request,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to evaluate the 
predicate
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    mut plan_builder,
+                } = row_group_info;
+
+                let predicate = filter_info.current();
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    predicate.projection(),
+                    &mut self.buffers,
+                )?;
+
+                let cache_options = filter_info.cache_builder().producer();
+
+                let array_reader = ArrayReaderBuilder::new(&row_group, 
&self.metrics)
+                    .with_cache_options(Some(&cache_options))
+                    .build_array_reader(self.fields.as_deref(), 
predicate.projection())?;
+
+                plan_builder =
+                    plan_builder.with_predicate(array_reader, 
filter_info.current_mut())?;
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                // Take back the column chunks that were read
+                let column_chunks = Some(row_group.column_chunks);
+
+                // advance to the next predicate, if any
+                match filter_info.advance() {
+                    AdvanceResult::Continue(filter_info) => {
+                        NextState::again(RowGroupDecoderState::Filters {
+                            row_group_info,
+                            column_chunks,
+                            filter_info,
+                        })
+                    }
+                    // done with predicates, proceed to reading data
+                    AdvanceResult::Done(filter, cache_info) => {
+                        // remember we need to put back the filter
+                        assert!(self.filter.is_none());
+                        self.filter = Some(filter);
+                        NextState::again(RowGroupDecoderState::StartData {
+                            row_group_info,
+                            column_chunks,
+                            cache_info: Some(cache_info),
+                        })
+                    }
+                }
+            }
+            RowGroupDecoderState::StartData {
+                row_group_info,
+                column_chunks,
+                cache_info,
+            } => {
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                // Compute the number of rows in the selection before applying 
limit and offset
+                let rows_before = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                if rows_before == 0 {
+                    // ruled out entire row group
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                // Apply any limit and offset
+                let plan_builder = plan_builder
+                    .limited(row_count)
+                    .with_offset(self.offset)
+                    .with_limit(self.limit)
+                    .build_limited();
+
+                let rows_after = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+                // Update running offset and limit for after the current row 
group is read
+                if let Some(offset) = &mut self.offset {
+                    // Reduction is either because of offset or limit, as 
limit is applied
+                    // after offset has been "exhausted" can just use 
saturating sub here
+                    *offset = offset.saturating_sub(rows_before - rows_after)
+                }
+
+                if rows_after == 0 {
+                    // no rows left after applying limit/offset
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::Finished,
+                        DecodeResult::Finished,
+                    ));
+                }
+
+                if let Some(limit) = &mut self.limit {
+                    *limit -= rows_after;
+                }
+
+                let data_request = DataRequestBuilder::new(
+                    row_group_idx,
+                    row_count,
+                    self.batch_size,
+                    &self.metadata,
+                    &self.projection,
+                )
+                .with_selection(plan_builder.selection())
+                .with_column_chunks(column_chunks)
+                // Final projection fetch shouldn't expand selection for cache
+                // so don't call with_cache_projection here
+                .build();
+
+                let row_group_info = RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                };
+
+                NextState::again(RowGroupDecoderState::WaitingOnData {
+                    row_group_info,
+                    data_request,
+                    cache_info,
+                })
+            }
+            // Waiting on data to proceed with reading the output
+            RowGroupDecoderState::WaitingOnData {
+                row_group_info,
+                data_request,
+                cache_info,
+            } => {
+                let needed_ranges = data_request.needed_ranges(&self.buffers);
+                if !needed_ranges.is_empty() {
+                    // still need data
+                    return Ok(NextState::result(
+                        RowGroupDecoderState::WaitingOnData {
+                            row_group_info,
+                            data_request,
+                            cache_info,
+                        },
+                        DecodeResult::NeedsData(needed_ranges),
+                    ));
+                }
+
+                // otherwise we have all the data we need to proceed
+                let RowGroupInfo {
+                    row_group_idx,
+                    row_count,
+                    plan_builder,
+                } = row_group_info;
+
+                let row_group = data_request.try_into_in_memory_row_group(
+                    row_group_idx,
+                    row_count,
+                    &self.metadata,
+                    &self.projection,
+                    &mut self.buffers,
+                )?;
+
+                let plan = plan_builder.build();
+
+                // if we have any cached results, connect them up
+                let array_reader_builder = ArrayReaderBuilder::new(&row_group, 
&self.metrics);
+                let array_reader = if let Some(cache_info) = 
cache_info.as_ref() {
+                    let cache_options = cache_info.builder().consumer();
+                    array_reader_builder
+                        .with_cache_options(Some(&cache_options))
+                        .build_array_reader(self.fields.as_deref(), 
&self.projection)
+                } else {
+                    array_reader_builder
+                        .build_array_reader(self.fields.as_deref(), 
&self.projection)
+                }?;
+
+                let reader = ParquetRecordBatchReader::new(array_reader, plan);
+                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Data(reader))
+            }
+            RowGroupDecoderState::Finished => {
+                // nothing left to read
+                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Finished)
+            }
+        };
+        Ok(result)
+    }
+
+    /// Which columns should be cached?
+    ///
+    /// Returns the columns that are used by the filters *and* then used in the
+    /// final projection, excluding any nested columns.
+    fn compute_cache_projection(&self, row_group_idx: usize, filter: 
&RowFilter) -> ProjectionMask {
+        let meta = self.metadata.row_group(row_group_idx);
+        match self.compute_cache_projection_inner(filter) {
+            Some(projection) => projection,
+            None => ProjectionMask::none(meta.columns().len()),
+        }
+    }
+
+    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> 
Option<ProjectionMask> {
+        let mut cache_projection = 
filter.predicates.first()?.projection().clone();
+        for predicate in filter.predicates.iter() {
+            cache_projection.union(predicate.projection());
+        }
+        cache_projection.intersect(&self.projection);
+        self.exclude_nested_columns_from_cache(&cache_projection)
+    }
+
+    /// Exclude leaves belonging to roots that span multiple parquet leaves 
(i.e. nested columns)

Review Comment:
   Curious why this is? Does this mean that if there is a query like `select 1 
from t where nested["field"] <15 AND nested["field"] > 5` we will read the leaf 
for `nested["field"]` twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to