tustvold commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r938877205


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -39,20 +39,18 @@ use crate::data_type::{
     Int64Type, Int96Type,
 };
 use crate::errors::Result;
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
+use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
 /// Create array reader from parquet schema, projection mask, and parquet file reader.
 pub fn build_array_reader(
-    parquet_schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     mask: ProjectionMask,
-    row_groups: Box<dyn RowGroupCollection>,
+    row_groups: &dyn RowGroupCollection,

Review Comment:
   Drive-by cleanup.



##########
parquet/src/arrow/arrow_reader/filter.rs:
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use arrow::array::BooleanArray;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+/// A predicate operating on [`RecordBatch`]
+pub trait ArrowPredicate: Send + 'static {

Review Comment:
   Using a trait here makes things more extensible in the long run.



##########
parquet/src/arrow/array_reader/mod.rs:
##########
@@ -110,8 +110,8 @@ pub trait RowGroupCollection {
 }
 
 impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.metadata().file_metadata().schema_descr_ptr())
+    fn schema(&self) -> SchemaDescPtr {

Review Comment:
   Drive-by cleanup.



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -128,18 +108,18 @@ impl ArrowReaderOptions {
 
     /// Scan rows from the parquet file according to the provided `selection`
     ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn with_row_selection(
-        self,
-        selection: impl Into<Vec<RowSelection>>,
-    ) -> Self {
+    /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available

Review Comment:
   I intend to revisit this as part of the next (21) arrow release. I suspect
we can move to a builder and deprecate the current API, which is quite clunky.



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -375,11 +340,36 @@ impl ParquetRecordBatchReader {
             batch_size,
             array_reader,
             schema: Arc::new(schema),
-            selection,
+            selection: selection.map(Into::into),
         }
     }
 }
 
+/// Evaluates an [`ArrowPredicate`], returning the [`RowSelection`] of matching rows
+///
+/// If `selection` is provided, the returned [`RowSelection`] will be the
+/// conjunction of `selection` and the rows selected by `predicate`
+pub(crate) fn evaluate_predicate(
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    selection: Option<RowSelection>,
+    predicate: &mut dyn ArrowPredicate,
+) -> Result<RowSelection> {
+    let reader =
+        ParquetRecordBatchReader::new(batch_size, array_reader, 
selection.clone());
+    let mut filters = vec![];

Review Comment:
   We could theoretically keep the decoded arrays around, but this requires a lot of
non-trivial `take` + `concat` in order to sync up the yielded batches. It could also
balloon memory consumption. I decided it was not worth it.



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -370,101 +503,40 @@ where
                         None => return Poll::Ready(None),
                     };
 
-                    let metadata = self.metadata.clone();
-                    let mut input = match self.input.take() {
-                        Some(input) => input,
-                        None => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(general_err!(
-                                "input stream lost"
-                            ))));
-                        }
-                    };
+                    let reader = self.reader.take().expect("lost reader");
 
-                    let projection = self.projection.clone();
-                    self.state = StreamState::Reading(
-                        async move {
-                            let row_group_metadata = 
metadata.row_group(row_group_idx);
-                            let mut column_chunks =
-                                vec![None; row_group_metadata.columns().len()];
-
-                            // TODO: Combine consecutive ranges
-                            let fetch_ranges = (0..column_chunks.len())

Review Comment:
   This logic has been moved into `InMemoryRowGroup::fetch`.



##########
parquet/src/arrow/arrow_reader/filter.rs:
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use arrow::array::BooleanArray;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+/// A predicate operating on [`RecordBatch`]
+pub trait ArrowPredicate: Send + 'static {
+    /// Returns the projection mask for this predicate
+    fn projection(&self) -> &ProjectionMask;
+
+    /// Called with a [`RecordBatch`] containing the columns identified by
+    /// [`Self::projection`],
+    /// with `true` values in the returned [`BooleanArray`] indicating rows
+    /// matching the predicate.
+    ///
+    /// The returned [`BooleanArray`] must not contain any nulls
+    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+}
+
+/// An [`ArrowPredicate`] created from an [`FnMut`]
+pub struct ArrowPredicateFn<F> {
+    f: F,
+    projection: ProjectionMask,
+}
+
+impl<F> ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    /// Create a new [`ArrowPredicateFn`]
+    pub fn new(projection: ProjectionMask, f: F) -> Self {
+        Self { f, projection }
+    }
+}
+
+impl<F> ArrowPredicate for ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        (self.f)(batch)
+    }
+}
+
+/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decode
+///
+/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all
+/// of the predicates will be returned. Any [`RowSelection`] will be applied prior
+/// to the first predicate, and each predicate in turn will then be used to compute
+/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
+///
+/// Once all predicates have been evaluated, the resulting [`RowSelection`] will be
+/// used to return just the desired rows.
+///
+/// This design has a couple of implications:

