This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new c757829b7 Simplify ColumnReader::read_batch (#1995)
c757829b7 is described below

commit c757829b76ab95af18850fa173434d9747843849
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jul 5 13:39:25 2022 +0100

    Simplify ColumnReader::read_batch (#1995)
    
    Miscellaneous parquet cleanups
---
 parquet/src/arrow/array_reader/byte_array.rs       |   8 +-
 .../arrow/array_reader/byte_array_dictionary.rs    |   8 +-
 parquet/src/arrow/array_reader/null_array.rs       |   6 +-
 parquet/src/arrow/array_reader/primitive_array.rs  |   8 +-
 parquet/src/arrow/arrow_reader.rs                  |   1 -
 parquet/src/arrow/record_reader/mod.rs             | 108 ++++---------
 parquet/src/column/reader.rs                       | 176 +++++++--------------
 parquet/src/util/test_common/page_util.rs          |   6 +-
 8 files changed, 107 insertions(+), 214 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index 9e0f83fa9..95620d940 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -113,10 +113,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for 
ByteArrayReader<I> {
 
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
         read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
-        let buffer = self.record_reader.consume_record_data()?;
-        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        let buffer = self.record_reader.consume_record_data();
+        let null_buffer = self.record_reader.consume_bitmap_buffer();
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
 
         Ok(buffer.into_array(null_buffer, self.data_type.clone()))
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0cd67206f..77f7916ed 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -173,12 +173,12 @@ where
 
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
         read_records(&mut self.record_reader, self.pages.as_mut(), 
batch_size)?;
-        let buffer = self.record_reader.consume_record_data()?;
-        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let buffer = self.record_reader.consume_record_data();
+        let null_buffer = self.record_reader.consume_bitmap_buffer();
         let array = buffer.into_array(null_buffer, &self.data_type)?;
 
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
 
         Ok(array)
diff --git a/parquet/src/arrow/array_reader/null_array.rs 
b/parquet/src/arrow/array_reader/null_array.rs
index 53ac0852f..4b592025d 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -86,11 +86,11 @@ where
         let array = 
arrow::array::NullArray::new(self.record_reader.num_values());
 
         // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
 
         // Must consume bitmap buffer
-        self.record_reader.consume_bitmap_buffer()?;
+        self.record_reader.consume_bitmap_buffer();
 
         self.record_reader.reset();
         Ok(Arc::new(array))
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
index 222b595c2..c25df89a6 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -148,7 +148,7 @@ where
         // Convert to arrays by using the Parquet physical type.
         // The physical types are then cast to Arrow types if necessary
 
-        let mut record_data = self.record_reader.consume_record_data()?;
+        let mut record_data = self.record_reader.consume_record_data();
 
         if T::get_physical_type() == PhysicalType::BOOLEAN {
             let mut boolean_buffer = 
BooleanBufferBuilder::new(record_data.len());
@@ -162,7 +162,7 @@ where
         let array_data = ArrayDataBuilder::new(arrow_data_type)
             .len(self.record_reader.num_values())
             .add_buffer(record_data)
-            .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let array_data = unsafe { array_data.build_unchecked() };
         let array = match T::get_physical_type() {
@@ -227,8 +227,8 @@ where
         };
 
         // save definition and repetition buffers
-        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
-        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
         self.record_reader.reset();
         Ok(array)
     }
diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index 89406cd61..c5d1f66e5 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -86,7 +86,6 @@ impl ArrowReaderOptions {
     ///
     /// For 
example:[ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
     ///
-
     /// Set `skip_arrow_metadata` to true, to skip decoding this
     pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
         Self {
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index 023a538a2..af75dbb49 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -218,32 +218,32 @@ where
     /// The implementation has side effects. It will create a new buffer to 
hold those
     /// definition level values that have already been read into memory but 
not counted
     /// as record values, e.g. those from `self.num_values` to 
`self.values_written`.
-    pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
-        Ok(match self.def_levels.as_mut() {
+    pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+        match self.def_levels.as_mut() {
             Some(x) => x.split_levels(self.num_values),
             None => None,
-        })
+        }
     }
 
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
-        Ok(match self.rep_levels.as_mut() {
+    pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
+        match self.rep_levels.as_mut() {
             Some(x) => Some(x.split_off(self.num_values)),
             None => None,
-        })
+        }
     }
 
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_record_data(&mut self) -> Result<V::Output> {
-        Ok(self.records.split_off(self.num_values))
+    pub fn consume_record_data(&mut self) -> V::Output {
+        self.records.split_off(self.num_values)
     }
 
     /// Returns currently stored null bitmap data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
-        Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
+    pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
+        self.consume_bitmap().map(|b| b.into_buffer())
     }
 
     /// Reset state of record reader.
@@ -256,11 +256,10 @@ where
     }
 
     /// Returns bitmap data.
-    pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
-        Ok(self
-            .def_levels
+    pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
+        self.def_levels
             .as_mut()
-            .map(|levels| levels.split_bitmask(self.num_values)))
+            .map(|levels| levels.split_bitmask(self.num_values))
     }
 
     /// Try to read one batch of data.
@@ -296,7 +295,7 @@ where
         }
 
         let values_read = max(levels_read, values_read);
-        self.set_values_written(self.values_written + values_read)?;
+        self.set_values_written(self.values_written + values_read);
         Ok(values_read)
     }
 
