alamb commented on code in PR #6921:
URL: https://github.com/apache/arrow-rs/pull/6921#discussion_r1900416235


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -650,7 +702,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> 
{
                     let bytes = buffer.slice(offset..);
                     decode_page(
                         header,
-                        bytes,
+                        bytes.to_vec(),

Review Comment:
   I think this now makes an extra copy of the data -- we can probably get rid of 
it like this:
   - https://github.com/XiangpengHao/arrow-rs/pull/3



##########
parquet/src/arrow/async_reader/arrow_reader.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::sync::RwLock;
+use std::{collections::VecDeque, sync::Arc};
+
+use arrow_array::ArrayRef;
+use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
+use arrow_select::filter::prep_null_mask_filter;
+
+use crate::basic::PageType;
+use crate::column::page::{Page, PageMetadata, PageReader};
+use crate::errors::ParquetError;
+use crate::{
+    arrow::{
+        array_reader::ArrayReader,
+        arrow_reader::{RowFilter, RowSelection, RowSelector},
+    },
+    file::reader::{ChunkReader, SerializedPageReader},
+};
+
+pub struct FilteredParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    predicate_readers: Vec<Box<dyn ArrayReader>>,
+    schema: SchemaRef,
+    selection: VecDeque<RowSelector>,
+    row_filter: Option<RowFilter>,
+}
+
+fn read_selection(
+    reader: &mut dyn ArrayReader,
+    selection: &RowSelection,
+) -> Result<ArrayRef, ParquetError> {
+    for selector in selection.iter() {
+        if selector.skip {
+            let skipped = reader.skip_records(selector.row_count)?;
+            debug_assert_eq!(skipped, selector.row_count, "failed to skip 
rows");
+        } else {
+            let read_records = reader.read_records(selector.row_count)?;
+            debug_assert_eq!(read_records, selector.row_count, "failed to read 
rows");
+        }
+    }
+    reader.consume_batch()
+}
+
+/// Take the next selection from the selection queue, and return the selection
+/// whose selected row count is to_select or less (if input selection is 
exhausted).
+fn take_next_selection(
+    selection: &mut VecDeque<RowSelector>,
+    to_select: usize,
+) -> Option<RowSelection> {
+    let mut current_selected = 0;
+    let mut rt = Vec::new();
+    while let Some(front) = selection.pop_front() {
+        if front.skip {
+            rt.push(front);
+            continue;
+        }
+
+        if current_selected + front.row_count <= to_select {
+            rt.push(front);
+            current_selected += front.row_count;
+        } else {
+            let select = to_select - current_selected;
+            let remaining = front.row_count - select;
+            rt.push(RowSelector::select(select));
+            selection.push_front(RowSelector::select(remaining));
+
+            return Some(rt.into());
+        }
+    }
+    if !rt.is_empty() {
+        return Some(rt.into());
+    }
+    None
+}
+
+impl FilteredParquetRecordBatchReader {
+    pub(crate) fn new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+        selection: RowSelection,
+        filter_readers: Vec<Box<dyn ArrayReader>>,
+        row_filter: Option<RowFilter>,
+    ) -> Self {
+        let schema = match array_reader.get_data_type() {
+            DataType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not 
struct!"),
+        };
+
+        Self {
+            batch_size,
+            array_reader,
+            predicate_readers: filter_readers,
+            schema: Arc::new(schema),
+            selection: selection.into(),
+            row_filter,
+        }
+    }
+
+    pub(crate) fn take_filter(&mut self) -> Option<RowFilter> {
+        self.row_filter.take()
+    }
+
+    #[inline(never)]
+    /// Take a selection, and return the new selection where the rows are 
filtered by the predicate.
+    fn build_predicate_filter(
+        &mut self,
+        mut selection: RowSelection,
+    ) -> Result<RowSelection, ArrowError> {
+        match &mut self.row_filter {
+            None => Ok(selection),
+            Some(filter) => {
+                debug_assert_eq!(
+                    self.predicate_readers.len(),
+                    filter.predicates.len(),
+                    "predicate readers and predicates should have the same 
length"
+                );
+
+                for (predicate, reader) in filter
+                    .predicates
+                    .iter_mut()
+                    .zip(self.predicate_readers.iter_mut())
+                {
+                    let array = read_selection(reader.as_mut(), &selection)?;
+                    let batch = 
RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
+                        general_err!("Struct array reader should return struct 
array")
+                    })?);
+                    let input_rows = batch.num_rows();
+                    let predicate_filter = predicate.evaluate(batch)?;
+                    if predicate_filter.len() != input_rows {
+                        return Err(ArrowError::ParquetError(format!(
+                            "ArrowPredicate predicate returned {} rows, 
expected {input_rows}",
+                            predicate_filter.len()
+                        )));
+                    }
+                    let predicate_filter = match predicate_filter.null_count() 
{
+                        0 => predicate_filter,
+                        _ => prep_null_mask_filter(&predicate_filter),
+                    };
+                    let raw = RowSelection::from_filters(&[predicate_filter]);
+                    selection = selection.and_then(&raw);
+                }
+                Ok(selection)
+            }
+        }
+    }
+}
+
+impl Iterator for FilteredParquetRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // With filter pushdown, it's very hard to predict the number of rows 
to return -- depends on the selectivity of the filter.
+        // We can do one of the following:
+        // 1. Add a coalescing step to coalesce the resulting batches.
+        // 2. Ask parquet reader to collect more rows before returning.
+
+        // Approach 1 has the drawback of extra overhead of coalesce batch, 
which can be painful to be efficient.
+        // Code below implements approach 2, where we keep consuming the 
selection until we select at least 3/4 of the batch size.
+        // It boils down to leveraging array_reader's ability to collect large 
batches natively,
+        //    rather than concatenating multiple small batches.
+
+        let mut selected = 0;
+        while let Some(cur_selection) =
+            take_next_selection(&mut self.selection, self.batch_size - 
selected)
+        {
+            let filtered_selection = match 
self.build_predicate_filter(cur_selection) {
+                Ok(selection) => selection,
+                Err(e) => return Some(Err(e)),
+            };
+
+            for selector in filtered_selection.iter() {
+                if selector.skip {
+                    self.array_reader.skip_records(selector.row_count).ok()?;
+                } else {
+                    self.array_reader.read_records(selector.row_count).ok()?;
+                }
+            }
+            selected += filtered_selection.row_count();
+            if selected >= (self.batch_size / 4 * 3) {
+                break;
+            }
+        }
+        if selected == 0 {
+            return None;
+        }
+
+        let array = self.array_reader.consume_batch().ok()?;
+        let struct_array = array
+            .as_struct_opt()
+            .ok_or_else(|| general_err!("Struct array reader should return 
struct array"))
+            .ok()?;
+        Some(Ok(RecordBatch::from(struct_array.clone())))
+    }
+}
+
+impl RecordBatchReader for FilteredParquetRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+struct CachedPage {
+    dict: Option<(usize, Page)>,
+    data: Option<(usize, Page)>,
+}
+
+struct PageCacheInner {
+    pages: HashMap<usize, CachedPage>, // col_id -> CachedPage
+}
+
+/// A simple cache for decompressed pages.
+/// We cache only one dictionary page and one data page per column
+pub(crate) struct PageCache {
+    inner: RwLock<PageCacheInner>,
+}
+
+impl PageCache {
+    const CAPACITY: usize = 16;

Review Comment:
   This might be clearer if it were called `INITIAL_CAPACITY` or something similar, as I 
didn't see any code that limits the overall size of the cache



##########
parquet/Cargo.toml:
##########
@@ -69,6 +69,7 @@ paste = { version = "1.0" }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
 sysinfo = { version = "0.32.0", optional = true, default-features = false, 
features = ["system"] }
 crc32fast = { version = "1.4.2", optional = true, default-features = false }
+simdutf8 = "0.1.5"

Review Comment:
   - Is this the same as https://github.com/apache/arrow-rs/pull/6668 from 
@Dandandan ?



##########
parquet/src/arrow/array_reader/primitive_array.rs:
##########
@@ -413,9 +413,12 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, 
None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(

Review Comment:
   The changes in this file look like formatting changes to me.



##########
parquet/src/arrow/array_reader/byte_view_array.rs:
##########
@@ -307,6 +309,9 @@ impl ByteViewArrayDecoderPlain {
         num_values: Option<usize>,
         validate_utf8: bool,
     ) -> Self {
+        // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is 
zero copy
+        // Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, 
which is also zero copy

Review Comment:
   - @kylebarron found something similar in 
https://github.com/apache/arrow-rs/pull/6920
   
   Maybe it would be helpful to add an explicit API to create an 
`arrow_buffer::Buffer` directly from `bytes::Bytes` to make it clearer how to 
do so (and that it is zero copy).



##########
parquet/src/arrow/mod.rs:
##########
@@ -210,6 +210,44 @@ impl ProjectionMask {
     pub fn leaf_included(&self, leaf_idx: usize) -> bool {
         self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
     }
+
+    /// Union two projection masks

Review Comment:
   These are nice additions -- it would be fairly straightforward to split them 
into their own PR.



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -722,6 +747,8 @@ struct InMemoryRowGroup<'a> {
     offset_index: Option<&'a [OffsetIndexMetaData]>,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
+    cache: Arc<PageCache>,

Review Comment:
   this seems like the key change -- to cache the pages



##########
arrow-buffer/src/buffer/boolean.rs:
##########
@@ -205,6 +205,51 @@ impl BooleanBuffer {
     pub fn set_slices(&self) -> BitSliceIterator<'_> {
         BitSliceIterator::new(self.values(), self.offset, self.len)
     }
+
+    /// Combines this [`BooleanBuffer`] with another using logical AND on the 
selected bits.
+    ///
+    /// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly 
as many **set bits** as `self`,
+    /// i.e., self.count_set_bits() == other.len().
+    ///
+    /// This method will keep only the bits in `self` that are also set in 
`other`
+    /// at the positions corresponding to `self`'s set bits.
+    /// For example:
+    /// self:   NNYYYNNYYNYN
+    /// other:    YNY  NY N
+    /// result: NNYNYNNNYNNN
+    pub fn and_then(&self, other: &Self) -> Self {
+        // Ensure that 'other' has exactly as many set bits as 'self'
+        debug_assert_eq!(
+            self.count_set_bits(),
+            other.len(),
+            "The 'other' selection must have exactly as many set bits as 
'self'."
+        );
+
+        if self.len() == other.len() {
+            // fast path if the two bool masks are the same length
+            // this happens when self selects all rows
+            debug_assert_eq!(self.count_set_bits(), self.len());
+            return other.clone();
+        }
+
+        let mut buffer = MutableBuffer::from_len_zeroed(self.values().len());
+        buffer.copy_from_slice(self.values());
+        let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, 
self.len());
+
+        // Create iterators for 'self' and 'other' bits
+        let mut other_bits = other.iter();

Review Comment:
   there are probably crazier ways to optimize this like iterating by 64 bits 
at a time, etc



##########
parquet/src/arrow/async_reader/arrow_reader.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::sync::RwLock;
+use std::{collections::VecDeque, sync::Arc};
+
+use arrow_array::ArrayRef;
+use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
+use arrow_select::filter::prep_null_mask_filter;
+
+use crate::basic::PageType;
+use crate::column::page::{Page, PageMetadata, PageReader};
+use crate::errors::ParquetError;
+use crate::{
+    arrow::{
+        array_reader::ArrayReader,
+        arrow_reader::{RowFilter, RowSelection, RowSelector},
+    },
+    file::reader::{ChunkReader, SerializedPageReader},
+};
+
+pub struct FilteredParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    predicate_readers: Vec<Box<dyn ArrayReader>>,
+    schema: SchemaRef,
+    selection: VecDeque<RowSelector>,
+    row_filter: Option<RowFilter>,
+}
+
+fn read_selection(
+    reader: &mut dyn ArrayReader,
+    selection: &RowSelection,
+) -> Result<ArrayRef, ParquetError> {
+    for selector in selection.iter() {
+        if selector.skip {
+            let skipped = reader.skip_records(selector.row_count)?;
+            debug_assert_eq!(skipped, selector.row_count, "failed to skip 
rows");
+        } else {
+            let read_records = reader.read_records(selector.row_count)?;
+            debug_assert_eq!(read_records, selector.row_count, "failed to read 
rows");
+        }
+    }
+    reader.consume_batch()
+}
+
+/// Take the next selection from the selection queue, and return the selection
+/// whose selected row count is to_select or less (if input selection is 
exhausted).
+fn take_next_selection(
+    selection: &mut VecDeque<RowSelector>,
+    to_select: usize,
+) -> Option<RowSelection> {
+    let mut current_selected = 0;
+    let mut rt = Vec::new();
+    while let Some(front) = selection.pop_front() {
+        if front.skip {
+            rt.push(front);
+            continue;
+        }
+
+        if current_selected + front.row_count <= to_select {
+            rt.push(front);
+            current_selected += front.row_count;
+        } else {
+            let select = to_select - current_selected;
+            let remaining = front.row_count - select;
+            rt.push(RowSelector::select(select));
+            selection.push_front(RowSelector::select(remaining));
+
+            return Some(rt.into());
+        }
+    }
+    if !rt.is_empty() {
+        return Some(rt.into());
+    }
+    None
+}
+
+impl FilteredParquetRecordBatchReader {
+    pub(crate) fn new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+        selection: RowSelection,
+        filter_readers: Vec<Box<dyn ArrayReader>>,
+        row_filter: Option<RowFilter>,
+    ) -> Self {
+        let schema = match array_reader.get_data_type() {
+            DataType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not 
struct!"),
+        };
+
+        Self {
+            batch_size,
+            array_reader,
+            predicate_readers: filter_readers,
+            schema: Arc::new(schema),
+            selection: selection.into(),
+            row_filter,
+        }
+    }
+
+    pub(crate) fn take_filter(&mut self) -> Option<RowFilter> {
+        self.row_filter.take()
+    }
+
+    #[inline(never)]
+    /// Take a selection, and return the new selection where the rows are 
filtered by the predicate.
+    fn build_predicate_filter(
+        &mut self,
+        mut selection: RowSelection,
+    ) -> Result<RowSelection, ArrowError> {
+        match &mut self.row_filter {
+            None => Ok(selection),
+            Some(filter) => {
+                debug_assert_eq!(
+                    self.predicate_readers.len(),
+                    filter.predicates.len(),
+                    "predicate readers and predicates should have the same 
length"
+                );
+
+                for (predicate, reader) in filter
+                    .predicates
+                    .iter_mut()
+                    .zip(self.predicate_readers.iter_mut())
+                {
+                    let array = read_selection(reader.as_mut(), &selection)?;
+                    let batch = 
RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
+                        general_err!("Struct array reader should return struct 
array")
+                    })?);
+                    let input_rows = batch.num_rows();
+                    let predicate_filter = predicate.evaluate(batch)?;
+                    if predicate_filter.len() != input_rows {
+                        return Err(ArrowError::ParquetError(format!(
+                            "ArrowPredicate predicate returned {} rows, 
expected {input_rows}",
+                            predicate_filter.len()
+                        )));
+                    }
+                    let predicate_filter = match predicate_filter.null_count() 
{
+                        0 => predicate_filter,
+                        _ => prep_null_mask_filter(&predicate_filter),
+                    };
+                    let raw = RowSelection::from_filters(&[predicate_filter]);
+                    selection = selection.and_then(&raw);
+                }
+                Ok(selection)
+            }
+        }
+    }
+}
+
+impl Iterator for FilteredParquetRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // With filter pushdown, it's very hard to predict the number of rows 
to return -- depends on the selectivity of the filter.
+        // We can do one of the following:
+        // 1. Add a coalescing step to coalesce the resulting batches.
+        // 2. Ask parquet reader to collect more rows before returning.
+
+        // Approach 1 has the drawback of extra overhead of coalesce batch, 
which can be painful to be efficient.
+        // Code below implements approach 2, where we keep consuming the 
selection until we select at least 3/4 of the batch size.
+        // It boils down to leveraging array_reader's ability to collect large 
batches natively,
+        //    rather than concatenating multiple small batches.
+
+        let mut selected = 0;
+        while let Some(cur_selection) =
+            take_next_selection(&mut self.selection, self.batch_size - 
selected)
+        {
+            let filtered_selection = match 
self.build_predicate_filter(cur_selection) {
+                Ok(selection) => selection,
+                Err(e) => return Some(Err(e)),
+            };
+
+            for selector in filtered_selection.iter() {
+                if selector.skip {
+                    self.array_reader.skip_records(selector.row_count).ok()?;
+                } else {
+                    self.array_reader.read_records(selector.row_count).ok()?;
+                }
+            }
+            selected += filtered_selection.row_count();
+            if selected >= (self.batch_size / 4 * 3) {
+                break;
+            }
+        }
+        if selected == 0 {
+            return None;
+        }
+
+        let array = self.array_reader.consume_batch().ok()?;
+        let struct_array = array
+            .as_struct_opt()
+            .ok_or_else(|| general_err!("Struct array reader should return 
struct array"))
+            .ok()?;
+        Some(Ok(RecordBatch::from(struct_array.clone())))
+    }
+}
+
+impl RecordBatchReader for FilteredParquetRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+struct CachedPage {
+    dict: Option<(usize, Page)>,
+    data: Option<(usize, Page)>,
+}
+
+struct PageCacheInner {
+    pages: HashMap<usize, CachedPage>, // col_id -> CachedPage
+}
+
+/// A simple cache for decompressed pages.
+/// We cache only one dictionary page and one data page per column
+pub(crate) struct PageCache {
+    inner: RwLock<PageCacheInner>,

Review Comment:
   It would help me understand how this works if we could add some comments 
about how `col_id` is assigned (is it derived from some Parquet data, or is it 
assigned by the cache / reader somehow?) 🤔 



##########
parquet/src/arrow/async_reader/arrow_reader.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::sync::RwLock;
+use std::{collections::VecDeque, sync::Arc};
+
+use arrow_array::ArrayRef;
+use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
+use arrow_select::filter::prep_null_mask_filter;
+
+use crate::basic::PageType;
+use crate::column::page::{Page, PageMetadata, PageReader};
+use crate::errors::ParquetError;
+use crate::{
+    arrow::{
+        array_reader::ArrayReader,
+        arrow_reader::{RowFilter, RowSelection, RowSelector},
+    },
+    file::reader::{ChunkReader, SerializedPageReader},
+};
+
+pub struct FilteredParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    predicate_readers: Vec<Box<dyn ArrayReader>>,
+    schema: SchemaRef,
+    selection: VecDeque<RowSelector>,
+    row_filter: Option<RowFilter>,
+}
+
+fn read_selection(
+    reader: &mut dyn ArrayReader,
+    selection: &RowSelection,
+) -> Result<ArrayRef, ParquetError> {
+    for selector in selection.iter() {
+        if selector.skip {
+            let skipped = reader.skip_records(selector.row_count)?;
+            debug_assert_eq!(skipped, selector.row_count, "failed to skip 
rows");
+        } else {
+            let read_records = reader.read_records(selector.row_count)?;
+            debug_assert_eq!(read_records, selector.row_count, "failed to read 
rows");
+        }
+    }
+    reader.consume_batch()
+}
+
+/// Take the next selection from the selection queue, and return the selection
+/// whose selected row count is to_select or less (if input selection is 
exhausted).
+fn take_next_selection(
+    selection: &mut VecDeque<RowSelector>,
+    to_select: usize,
+) -> Option<RowSelection> {
+    let mut current_selected = 0;
+    let mut rt = Vec::new();
+    while let Some(front) = selection.pop_front() {
+        if front.skip {
+            rt.push(front);
+            continue;
+        }
+
+        if current_selected + front.row_count <= to_select {
+            rt.push(front);
+            current_selected += front.row_count;
+        } else {
+            let select = to_select - current_selected;
+            let remaining = front.row_count - select;
+            rt.push(RowSelector::select(select));
+            selection.push_front(RowSelector::select(remaining));
+
+            return Some(rt.into());
+        }
+    }
+    if !rt.is_empty() {
+        return Some(rt.into());
+    }
+    None
+}
+
+impl FilteredParquetRecordBatchReader {
+    pub(crate) fn new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+        selection: RowSelection,
+        filter_readers: Vec<Box<dyn ArrayReader>>,
+        row_filter: Option<RowFilter>,
+    ) -> Self {
+        let schema = match array_reader.get_data_type() {
+            DataType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not 
struct!"),
+        };
+
+        Self {
+            batch_size,
+            array_reader,
+            predicate_readers: filter_readers,
+            schema: Arc::new(schema),
+            selection: selection.into(),
+            row_filter,
+        }
+    }
+
+    pub(crate) fn take_filter(&mut self) -> Option<RowFilter> {
+        self.row_filter.take()
+    }
+
+    #[inline(never)]
+    /// Take a selection, and return the new selection where the rows are 
filtered by the predicate.
+    fn build_predicate_filter(
+        &mut self,
+        mut selection: RowSelection,
+    ) -> Result<RowSelection, ArrowError> {
+        match &mut self.row_filter {
+            None => Ok(selection),
+            Some(filter) => {
+                debug_assert_eq!(
+                    self.predicate_readers.len(),
+                    filter.predicates.len(),
+                    "predicate readers and predicates should have the same 
length"
+                );
+
+                for (predicate, reader) in filter
+                    .predicates
+                    .iter_mut()
+                    .zip(self.predicate_readers.iter_mut())
+                {
+                    let array = read_selection(reader.as_mut(), &selection)?;
+                    let batch = 
RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
+                        general_err!("Struct array reader should return struct 
array")
+                    })?);
+                    let input_rows = batch.num_rows();
+                    let predicate_filter = predicate.evaluate(batch)?;
+                    if predicate_filter.len() != input_rows {
+                        return Err(ArrowError::ParquetError(format!(
+                            "ArrowPredicate predicate returned {} rows, 
expected {input_rows}",
+                            predicate_filter.len()
+                        )));
+                    }
+                    let predicate_filter = match predicate_filter.null_count() 
{
+                        0 => predicate_filter,
+                        _ => prep_null_mask_filter(&predicate_filter),
+                    };
+                    let raw = RowSelection::from_filters(&[predicate_filter]);
+                    selection = selection.and_then(&raw);
+                }
+                Ok(selection)
+            }
+        }
+    }
+}
+
+impl Iterator for FilteredParquetRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // With filter pushdown, it's very hard to predict the number of rows 
to return -- depends on the selectivity of the filter.
+        // We can do one of the following:
+        // 1. Add a coalescing step to coalesce the resulting batches.
+        // 2. Ask parquet reader to collect more rows before returning.
+
+        // Approach 1 has the drawback of extra overhead of coalesce batch, 
which can be painful to be efficient.
+        // Code below implements approach 2, where we keep consuming the 
selection until we select at least 3/4 of the batch size.
+        // It boils down to leveraging array_reader's ability to collect large 
batches natively,

