This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd0378e3 Simplify parquet PageIterator (#4306)
6dd0378e3 is described below

commit 6dd0378e33926f756d142f04d3e5cbc984ad258d
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue May 30 15:15:52 2023 +0100

    Simplify parquet PageIterator (#4306)
---
 parquet/benches/arrow_reader.rs                   | 43 +++--------------------
 parquet/src/arrow/array_reader/primitive_array.rs | 25 ++++---------
 parquet/src/arrow/array_reader/test_util.rs       | 25 +++----------
 parquet/src/arrow/async_reader/mod.rs             | 16 ++-------
 parquet/src/column/page.rs                        |  9 +----
 parquet/src/file/reader.rs                        | 16 ++-------
 parquet/src/util/test_common/page_util.rs         | 22 ++----------
 7 files changed, 24 insertions(+), 132 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index f6f65bea8..3dda6304d 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -76,7 +76,6 @@ pub fn seedable_rng() -> StdRng {
 
 // support byte array for decimal
 fn build_encoded_decimal_bytes_page_iterator<T>(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
     encoding: Encoding,
@@ -136,11 +135,10 @@ where
         }
         pages.push(column_chunk_pages);
     }
-    InMemoryPageIterator::new(schema, column_desc, pages)
+    InMemoryPageIterator::new(pages)
 }
 
 fn build_encoded_primitive_page_iterator<T>(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
     encoding: Encoding,
@@ -185,11 +183,10 @@ where
         pages.push(column_chunk_pages);
     }
 
-    InMemoryPageIterator::new(schema, column_desc, pages)
+    InMemoryPageIterator::new(pages)
 }
 
 fn build_dictionary_encoded_primitive_page_iterator<T>(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
 ) -> impl PageIterator + Clone
@@ -254,11 +251,10 @@ where
         pages.push(column_chunk_pages.into());
     }
 
-    InMemoryPageIterator::new(schema, column_desc, pages)
+    InMemoryPageIterator::new(pages)
 }
 
 fn build_plain_encoded_string_page_iterator(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
 ) -> impl PageIterator + Clone {
@@ -297,11 +293,10 @@ fn build_plain_encoded_string_page_iterator(
         pages.push(column_chunk_pages);
     }
 
-    InMemoryPageIterator::new(schema, column_desc, pages)
+    InMemoryPageIterator::new(pages)
 }
 
 fn build_dictionary_encoded_string_page_iterator(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
 ) -> impl PageIterator + Clone {
@@ -363,7 +358,7 @@ fn build_dictionary_encoded_string_page_iterator(
         pages.push(column_chunk_pages.into());
     }
 
-    InMemoryPageIterator::new(schema, column_desc, pages)
+    InMemoryPageIterator::new(pages)
 }
 
 fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