@@ -339,8 +338,7 @@ where
         }
     }
 
-    #[allow(clippy::unnecessary_wraps)]
-    fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
+    fn set_values_written(&mut self, new_values_written: usize) {
         self.values_written = new_values_written;
         self.records.set_len(self.values_written);
 
@@ -351,8 +349,6 @@ where
         if let Some(ref mut buf) = self.def_levels {
             buf.set_len(self.values_written)
         };
-
-        Ok(())
     }
 }
 
@@ -365,42 +361,15 @@ mod tests {
     use arrow::buffer::Buffer;
 
     use crate::basic::Encoding;
-    use crate::column::page::Page;
-    use crate::column::page::PageReader;
     use crate::data_type::Int32Type;
-    use crate::errors::Result;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
-    use crate::util::test_common::page_util::{DataPageBuilder, 
DataPageBuilderImpl};
+    use crate::util::test_common::page_util::{
+        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
+    };
 
     use super::RecordReader;
 
-    struct TestPageReader {
-        pages: Box<dyn Iterator<Item = Page> + Send>,
-    }
-
-    impl TestPageReader {
-        pub fn new(pages: Vec<Page>) -> Self {
-            Self {
-                pages: Box::new(pages.into_iter()),
-            }
-        }
-    }
-
-    impl PageReader for TestPageReader {
-        fn get_next_page(&mut self) -> Result<Option<Page>> {
-            Ok(self.pages.next())
-        }
-    }
-
-    impl Iterator for TestPageReader {
-        type Item = Result<Page>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            self.get_next_page().transpose()
-        }
-    }
-
     #[test]
     fn test_read_required_records() {
         // Construct column schema
@@ -436,7 +405,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(2).unwrap());
             assert_eq!(2, record_reader.num_records());
@@ -459,7 +428,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(10).unwrap());
             assert_eq!(7, record_reader.num_records());
@@ -469,12 +438,9 @@ mod tests {
         let mut bb = Int32BufferBuilder::new(7);
         bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
         let expected_buffer = bb.finish();
-        assert_eq!(
-            expected_buffer,
-            record_reader.consume_record_data().unwrap()
-        );
-        assert_eq!(None, record_reader.consume_def_levels().unwrap());
-        assert_eq!(None, record_reader.consume_bitmap().unwrap());
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(None, record_reader.consume_def_levels());
+        assert_eq!(None, record_reader.consume_bitmap());
     }
 
     #[test]
@@ -520,7 +486,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(2).unwrap());
             assert_eq!(2, record_reader.num_records());
@@ -546,7 +512,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(10).unwrap());
             assert_eq!(7, record_reader.num_records());
@@ -559,20 +525,17 @@ mod tests {
         let expected_def_levels = bb.finish();
         assert_eq!(
             Some(expected_def_levels),
-            record_reader.consume_def_levels().unwrap()
+            record_reader.consume_def_levels()
         );
 
         // Verify bitmap
         let expected_valid = &[false, true, false, true, true, false, true];
         let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
         let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(
-            Some(expected_bitmap),
-            record_reader.consume_bitmap().unwrap()
-        );
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
 
         // Verify result record data
-        let actual = record_reader.consume_record_data().unwrap();
+        let actual = record_reader.consume_record_data();
         let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 7, 0, 6, 3, 0, 8];
