alamb commented on code in PR #2435:
URL: https://github.com/apache/arrow-rs/pull/2435#discussion_r945699776


##########
parquet/src/arrow/array_reader/mod.rs:
##########
@@ -124,6 +124,49 @@ impl RowGroupCollection for Arc<dyn FileReader> {
     }
 }
 
+pub(crate) struct FileReaderRowGroupCollection {
+    reader: Arc<dyn FileReader>,
+    row_groups: Option<Vec<usize>>,

Review Comment:
   I think it would help to document what `usize` means here -- I assume it is 
the index of the row group within the parquet file? And that if this is None, 
all row groups will be read?



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -194,112 +194,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> 
AsyncFileReader for T {
     }
 }
 
+#[doc(hidden)]
+/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers 
from async

Review Comment:
   ```suggestion
   /// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync 
readers from async
   /// Allows sharing the same builder for both the sync and async versions, 
whilst also not 
   /// breaking the existing ParquetRecordBatchStreamBuilder API
   ```



##########
parquet/src/arrow/schema.rs:
##########
@@ -1662,14 +1662,9 @@ mod tests {
         writer.close()?;
 
         // read file back
-        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
-        let read_schema = arrow_reader.get_schema()?;
-        assert_eq!(schema, read_schema);
-
-        // read all fields by columns

Review Comment:
   Isn't the usecase (and test) of reading a partial schema still valid?



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -48,9 +50,127 @@ pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, 
RowFilter};
 #[allow(unused_imports)]
 pub(crate) use selection::{RowSelection, RowSelector};
 
+/// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended
+/// to be used directly, instead you should use the specialization for the 
type of reader
+/// you wish to use
+///
+/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
+/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
+///
+/// [`ParquetRecordBatchStreamBuilder`]: 
[crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
+pub struct ArrowReaderBuilder<T> {
+    pub(crate) input: T,
+
+    pub(crate) metadata: Arc<ParquetMetaData>,
+
+    pub(crate) schema: SchemaRef,
+
+    pub(crate) batch_size: usize,
+
+    pub(crate) row_groups: Option<Vec<usize>>,
+
+    pub(crate) projection: ProjectionMask,
+
+    pub(crate) filter: Option<RowFilter>,
+
+    pub(crate) selection: Option<RowSelection>,
+}
+
+impl<T> ArrowReaderBuilder<T> {

Review Comment:
   This is looking like a very nice api 👌  👨‍🍳 
   
   🎩 tip to you @tustvold @Ted-Jiang  and @thinkharderdev  for this. Very cool



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -48,9 +50,127 @@ pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, 
RowFilter};
 #[allow(unused_imports)]
 pub(crate) use selection::{RowSelection, RowSelector};
 
+/// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended
+/// to be used directly, instead you should use the specialization for the 
type of reader
+/// you wish to use
+///
+/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
+/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
+///
+/// [`ParquetRecordBatchStreamBuilder`]: 
[crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
+pub struct ArrowReaderBuilder<T> {
+    pub(crate) input: T,
+
+    pub(crate) metadata: Arc<ParquetMetaData>,
+
+    pub(crate) schema: SchemaRef,
+
+    pub(crate) batch_size: usize,
+
+    pub(crate) row_groups: Option<Vec<usize>>,
+
+    pub(crate) projection: ProjectionMask,
+
+    pub(crate) filter: Option<RowFilter>,
+
+    pub(crate) selection: Option<RowSelection>,
+}
+
+impl<T> ArrowReaderBuilder<T> {
+    pub(crate) fn new_builder(
+        input: T,
+        metadata: Arc<ParquetMetaData>,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let kv_metadata = match options.skip_arrow_metadata {
+            true => None,
+            false => metadata.file_metadata().key_value_metadata(),
+        };
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            kv_metadata,
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: ProjectionMask::all(),
+            filter: None,
+            selection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        self.metadata.file_metadata().schema_descr()
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, mask: ProjectionMask) -> Self {
+        Self {
+            projection: mask,
+            ..self
+        }
+    }
+
+    /// Provide a [`RowSelection] to filter out rows, and avoid fetching their
+    /// data into memory
+    ///
+    /// Row group filtering is applied prior to this, and rows from skipped
+    /// row groups should not be included in the [`RowSelection`]
+    ///
+    /// TODO: Make public once stable (#1792)
+    #[allow(unused)]
+    pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
+        Self {
+            selection: Some(selection),
+            ..self
+        }
+    }
+
+    /// Provide a [`RowFilter`] to skip decoding rows

Review Comment:
   ```suggestion
       /// Provide a [`RowFilter`] to skip decoding rows. Row filters are 
applied 
       /// after row group selection and row selection
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -84,10 +204,14 @@ pub trait ArrowReader {
     ) -> Result<Self::RecordReader>;
 }
 
+/// Options that control how metadata is read for a parquet file
+///
+/// See [`ArrowReaderBuilder`] for how to configure how the column data
+/// is then read from the file, including projection and filter pushdown
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
-    selection: Option<RowSelection>,
+    page_index: bool,

Review Comment:
   ```suggestion
       /// if true, forces decoding of the page index for all row groups
       /// as the group selection isn't known at the point the metadata is read
       page_index: bool,
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -48,9 +50,127 @@ pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, 
RowFilter};
 #[allow(unused_imports)]
 pub(crate) use selection::{RowSelection, RowSelector};
 
+/// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended
+/// to be used directly, instead you should use the specialization for the 
type of reader
+/// you wish to use
+///
+/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
+/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
+///
+/// [`ParquetRecordBatchStreamBuilder`]: 
[crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]

Review Comment:
   Eventually it would be great it update the examples to use this (much nicer) 
API as well: https://docs.rs/parquet/20.0.0/parquet/arrow/index.html



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -48,9 +50,127 @@ pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, 
RowFilter};
 #[allow(unused_imports)]
 pub(crate) use selection::{RowSelection, RowSelector};
 
