etseidl commented on code in PR #7502:
URL: https://github.com/apache/arrow-rs/pull/7502#discussion_r2093752397


##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
+//! from a Parquet file
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::arrow_reader::{
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+};
+use crate::errors::{ParquetError, Result};
+use arrow_array::Array;
+use arrow_select::filter::prep_null_mask_filter;
+use std::collections::VecDeque;
+
+/// A builder for [`ReadPlan`]
+///
+/// Accumulates an optional [`RowSelection`] (from an explicit selection
+/// and/or from evaluated [`ArrowPredicate`]s) together with the batch size,
+/// and produces a [`ReadPlan`] via [`ReadPlanBuilder::build`].
+#[derive(Clone)]
+pub(crate) struct ReadPlanBuilder {
+    /// Batch size passed through unchanged to the resulting [`ReadPlan`]
+    batch_size: usize,
+    /// Current selection to apply, including the effect of all filters
+    /// evaluated so far. `None` means no selection, i.e. all rows are selected.
+    selection: Option<RowSelection>,
+}
+
+impl ReadPlanBuilder {
+    /// Create a `ReadPlanBuilder` with the given batch size
+    ///
+    /// The builder starts with no selection (`None`), meaning all rows are
+    /// selected.
+    pub(crate) fn new(batch_size: usize) -> Self {
+        Self {
+            batch_size,
+            selection: None,
+        }
+    }
+
+    /// Set the current selection to the given value
+    ///
+    /// This replaces (rather than combines with) any existing selection;
+    /// passing `None` selects all rows.
+    pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> 
Self {
+        self.selection = selection;
+        self
+    }
+
+    /// Returns the current selection, if any
+    ///
+    /// `None` means no selection has been set, i.e. all rows are selected.
+    pub(crate) fn selection(&self) -> Option<&RowSelection> {
+        self.selection.as_ref()
+    }
+
+    /// Specifies the number of rows in the row group, before filtering is 
applied.
+    ///
+    /// Returns a [`LimitedReadPlanBuilder`] that can apply
+    /// offset and limit.
+    ///
+    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to 
this
+    /// selection.
+    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
+        LimitedReadPlanBuilder::new(self, row_count)
+    }
+
+    /// Returns true if the current plan selects any rows
+    ///
+    /// Returns `true` when no selection has been set (all rows selected).
+    pub(crate) fn selects_any(&self) -> bool {
+        self.selection
+            .as_ref()
+            .map(|s| s.selects_any())
+            .unwrap_or(true)
+    }
+
+    /// Returns the number of rows selected, or `None` if all rows are 
selected.
+    pub(crate) fn num_rows_selected(&self) -> Option<usize> {
+        self.selection.as_ref().map(|s| s.row_count())
+    }
+
+    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
+    ///
+    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
+    /// will be the conjunction of the existing selection and the rows selected
+    /// by `predicate`.
+    ///
+    /// Note: pre-existing selections may come from evaluating a previous 
predicate
+    /// or if the [`ParquetRecordBatchReader`] specified an explicit
+    /// [`RowSelection`] in addition to one or more predicates.
+    ///
+    /// The predicate's input is read from `array_reader` using the plan built
+    /// from the builder's current state, so `predicate` only sees rows that are
+    /// already selected.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if reading a batch fails, if `predicate` returns an
+    /// error, or if the predicate's result length differs from the number of
+    /// rows in its input batch.
+    pub(crate) fn with_predicate(
+        mut self,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+    ) -> Result<Self> {
+        // Read only the currently-selected rows (the builder is cloned so
+        // `self` can still be updated below)
+        let reader = ParquetRecordBatchReader::new(array_reader, 
self.clone().build());
+        let mut filters = vec![];
+        for maybe_batch in reader {
+            let maybe_batch = maybe_batch?;
+            let input_rows = maybe_batch.num_rows();
+            let filter = predicate.evaluate(maybe_batch)?;
+            // Since user supplied predicate, check error here to catch bugs 
quickly
+            if filter.len() != input_rows {
+                return Err(arrow_err!(
+                    "ArrowPredicate predicate returned {} rows, expected 
{input_rows}",
+                    filter.len()
+                ));
+            }
+            // Null entries in the mask are treated as `false` (row not
+            // selected); `prep_null_mask_filter` is only needed when nulls exist
+            match filter.null_count() {
+                0 => filters.push(filter),
+                _ => filters.push(prep_null_mask_filter(&filter)),
+            };
+        }
+
+        // `raw` only describes rows that survived the existing selection, so it
+        // is composed with `and_then` (relative to the selected rows) rather
+        // than intersected position-by-position with the full row group
+        let raw = RowSelection::from_filters(&filters);
+        self.selection = match self.selection.take() {
+            Some(selection) => Some(selection.and_then(&raw)),
+            None => Some(raw),
+        };
+        Ok(self)
+    }
+
+    /// Create the final [`ReadPlan`] for the scan
+    ///
+    /// Consumes the builder. If a selection is present it is trimmed and
+    /// converted into the plan's representation; `None` is passed through.
+    pub(crate) fn build(mut self) -> ReadPlan {
+        // If nothing is selected, normalize to an explicitly empty selection
+        if !self.selects_any() {
+            self.selection = Some(RowSelection::from(vec![]));
+        }
+        let Self {
+            batch_size,
+            selection,
+        } = self;
+
+        // NOTE(review): presumably `trim` drops trailing skipped rows and
+        // `into` yields the `VecDeque<RowSelector>` stored by `ReadPlan` — confirm
+        let selection = selection.map(|s| s.trim().into());
+
+        ReadPlan {
+            batch_size,
+            selection,
+        }
+    }
+}
+
+/// Bbuilder for [`ReadPlan`] that applies a limit and offset to the read plan

Review Comment:
   ```suggestion
   /// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to