@@ -631,7 +594,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
 
             assert_eq!(1, record_reader.read_records(1).unwrap());
@@ -659,7 +622,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
 
             assert_eq!(1, record_reader.read_records(10).unwrap());
@@ -673,20 +636,17 @@ mod tests {
         let expected_def_levels = bb.finish();
         assert_eq!(
             Some(expected_def_levels),
-            record_reader.consume_def_levels().unwrap()
+            record_reader.consume_def_levels()
         );
 
         // Verify bitmap
         let expected_valid = &[true, false, false, true, true, true, true, 
true, true];
         let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
         let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(
-            Some(expected_bitmap),
-            record_reader.consume_bitmap().unwrap()
-        );
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
 
         // Verify result record data
-        let actual = record_reader.consume_record_data().unwrap();
+        let actual = record_reader.consume_record_data();
         let actual_values = actual.typed_data::<i32>();
         let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
         assert_eq!(actual_values.len(), expected.len());
@@ -731,7 +691,7 @@ mod tests {
             pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
             let page = pb.consume();
 
-            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
             record_reader.set_page_reader(page_reader).unwrap();
 
             assert_eq!(1000, record_reader.read_records(1000).unwrap());
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index b5a52f6e2..a97787ccf 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,7 +17,7 @@
 
 //! Contains column reader API.
 
-use std::cmp::{max, min};
+use std::cmp::min;
 
 use super::page::{Page, PageReader};
 use crate::basic::*;
@@ -163,26 +163,18 @@ where
         }
     }
 
-    /// Reads a batch of values of at most `batch_size`.
+    /// Reads a batch of values of at most `batch_size`, returning a tuple 
containing the
+    /// actual number of non-null values read, followed by the corresponding 
number of levels,
+    /// i.e., the total number of values including nulls, empty lists, etc.
     ///
-    /// This will try to read from the row group, and fills up at most 
`batch_size` values
-    /// for `def_levels`, `rep_levels` and `values`. It will stop either when 
the row
-    /// group is depleted or `batch_size` values has been read, or there is no 
space
-    /// in the input slices (values/definition levels/repetition levels).
+    /// If the max definition level is 0, `def_levels` will be ignored, 
otherwise it will be
+    /// populated with the number of levels read, with an error returned if it 
is `None`.
     ///
-    /// Note that in case the field being read is not required, `values` could 
contain
-    /// less values than `def_levels`. Also note that this will skip reading 
def / rep
-    /// levels if the field is required / not repeated, respectively.
+    /// If the max repetition level is 0, `rep_levels` will be ignored, 
otherwise it will be
+    /// populated with the number of levels read, with an error returned if it 
is `None`.
     ///