@@ -471,7 +466,6 @@ fn create_string_byte_array_dictionary_reader(
 
 fn bench_byte_decimal<T>(
     group: &mut BenchmarkGroup<WallTime>,
-    schema: &SchemaDescPtr,
     mandatory_column_desc: &ColumnDescPtr,
     optional_column_desc: &ColumnDescPtr,
     min: i128,
@@ -485,7 +479,6 @@ fn bench_byte_decimal<T>(
 
     // plain encoded, no NULLs
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
-        schema.clone(),
         mandatory_column_desc.clone(),
         0.0,
         Encoding::PLAIN,
@@ -504,7 +497,6 @@ fn bench_byte_decimal<T>(
     });
 
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.0,
         Encoding::PLAIN,
@@ -524,7 +516,6 @@ fn bench_byte_decimal<T>(
 
     // half null
     let data = build_encoded_decimal_bytes_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.5,
         Encoding::PLAIN,
@@ -545,7 +536,6 @@ fn bench_byte_decimal<T>(
 
 fn bench_primitive<T>(
     group: &mut BenchmarkGroup<WallTime>,
-    schema: &SchemaDescPtr,
     mandatory_column_desc: &ColumnDescPtr,
     optional_column_desc: &ColumnDescPtr,
     min: usize,
@@ -558,7 +548,6 @@ fn bench_primitive<T>(
 
     // plain encoded, no NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         mandatory_column_desc.clone(),
         0.0,
         Encoding::PLAIN,
@@ -577,7 +566,6 @@ fn bench_primitive<T>(
     });
 
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.0,
         Encoding::PLAIN,
@@ -595,7 +583,6 @@ fn bench_primitive<T>(
 
     // plain encoded, half NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.5,
         Encoding::PLAIN,
@@ -613,7 +600,6 @@ fn bench_primitive<T>(
 
     // binary packed, no NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         mandatory_column_desc.clone(),
         0.0,
         Encoding::DELTA_BINARY_PACKED,
@@ -632,7 +618,6 @@ fn bench_primitive<T>(
     });
 
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.0,
         Encoding::DELTA_BINARY_PACKED,
@@ -650,7 +635,6 @@ fn bench_primitive<T>(
 
     // binary packed skip , no NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         mandatory_column_desc.clone(),
         0.0,
         Encoding::DELTA_BINARY_PACKED,
@@ -669,7 +653,6 @@ fn bench_primitive<T>(
     });
 
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.0,
         Encoding::DELTA_BINARY_PACKED,
@@ -687,7 +670,6 @@ fn bench_primitive<T>(
 
     // binary packed, half NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.5,
         Encoding::DELTA_BINARY_PACKED,
@@ -705,7 +687,6 @@ fn bench_primitive<T>(
 
     // dictionary encoded, no NULLs
     let data = build_dictionary_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         mandatory_column_desc.clone(),
         0.0,
     );
@@ -721,7 +702,6 @@ fn bench_primitive<T>(
     });
 
     let data = build_dictionary_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.0,
     );
@@ -736,7 +716,6 @@ fn bench_primitive<T>(
 
     // dictionary encoded, half NULLs
     let data = build_dictionary_encoded_primitive_page_iterator::<T>(
-        schema.clone(),
         optional_column_desc.clone(),
         0.5,
     );
@@ -758,7 +737,6 @@ fn decimal_benches(c: &mut Criterion) {
     let mut group = c.benchmark_group("arrow_array_reader/INT32/Decimal128Array");
     bench_primitive::<Int32Type>(
         &mut group,
-        &schema,
         &mandatory_decimal1_leaf_desc,
         &optional_decimal1_leaf_desc,
         // precision is 8: the max is 99999999
@@ -773,7 +751,6 @@ fn decimal_benches(c: &mut Criterion) {
     let optional_decimal2_leaf_desc = schema.column(9);
     bench_primitive::<Int64Type>(
         &mut group,
-        &schema,
         &mandatory_decimal2_leaf_desc,
         &optional_decimal2_leaf_desc,
         // precision is 16: the max is 9999999999999999
@@ -788,7 +765,6 @@ fn decimal_benches(c: &mut Criterion) {
     let optional_decimal3_leaf_desc = schema.column(11);
     bench_byte_decimal::<ByteArrayType>(
         &mut group,
-        &schema,
         &mandatory_decimal3_leaf_desc,
         &optional_decimal3_leaf_desc,
         // precision is 16: the max is 9999999999999999
@@ -803,7 +779,6 @@ fn decimal_benches(c: &mut Criterion) {
     let optional_decimal4_leaf_desc = schema.column(13);
     bench_byte_decimal::<FixedLenByteArrayType>(
         &mut group,
-        &schema,
         &mandatory_decimal4_leaf_desc,
         &optional_decimal4_leaf_desc,
         // precision is 16: the max is 9999999999999999
@@ -829,7 +804,6 @@ fn add_benches(c: &mut Criterion) {
     let mut group = c.benchmark_group("arrow_array_reader/Int32Array");
     bench_primitive::<Int32Type>(
         &mut group,
-        &schema,
         &mandatory_int32_column_desc,
         &optional_int32_column_desc,
         0,
@@ -843,7 +817,6 @@ fn add_benches(c: &mut Criterion) {
     let mut group = c.benchmark_group("arrow_array_reader/Int64Array");
     bench_primitive::<Int64Type>(
         &mut group,
-        &schema,
         &mandatory_int64_column_desc,
         &optional_int64_column_desc,
         0,
@@ -858,7 +831,6 @@ fn add_benches(c: &mut Criterion) {
 
     // string, plain encoded, no NULLs
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
-        schema.clone(),
         mandatory_string_column_desc.clone(),
         0.0,
     );
@@ -874,7 +846,6 @@ fn add_benches(c: &mut Criterion) {
     });
 
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
-        schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
@@ -891,7 +862,6 @@ fn add_benches(c: &mut Criterion) {
 
     // string, plain encoded, half NULLs
     let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
-        schema.clone(),
         optional_string_column_desc.clone(),
         0.5,
     );
@@ -908,7 +878,6 @@ fn add_benches(c: &mut Criterion) {
 
     // string, dictionary encoded, no NULLs
     let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
-        schema.clone(),
         mandatory_string_column_desc.clone(),
         0.0,
     );
@@ -924,7 +893,6 @@ fn add_benches(c: &mut Criterion) {
     });
 
     let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
-        schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
@@ -941,7 +909,6 @@ fn add_benches(c: &mut Criterion) {
 
     // string, dictionary encoded, half NULLs
     let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
-        schema.clone(),
         optional_string_column_desc.clone(),
         0.5,
     );
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 1e2720a4a..bef27dc7a 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -363,12 +363,9 @@ mod tests {
             .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
             .unwrap();
 
-        let column_desc = schema.column(0);
-        let page_iterator = EmptyPageIterator::new(schema);
-
         let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-            Box::new(page_iterator),
-            column_desc,
+            Box::<EmptyPageIterator>::default(),
+            schema.column(0),
             None,
         )
         .unwrap();
@@ -410,8 +407,7 @@ mod tests {
                 true,
                 2,
             );
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+            let page_iterator = InMemoryPageIterator::new(page_lists);
 
             let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                 Box::new(page_iterator),
@@ -474,11 +470,7 @@ mod tests {
                     true,
                     2,
                 );
-                let page_iterator = InMemoryPageIterator::new(
-                    schema.clone(),
-                    column_desc.clone(),
-                    page_lists,
-                );
+                let page_iterator = InMemoryPageIterator::new(page_lists);
                 let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                     Box::new(page_iterator),
                     column_desc.clone(),
@@ -610,8 +602,7 @@ mod tests {
                 2,
             );
 
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+            let page_iterator = InMemoryPageIterator::new(page_lists);
 
             let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                 Box::new(page_iterator),
@@ -690,8 +681,7 @@ mod tests {
                 true,
                 2,
             );
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+            let page_iterator = InMemoryPageIterator::new(page_lists);
 
             let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                 Box::new(page_iterator),
@@ -753,8 +743,7 @@ mod tests {
                 true,
                 2,
             );
-            let page_iterator =
-                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+            let page_iterator = InMemoryPageIterator::new(page_lists);
 
             let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
                 Box::new(page_iterator),
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index 6585d4614..7e66efead 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -26,9 +26,7 @@ use crate::column::page::{PageIterator, PageReader};
 use crate::data_type::{ByteArray, ByteArrayType};
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
 use crate::errors::Result;
-use crate::schema::types::{
-    ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
-};
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
 use crate::util::memory::ByteBufferPtr;
 
 /// Returns a descriptor for a UTF-8 column
@@ -197,15 +195,8 @@ impl ArrayReader for InMemoryArrayReader {
 }
 
 /// Iterator for testing reading empty columns
-pub struct EmptyPageIterator {
-    schema: SchemaDescPtr,
-}
-
-impl EmptyPageIterator {
-    pub fn new(schema: SchemaDescPtr) -> Self {
-        EmptyPageIterator { schema }
-    }
-}
+#[derive(Default)]
+pub struct EmptyPageIterator {}
 
 impl Iterator for EmptyPageIterator {
     type Item = Result<Box<dyn PageReader>>;
@@ -215,12 +206,4 @@ impl Iterator for EmptyPageIterator {
     }
 }
 
-impl PageIterator for EmptyPageIterator {
-    fn schema(&mut self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
-    }
-
-    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-        Ok(self.schema.column(0))
-    }
-}
+impl PageIterator for EmptyPageIterator {}
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index fb81a2b5d..c11033eae 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -112,7 +112,7 @@ use crate::format::PageLocation;
 
 use crate::file::FOOTER_SIZE;
 
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::schema::types::SchemaDescPtr;
 
 mod metadata;
 pub use metadata::*;
@@ -673,8 +673,6 @@ impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
                     )?);
 
                 Ok(Box::new(ColumnChunkIterator {
-                    schema: self.metadata.schema_descr_ptr(),
-                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
                     reader: Some(Ok(page_reader)),
                 }))
             }
@@ -739,8 +737,6 @@ impl ChunkReader for ColumnChunkData {
 
 /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
 struct ColumnChunkIterator {
-    schema: SchemaDescPtr,
-    column_schema: ColumnDescPtr,
     reader: Option<Result<Box<dyn PageReader>>>,
 }
 
@@ -752,15 +748,7 @@ impl Iterator for ColumnChunkIterator {
     }
 }
 
-impl PageIterator for ColumnChunkIterator {
-    fn schema(&mut self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
-    }
-
-    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-        Ok(self.column_schema.clone())
-    }
-}
+impl PageIterator for ColumnChunkIterator {}
 
 #[cfg(test)]
 mod tests {
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 57a0278e2..3b19734a2 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -21,7 +21,6 @@ use crate::basic::{Encoding, PageType};
 use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
 use crate::format::PageHeader;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
 
 /// Parquet Page definition.
@@ -338,13 +337,7 @@ pub trait PageWriter: Send {
 }
 
 /// An iterator over pages of one specific column in a parquet file.
-pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {
-    /// Get schema of parquet file.
-    fn schema(&mut self) -> Result<SchemaDescPtr>;
-
-    /// Get column schema of this page iterator.
-    fn column_schema(&mut self) -> Result<ColumnDescPtr>;
-}
+pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
 
 #[cfg(test)]
 mod tests {
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 6a7bbc78f..7d2d7ea15 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -31,7 +31,7 @@ use crate::errors::{ParquetError, Result};
 use crate::file::metadata::*;
 pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
 use crate::record::reader::RowIter;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, Type as SchemaType};
+use crate::schema::types::Type as SchemaType;
 
 use crate::basic::Type;
 
@@ -264,16 +264,4 @@ impl Iterator for FilePageIterator {
     }
 }
 
-impl PageIterator for FilePageIterator {
-    fn schema(&mut self) -> Result<SchemaDescPtr> {
-        Ok(self
-            .file_reader
-            .metadata()
-            .file_metadata()
-            .schema_descr_ptr())
-    }
-
-    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-        self.schema().map(|s| s.column(self.column_index))
-    }
-}
+impl PageIterator for FilePageIterator {}
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 243fb6f8b..ab5287462 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -22,7 +22,7 @@ use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, Encoder};
 use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
 use std::iter::Peekable;
 use std::mem;
@@ -204,20 +204,12 @@ impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {
 /// A utility page iterator which stores page readers in memory, used for tests.
 #[derive(Clone)]
 pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
-    schema: SchemaDescPtr,
-    column_desc: ColumnDescPtr,
     page_reader_iter: I,
 }
 
 impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
-    pub fn new(
-        schema: SchemaDescPtr,
-        column_desc: ColumnDescPtr,
-        pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
-    ) -> Self {
+    pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) -> Self {
         Self {
-            schema,
-            column_desc,
             page_reader_iter: pages.into_iter(),
         }
     }
@@ -233,12 +225,4 @@ impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
     }
 }
 
-impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {
-    fn schema(&mut self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
-    }
-
-    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-        Ok(self.column_desc.clone())
-    }
-}
+impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {}

Reply via email to