Review Comment:
   This is the major change vs #2310. FYI @thinkharderdev



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::compute::SlicesIterator;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::ops::Range;
+
+/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
+/// scanning a parquet file
+#[derive(Debug, Clone, Copy)]
+pub struct RowSelector {
+    /// The number of rows
+    pub row_count: usize,
+
+    /// If true, skip `row_count` rows
+    pub skip: bool,
+}
+
+impl RowSelector {
+    /// Select `row_count` rows
+    pub fn select(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: false,
+        }
+    }
+
+    /// Skip `row_count` rows
+    pub fn skip(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: true,
+        }
+    }
+}
+
+/// [`RowSelection`] allows selecting or skipping a provided number of rows
+/// when scanning the parquet file.
+///
+/// This is applied prior to reading column data, and can therefore
+/// be used to skip IO to fetch data into memory
+///
+/// A typical use-case would be using the [`PageIndex`] to filter out rows
+/// that don't satisfy a predicate
+///
+/// [`PageIndex`]: crate::file::page_index::index::PageIndex
+#[derive(Debug, Clone, Default)]
+pub struct RowSelection {
+    selectors: Vec<RowSelector>,
+}
+
+impl RowSelection {
+    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if any of the [`BooleanArray`] contain nulls
+    pub fn from_filters(filters: &[BooleanArray]) -> Self {
+        let mut next_offset = 0;
+        let total_rows = filters.iter().map(|x| x.len()).sum();
+
+        let iter = filters.iter().flat_map(|filter| {
+            let offset = next_offset;
+            next_offset += filter.len();
+            assert_eq!(filter.null_count(), 0);
+            SlicesIterator::new(filter)
+                .map(move |(start, end)| start + offset..end + offset)
+        });
+
+        Self::from_consecutive_ranges(iter, total_rows)
+    }
+
+    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
+    fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+        ranges: I,
+        total_rows: usize,
+    ) -> Self {
+        let mut selectors: Vec<RowSelector> = 
Vec::with_capacity(ranges.size_hint().0);
+        let mut last_end = 0;
+        for range in ranges {
+            let len = range.end - range.start;
+
+            match range.start.cmp(&last_end) {
+                Ordering::Equal => match selectors.last_mut() {
+                    Some(last) => last.row_count += len,
+                    None => selectors.push(RowSelector::select(len)),
+                },
+                Ordering::Greater => {
+                    selectors.push(RowSelector::skip(range.start - last_end));
+                    selectors.push(RowSelector::select(len))
+                }
+                Ordering::Less => panic!("out of order"),
+            }
+            last_end = range.end;
+        }
+
+        if last_end != total_rows {
+            selectors.push(RowSelector::skip(total_rows - last_end))
+        }
+
+        Self { selectors }
+    }
+
+    /// Splits off `row_count` from this [`RowSelection`]
+    pub fn split_off(&mut self, row_count: usize) -> Self {
+        let mut total_count = 0;
+
+        // Find the index where the selector exceeds the row count
+        let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+            total_count += selector.row_count;
+            total_count >= row_count
+        });
+
+        let split_idx = match find {
+            Some((idx, _)) => idx,
+            None => return Self::default(),
+        };
+
+        let mut remaining = self.selectors.split_off(split_idx);
+        if total_count != row_count {
+            let overflow = total_count - row_count;
+            let rem = remaining.first_mut().unwrap();
+            rem.row_count -= overflow;
+
+            self.selectors.push(RowSelector {
+                row_count,
+                skip: rem.skip,
+            })
+        }
+
+        std::mem::swap(&mut remaining, &mut self.selectors);
+        Self {
+            selectors: remaining,
+        }
+    }
+
+    /// Given a [`RowSelection`] computed under `self`, returns the
+    /// [`RowSelection`] representing their conjunction
+    pub fn and(&self, other: &Self) -> Self {
+        let mut selectors = vec![];
+        let mut first = self.selectors.iter().cloned().peekable();
+        let mut second = other.selectors.iter().cloned().peekable();
+
+        let mut to_skip = 0;
+        while let (Some(a), Some(b)) = (first.peek_mut(), second.peek_mut()) {
+            if a.row_count == 0 {
+                first.next().unwrap();
+                continue;
+            }
+
+            if b.row_count == 0 {
+                second.next().unwrap();
+                continue;
+            }
+
+            if a.skip {
+                // Records were skipped when producing second
+                to_skip += a.row_count;
+                first.next().unwrap();
+                continue;
+            }
+
+            let skip = b.skip;
+            let to_process = a.row_count.min(b.row_count);
+
+            a.row_count -= to_process;
+            b.row_count -= to_process;
+
+            match skip {
+                true => to_skip += to_process,
+                false => {
+                    if to_skip != 0 {
+                        selectors.push(RowSelector::skip(to_skip));
+                        to_skip = 0;
+                    }
+                    selectors.push(RowSelector::select(to_process))
+                }
+            }
+        }
+        Self { selectors }
+    }
+
+    /// Returns `true` if this [`RowSelection`] selects any rows
+    pub fn selects_any(&self) -> bool {
+        self.selectors.iter().any(|x| !x.skip)
+    }
+}
+
+impl From<Vec<RowSelector>> for RowSelection {
+    fn from(selectors: Vec<RowSelector>) -> Self {
+        Self { selectors }
+    }
+}
+
+impl Into<VecDeque<RowSelector>> for RowSelection {
+    fn into(self) -> VecDeque<RowSelector> {
+        self.selectors.into()
+    }
+}

Review Comment:
   This file definitely needs some tests prior to merge. The code is largely
lifted from #2201.



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -349,22 +323,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
 }
 
 impl ParquetRecordBatchReader {
-    pub fn try_new(

Review Comment:
   This module is not public, and this method was only being used in one place,
so we can just remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to