-    /// If `def_levels` or `rep_levels` is `None`, this will also skip reading 
the
-    /// respective levels. This is useful when the caller of this function 
knows in
-    /// advance that the field is required and non-repeated, therefore can 
avoid
-    /// allocating memory for the levels data. Note that if field has 
definition
-    /// levels, but caller provides None, there might be inconsistency between
-    /// levels/values (see comments below).
-    ///
-    /// Returns a tuple where the first element is the actual number of values 
read,
-    /// and the second element is the actual number of levels read.
+    /// `values` will be contiguously populated with the non-null values. Note 
that if the column
+    /// is not required, the number of values written may be less than either `batch_size` or the 
number of levels read.
     #[inline]
     pub fn read_batch(
         &mut self,
@@ -205,84 +197,65 @@ where
 
         // Read exhaustively all pages until we read all batch_size 
values/levels
         // or there are no more values/levels to read.
-        while max(values_read, levels_read) < batch_size {
+        while levels_read < batch_size {
             if !self.has_next()? {
                 break;
             }
 
             // Batch size for the current iteration
-            let iter_batch_size = {
-                // Compute approximate value based on values decoded so far
-                let mut adjusted_size = min(
-                    batch_size,
-                    (self.num_buffered_values - self.num_decoded_values) as 
usize,
-                );
-
-                // Adjust batch size by taking into account how much data there
-                // to read. As batch_size is also smaller than value and level
-                // slices (if available), this ensures that available space is 
not
-                // exceeded.
-                adjusted_size = min(adjusted_size, batch_size - values_read);
-                adjusted_size = min(adjusted_size, batch_size - levels_read);
-
-                adjusted_size
-            };
+            let iter_batch_size = (batch_size - levels_read)
+                .min((self.num_buffered_values - self.num_decoded_values) as 
usize);
 
             // If the field is required and non-repeated, there are no 
definition levels
-            let (num_def_levels, null_count) = match def_levels.as_mut() {
-                Some(levels) if self.descr.max_def_level() > 0 => {
+            let null_count = match self.descr.max_def_level() > 0 {
+                true => {
+                    let levels = def_levels
+                        .as_mut()
+                        .ok_or_else(|| general_err!("must specify definition 
levels"))?;
+
                     let num_def_levels = self
                         .def_level_decoder
                         .as_mut()
                         .expect("def_level_decoder be set")
-                        .read(*levels, levels_read..levels_read + 
iter_batch_size)?;
+                        .read(levels, levels_read..levels_read + 
iter_batch_size)?;
+
+                    if num_def_levels != iter_batch_size {
+                        return Err(general_err!("insufficient definition 
levels read from column - expected {}, got {}", iter_batch_size, 
num_def_levels));
+                    }
 
-                    let null_count = levels.count_nulls(
+                    levels.count_nulls(
                         levels_read..levels_read + num_def_levels,
                         self.descr.max_def_level(),
-                    );
-                    (num_def_levels, null_count)
+                    )
                 }
-                _ => (0, 0),
+                false => 0,
             };
 
-            let num_rep_levels = match rep_levels.as_mut() {
-                Some(levels) if self.descr.max_rep_level() > 0 => self
+            if self.descr.max_rep_level() > 0 {
+                let levels = rep_levels
+                    .as_mut()
+                    .ok_or_else(|| general_err!("must specify repetition 
levels"))?;
+
+                let rep_levels = self
                     .rep_level_decoder
                     .as_mut()
                     .expect("rep_level_decoder be set")
-                    .read(levels, levels_read..levels_read + iter_batch_size)?,
-                _ => 0,
-            };
+                    .read(levels, levels_read..levels_read + iter_batch_size)?;
 
-            // At this point we have read values, definition and repetition 
levels.
-            // If both definition and repetition levels are defined, their 
counts
-            // should be equal. Values count is always less or equal to 
definition levels.
-            if num_def_levels != 0
-                && num_rep_levels != 0
-                && num_rep_levels != num_def_levels
-            {
-                return Err(general_err!(
-                    "inconsistent number of levels read - def: {}, rep: {}",
-                    num_def_levels,
-                    num_rep_levels
-                ));
+                if rep_levels != iter_batch_size {
+                    return Err(general_err!("insufficient repetition levels 
read from column - expected {}, got {}", iter_batch_size, rep_levels));
+                }
             }
 
-            // Note that if field is not required, but no definition levels 
are provided,
-            // we would read values of batch size and (if provided, of course) 
repetition
-            // levels of batch size - [!] they will not be synced, because 
only definition
-            // levels enforce number of non-null values to read.
-
             let values_to_read = iter_batch_size - null_count;
             let curr_values_read = self
                 .values_decoder
                 .read(values, values_read..values_read + values_to_read)?;
 
-            if num_def_levels != 0 && curr_values_read != num_def_levels - 
null_count {
+            if curr_values_read != values_to_read {
                 return Err(general_err!(
                     "insufficient values read from column - expected: {}, got: 
{}",
-                    num_def_levels - null_count,
+                    values_to_read,
                     curr_values_read
                 ));
             }
@@ -290,9 +263,8 @@ where
             // Update all "return" counters and internal state.
 
             // This is to account for when def or rep levels are not provided
-            let curr_levels_read = max(num_def_levels, num_rep_levels);
-            self.num_decoded_values += max(curr_levels_read, curr_values_read) 
as u32;
-            levels_read += curr_levels_read;
+            self.num_decoded_values += iter_batch_size as u32;
+            levels_read += iter_batch_size;
             values_read += curr_values_read;
         }
 
@@ -302,8 +274,7 @@ where
     /// Reads a new page and set up the decoders for levels, values or 
dictionary.
     /// Returns false if there's no page left.
     fn read_new_page(&mut self) -> Result<bool> {
-        #[allow(while_true)]
-        while true {
+        loop {
             match self.page_reader.get_next_page()? {
                 // No more page to read
                 None => return Ok(false),
@@ -433,8 +404,6 @@ where
                 }
             }
         }
-
-        Ok(true)
     }
 
     #[inline]
@@ -484,12 +453,12 @@ mod tests {
     use super::*;
 
     use rand::distributions::uniform::SampleUniform;
-    use std::{collections::VecDeque, sync::Arc, vec::IntoIter};
+    use std::{collections::VecDeque, sync::Arc};
 
     use crate::basic::Type as PhysicalType;
-    use crate::column::page::Page;
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as 
SchemaType};
     use crate::util::test_common::make_pages;
+    use crate::util::test_common::page_util::InMemoryPageReader;
 
     const NUM_LEVELS: usize = 128;
     const NUM_PAGES: usize = 2;
@@ -1036,7 +1005,7 @@ mod tests {
         } else {
             0
         };
-        let max_rep_level = if def_levels.is_some() {
+        let max_rep_level = if rep_levels.is_some() {
             MAX_REP_LEVEL
         } else {
             0
@@ -1055,8 +1024,8 @@ mod tests {
             NUM_PAGES,
             NUM_LEVELS,
             batch_size,
-            std::i32::MIN,
-            std::i32::MAX,
+            i32::MIN,
+            i32::MAX,
             values,
             def_levels,
             rep_levels,
@@ -1235,7 +1204,8 @@ mod tests {
                 use_v2,
             );
             let max_def_level = desc.max_def_level();
-            let page_reader = TestPageReader::new(Vec::from(pages));
+            let max_rep_level = desc.max_rep_level();
+            let page_reader = InMemoryPageReader::new(pages);
             let column_reader: ColumnReader =
                 get_column_reader(desc, Box::new(page_reader));
             let mut typed_column_reader = 
get_typed_column_reader::<T>(column_reader);
@@ -1276,7 +1246,8 @@ mod tests {
                 "values content doesn't match"
             );
 
-            if let Some(ref levels) = def_levels {
+            if max_def_level > 0 {
+                let levels = def_levels.as_ref().unwrap();
                 assert!(
                     levels.len() >= curr_levels_read,
                     "def_levels.len() >= levels_read"
@@ -1288,7 +1259,8 @@ mod tests {
                 );
             }
 
-            if let Some(ref levels) = rep_levels {
+            if max_rep_level > 0 {
+                let levels = rep_levels.as_ref().unwrap();
                 assert!(
                     levels.len() >= curr_levels_read,
                     "rep_levels.len() >= levels_read"
@@ -1300,44 +1272,10 @@ mod tests {
                 );
             }
 
-            if def_levels.is_none() && rep_levels.is_none() {
-                assert!(
-                    curr_levels_read == 0,
-                    "expected to read 0 levels, found {}",
-                    curr_levels_read
-                );
-            } else if def_levels.is_some() && max_def_level > 0 {
-                assert!(
-                    curr_levels_read >= curr_values_read,
-                    "expected levels read to be greater than values read"
-                );
-            }
-        }
-    }
-
-    struct TestPageReader {
-        pages: IntoIter<Page>,
-    }
-
-    impl TestPageReader {
-        pub fn new(pages: Vec<Page>) -> Self {
-            Self {
-                pages: pages.into_iter(),
-            }
-        }
-    }
-
-    impl PageReader for TestPageReader {
-        fn get_next_page(&mut self) -> Result<Option<Page>> {
-            Ok(self.pages.next())
-        }
-    }
-
-    impl Iterator for TestPageReader {
-        type Item = Result<Page>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            self.get_next_page().transpose()
+            assert!(
+                curr_levels_read >= curr_values_read,
+                "expected levels read to be greater than values read"
+            );
         }
     }
 }
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index ffa559f3f..3719d280a 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -100,11 +100,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
     }
 
     fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
-        assert!(
-            self.num_values == def_levels.len() as u32,
-            "Must call `add_rep_levels() first!`"
-        );
-
+        self.num_values = def_levels.len() as u32;
         self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
     }
 

Reply via email to