+/// A generic builder for constructing sync or async arrow parquet readers. 
This is not intended
+/// to be used directly, instead you should use the specialization for the 
type of reader
+/// you wish to use
+///
+/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
+/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
+///
+/// [`ParquetRecordBatchStreamBuilder`]: 
[crate::arrow::async_reader::ParquetRecordBatchStreamBuilder]
+pub struct ArrowReaderBuilder<T> {
+    pub(crate) input: T,
+
+    pub(crate) metadata: Arc<ParquetMetaData>,
+
+    pub(crate) schema: SchemaRef,
+
+    pub(crate) batch_size: usize,
+
+    pub(crate) row_groups: Option<Vec<usize>>,
+
+    pub(crate) projection: ProjectionMask,
+
+    pub(crate) filter: Option<RowFilter>,
+
+    pub(crate) selection: Option<RowSelection>,
+}
+
+impl<T> ArrowReaderBuilder<T> {
+    pub(crate) fn new_builder(
+        input: T,
+        metadata: Arc<ParquetMetaData>,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let kv_metadata = match options.skip_arrow_metadata {
+            true => None,
+            false => metadata.file_metadata().key_value_metadata(),
+        };
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            kv_metadata,
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: ProjectionMask::all(),
+            filter: None,
+            selection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        self.metadata.file_metadata().schema_descr()
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce

Review Comment:
   ```suggestion
       /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -109,27 +233,29 @@ impl ArrowReaderOptions {
         }
     }
 
-    /// Scan rows from the parquet file according to the provided `selection`
+    /// Set this true to enable decoding of the [PageIndex] if present. This 
can be used
+    /// to push down predicates to the parquet scan, potentially eliminating 
unnecessary IO
     ///
-    /// TODO: Revisit this API, as [`Self`] is provided before the file 
metadata is available
-    #[allow(unused)]
-    pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) 
-> Self {

Review Comment:
   Isn't it moved to `ArrowReaderBuilder::with_row_selection`?



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1309,20 +1464,21 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_structs.rust.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
-        let record_batch_reader = arrow_reader
-            .get_record_reader(60)
-            .expect("Failed to read into array!");
+        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 
60).unwrap();
 
         for batch in record_batch_reader {
             batch.unwrap();
         }
 
-        let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [3, 
8, 10]);
-        let projected_reader = arrow_reader
-            .get_record_reader_by_columns(mask.clone(), 60)
+        let file = File::open(&path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 
10]);

Review Comment:
   👌  very nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to