Review Comment:
   I think this rationale makes sense



##########
parquet/src/arrow/async_reader/arrow_reader.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::sync::RwLock;
+use std::{collections::VecDeque, sync::Arc};
+
+use arrow_array::ArrayRef;
+use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
+use arrow_select::filter::prep_null_mask_filter;
+
+use crate::basic::PageType;
+use crate::column::page::{Page, PageMetadata, PageReader};
+use crate::errors::ParquetError;
+use crate::{
+    arrow::{
+        array_reader::ArrayReader,
+        arrow_reader::{RowFilter, RowSelection, RowSelector},
+    },
+    file::reader::{ChunkReader, SerializedPageReader},
+};
+
+pub struct FilteredParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    predicate_readers: Vec<Box<dyn ArrayReader>>,
+    schema: SchemaRef,
+    selection: VecDeque<RowSelector>,
+    row_filter: Option<RowFilter>,
+}
+
+fn read_selection(
+    reader: &mut dyn ArrayReader,
+    selection: &RowSelection,
+) -> Result<ArrayRef, ParquetError> {
+    for selector in selection.iter() {
+        if selector.skip {
+            let skipped = reader.skip_records(selector.row_count)?;
+            debug_assert_eq!(skipped, selector.row_count, "failed to skip 
rows");
+        } else {
+            let read_records = reader.read_records(selector.row_count)?;
+            debug_assert_eq!(read_records, selector.row_count, "failed to read 
rows");
+        }
+    }
+    reader.consume_batch()
+}
+
+/// Take the next selection from the selection queue, and return the selection
+/// whose selected row count is to_select or less (if input selection is exhausted).
+fn take_next_selection(
+    selection: &mut VecDeque<RowSelector>,
+    to_select: usize,
+) -> Option<RowSelection> {
+    let mut current_selected = 0;
+    let mut rt = Vec::new();
+    while let Some(front) = selection.pop_front() {
+        if front.skip {
+            rt.push(front);
+            continue;
+        }
+
+        if current_selected + front.row_count <= to_select {
+            rt.push(front);
+            current_selected += front.row_count;
+        } else {
+            let select = to_select - current_selected;
+            let remaining = front.row_count - select;
+            rt.push(RowSelector::select(select));
+            selection.push_front(RowSelector::select(remaining));
+
+            return Some(rt.into());
+        }
+    }
+    if !rt.is_empty() {
+        return Some(rt.into());
+    }
+    None
+}
+
+impl FilteredParquetRecordBatchReader {
+    pub(crate) fn new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+        selection: RowSelection,
+        filter_readers: Vec<Box<dyn ArrayReader>>,
+        row_filter: Option<RowFilter>,
+    ) -> Self {
+        let schema = match array_reader.get_data_type() {
+            DataType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not 
struct!"),
+        };
+
+        Self {
+            batch_size,
+            array_reader,
+            predicate_readers: filter_readers,
+            schema: Arc::new(schema),
+            selection: selection.into(),
+            row_filter,
+        }
+    }
+
+    pub(crate) fn take_filter(&mut self) -> Option<RowFilter> {
+        self.row_filter.take()
+    }
+
+    #[inline(never)]
+    /// Take a selection, and return the new selection where the rows are filtered by the predicate.
+    fn build_predicate_filter(
+        &mut self,
+        mut selection: RowSelection,
+    ) -> Result<RowSelection, ArrowError> {
+        match &mut self.row_filter {
+            None => Ok(selection),
+            Some(filter) => {
+                debug_assert_eq!(
+                    self.predicate_readers.len(),
+                    filter.predicates.len(),
+                    "predicate readers and predicates should have the same 
length"
+                );
+
+                for (predicate, reader) in filter
+                    .predicates
+                    .iter_mut()
+                    .zip(self.predicate_readers.iter_mut())
+                {
+                    let array = read_selection(reader.as_mut(), &selection)?;
+                    let batch = 
RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
+                        general_err!("Struct array reader should return struct 
array")
+                    })?);
+                    let input_rows = batch.num_rows();
+                    let predicate_filter = predicate.evaluate(batch)?;
+                    if predicate_filter.len() != input_rows {
+                        return Err(ArrowError::ParquetError(format!(
+                            "ArrowPredicate predicate returned {} rows, 
expected {input_rows}",
+                            predicate_filter.len()
+                        )));
+                    }
+                    let predicate_filter = match predicate_filter.null_count() 
{
+                        0 => predicate_filter,
+                        _ => prep_null_mask_filter(&predicate_filter),
+                    };
+                    let raw = RowSelection::from_filters(&[predicate_filter]);
+                    selection = selection.and_then(&raw);
+                }
+                Ok(selection)
+            }
+        }
+    }
+}
+
+impl Iterator for FilteredParquetRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // With filter pushdown, it's very hard to predict the number of rows to return -- depends on the selectivity of the filter.
+        // We can do one of the following:
+        // 1. Add a coalescing step to coalesce the resulting batches.
+        // 2. Ask parquet reader to collect more rows before returning.
+
+        // Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient.
+        // Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size.
+        // It boils down to leveraging array_reader's ability to collect large batches natively,
+        //    rather than concatenating multiple small batches.
+
+        let mut selected = 0;
+        while let Some(cur_selection) =
+            take_next_selection(&mut self.selection, self.batch_size - 
selected)
+        {
+            let filtered_selection = match 
self.build_predicate_filter(cur_selection) {
+                Ok(selection) => selection,
+                Err(e) => return Some(Err(e)),
+            };
+
+            for selector in filtered_selection.iter() {
+                if selector.skip {
+                    self.array_reader.skip_records(selector.row_count).ok()?;
+                } else {
+                    self.array_reader.read_records(selector.row_count).ok()?;
+                }
+            }
+            selected += filtered_selection.row_count();
+            if selected >= (self.batch_size / 4 * 3) {
+                break;
+            }
+        }
+        if selected == 0 {
+            return None;
+        }
+
+        let array = self.array_reader.consume_batch().ok()?;
+        let struct_array = array
+            .as_struct_opt()
+            .ok_or_else(|| general_err!("Struct array reader should return 
struct array"))
+            .ok()?;
+        Some(Ok(RecordBatch::from(struct_array.clone())))
+    }
+}
+
+impl RecordBatchReader for FilteredParquetRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+struct CachedPage {
+    dict: Option<(usize, Page)>,

Review Comment:
   I think these are `(offset, Page)` tuples, which would be nice to explain in a
comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to