This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c757829b7 Simplify ColumnReader::read_batch (#1995)
c757829b7 is described below
commit c757829b76ab95af18850fa173434d9747843849
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jul 5 13:39:25 2022 +0100
Simplify ColumnReader::read_batch (#1995)
Miscellaneous parquet cleanups
---
parquet/src/arrow/array_reader/byte_array.rs | 8 +-
.../arrow/array_reader/byte_array_dictionary.rs | 8 +-
parquet/src/arrow/array_reader/null_array.rs | 6 +-
parquet/src/arrow/array_reader/primitive_array.rs | 8 +-
parquet/src/arrow/arrow_reader.rs | 1 -
parquet/src/arrow/record_reader/mod.rs | 108 ++++---------
parquet/src/column/reader.rs | 176 +++++++--------------
parquet/src/util/test_common/page_util.rs | 6 +-
8 files changed, 107 insertions(+), 214 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 9e0f83fa9..95620d940 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -113,10 +113,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for
ByteArrayReader<I> {
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(),
batch_size)?;
- let buffer = self.record_reader.consume_record_data()?;
- let null_buffer = self.record_reader.consume_bitmap_buffer()?;
- self.def_levels_buffer = self.record_reader.consume_def_levels()?;
- self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+ let buffer = self.record_reader.consume_record_data();
+ let null_buffer = self.record_reader.consume_bitmap_buffer();
+ self.def_levels_buffer = self.record_reader.consume_def_levels();
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(buffer.into_array(null_buffer, self.data_type.clone()))
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0cd67206f..77f7916ed 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -173,12 +173,12 @@ where
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(),
batch_size)?;
- let buffer = self.record_reader.consume_record_data()?;
- let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+ let buffer = self.record_reader.consume_record_data();
+ let null_buffer = self.record_reader.consume_bitmap_buffer();
let array = buffer.into_array(null_buffer, &self.data_type)?;
- self.def_levels_buffer = self.record_reader.consume_def_levels()?;
- self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+ self.def_levels_buffer = self.record_reader.consume_def_levels();
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(array)
diff --git a/parquet/src/arrow/array_reader/null_array.rs
b/parquet/src/arrow/array_reader/null_array.rs
index 53ac0852f..4b592025d 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -86,11 +86,11 @@ where
let array =
arrow::array::NullArray::new(self.record_reader.num_values());
// save definition and repetition buffers
- self.def_levels_buffer = self.record_reader.consume_def_levels()?;
- self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+ self.def_levels_buffer = self.record_reader.consume_def_levels();
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels();
// Must consume bitmap buffer
- self.record_reader.consume_bitmap_buffer()?;
+ self.record_reader.consume_bitmap_buffer();
self.record_reader.reset();
Ok(Arc::new(array))
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index 222b595c2..c25df89a6 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -148,7 +148,7 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary
- let mut record_data = self.record_reader.consume_record_data()?;
+ let mut record_data = self.record_reader.consume_record_data();
if T::get_physical_type() == PhysicalType::BOOLEAN {
let mut boolean_buffer =
BooleanBufferBuilder::new(record_data.len());
@@ -162,7 +162,7 @@ where
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
.add_buffer(record_data)
- .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
+ .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
let array_data = unsafe { array_data.build_unchecked() };
let array = match T::get_physical_type() {
@@ -227,8 +227,8 @@ where
};
// save definition and repetition buffers
- self.def_levels_buffer = self.record_reader.consume_def_levels()?;
- self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+ self.def_levels_buffer = self.record_reader.consume_def_levels();
+ self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(array)
}
diff --git a/parquet/src/arrow/arrow_reader.rs
b/parquet/src/arrow/arrow_reader.rs
index 89406cd61..c5d1f66e5 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -86,7 +86,6 @@ impl ArrowReaderOptions {
///
/// For
example:[ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
///
-
/// Set `skip_arrow_metadata` to true, to skip decoding this
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index 023a538a2..af75dbb49 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -218,32 +218,32 @@ where
/// The implementation has side effects. It will create a new buffer to
hold those
/// definition level values that have already been read into memory but
not counted
/// as record values, e.g. those from `self.num_values` to
`self.values_written`.
- pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
- Ok(match self.def_levels.as_mut() {
+ pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+ match self.def_levels.as_mut() {
Some(x) => x.split_levels(self.num_values),
None => None,
- })
+ }
}
/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
- pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
- Ok(match self.rep_levels.as_mut() {
+ pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
+ match self.rep_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
None => None,
- })
+ }
}
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
- pub fn consume_record_data(&mut self) -> Result<V::Output> {
- Ok(self.records.split_off(self.num_values))
+ pub fn consume_record_data(&mut self) -> V::Output {
+ self.records.split_off(self.num_values)
}
/// Returns currently stored null bitmap data.
/// The side effect is similar to `consume_def_levels`.
- pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
- Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
+ pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
+ self.consume_bitmap().map(|b| b.into_buffer())
}
/// Reset state of record reader.
@@ -256,11 +256,10 @@ where
}
/// Returns bitmap data.
- pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
- Ok(self
- .def_levels
+ pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
+ self.def_levels
.as_mut()
- .map(|levels| levels.split_bitmask(self.num_values)))
+ .map(|levels| levels.split_bitmask(self.num_values))
}
/// Try to read one batch of data.
@@ -296,7 +295,7 @@ where
}
let values_read = max(levels_read, values_read);
- self.set_values_written(self.values_written + values_read)?;
+ self.set_values_written(self.values_written + values_read);
Ok(values_read)
}
@@ -339,8 +338,7 @@ where
}
}
- #[allow(clippy::unnecessary_wraps)]
- fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
+ fn set_values_written(&mut self, new_values_written: usize) {
self.values_written = new_values_written;
self.records.set_len(self.values_written);
@@ -351,8 +349,6 @@ where
if let Some(ref mut buf) = self.def_levels {
buf.set_len(self.values_written)
};
-
- Ok(())
}
}
@@ -365,42 +361,15 @@ mod tests {
use arrow::buffer::Buffer;
use crate::basic::Encoding;
- use crate::column::page::Page;
- use crate::column::page::PageReader;
use crate::data_type::Int32Type;
- use crate::errors::Result;
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
- use crate::util::test_common::page_util::{DataPageBuilder,
DataPageBuilderImpl};
+ use crate::util::test_common::page_util::{
+ DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
+ };
use super::RecordReader;
- struct TestPageReader {
- pages: Box<dyn Iterator<Item = Page> + Send>,
- }
-
- impl TestPageReader {
- pub fn new(pages: Vec<Page>) -> Self {
- Self {
- pages: Box::new(pages.into_iter()),
- }
- }
- }
-
- impl PageReader for TestPageReader {
- fn get_next_page(&mut self) -> Result<Option<Page>> {
- Ok(self.pages.next())
- }
- }
-
- impl Iterator for TestPageReader {
- type Item = Result<Page>;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.get_next_page().transpose()
- }
- }
-
#[test]
fn test_read_required_records() {
// Construct column schema
@@ -436,7 +405,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
@@ -459,7 +428,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
@@ -469,12 +438,9 @@ mod tests {
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
let expected_buffer = bb.finish();
- assert_eq!(
- expected_buffer,
- record_reader.consume_record_data().unwrap()
- );
- assert_eq!(None, record_reader.consume_def_levels().unwrap());
- assert_eq!(None, record_reader.consume_bitmap().unwrap());
+ assert_eq!(expected_buffer, record_reader.consume_record_data());
+ assert_eq!(None, record_reader.consume_def_levels());
+ assert_eq!(None, record_reader.consume_bitmap());
}
#[test]
@@ -520,7 +486,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
@@ -546,7 +512,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
@@ -559,20 +525,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
- record_reader.consume_def_levels().unwrap()
+ record_reader.consume_def_levels()
);
// Verify bitmap
let expected_valid = &[false, true, false, true, true, false, true];
let expected_buffer =
Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
- assert_eq!(
- Some(expected_bitmap),
- record_reader.consume_bitmap().unwrap()
- );
+ assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
// Verify result record data
- let actual = record_reader.consume_record_data().unwrap();
+ let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();
let expected = &[0, 7, 0, 6, 3, 0, 8];
@@ -631,7 +594,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1, record_reader.read_records(1).unwrap());
@@ -659,7 +622,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1, record_reader.read_records(10).unwrap());
@@ -673,20 +636,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
- record_reader.consume_def_levels().unwrap()
+ record_reader.consume_def_levels()
);
// Verify bitmap
let expected_valid = &[true, false, false, true, true, true, true,
true, true];
let expected_buffer =
Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
- assert_eq!(
- Some(expected_bitmap),
- record_reader.consume_bitmap().unwrap()
- );
+ assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
// Verify result record data
- let actual = record_reader.consume_record_data().unwrap();
+ let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();
let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
assert_eq!(actual_values.len(), expected.len());
@@ -731,7 +691,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let page_reader = Box::new(TestPageReader::new(vec![page]));
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1000, record_reader.read_records(1000).unwrap());
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index b5a52f6e2..a97787ccf 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,7 +17,7 @@
//! Contains column reader API.
-use std::cmp::{max, min};
+use std::cmp::min;
use super::page::{Page, PageReader};
use crate::basic::*;
@@ -163,26 +163,18 @@ where
}
}
- /// Reads a batch of values of at most `batch_size`.
+ /// Reads a batch of values of at most `batch_size`, returning a tuple
containing the
+ /// actual number of non-null values read, followed by the corresponding
number of levels,
+ /// i.e., the total number of values including nulls, empty lists, etc.
///
- /// This will try to read from the row group, and fills up at most
`batch_size` values
- /// for `def_levels`, `rep_levels` and `values`. It will stop either when
the row
- /// group is depleted or `batch_size` values has been read, or there is no
space
- /// in the input slices (values/definition levels/repetition levels).
+ /// If the max definition level is 0, `def_levels` will be ignored,
otherwise it will be
+ /// populated with the number of levels read, with an error returned if it
is `None`.
///
- /// Note that in case the field being read is not required, `values` could
contain
- /// less values than `def_levels`. Also note that this will skip reading
def / rep
- /// levels if the field is required / not repeated, respectively.
+ /// If the max repetition level is 0, `rep_levels` will be ignored,
otherwise it will be
+ /// populated with the number of levels read, with an error returned if it
is `None`.
///
- /// If `def_levels` or `rep_levels` is `None`, this will also skip reading
the
- /// respective levels. This is useful when the caller of this function
knows in
- /// advance that the field is required and non-repeated, therefore can
avoid
- /// allocating memory for the levels data. Note that if field has
definition
- /// levels, but caller provides None, there might be inconsistency between
- /// levels/values (see comments below).
- ///
- /// Returns a tuple where the first element is the actual number of values
read,
- /// and the second element is the actual number of levels read.
+ /// `values` will be contiguously populated with the non-null values. Note
that if the column
+ /// is not required, this may be less than either `batch_size` or the
number of levels read.
#[inline]
pub fn read_batch(
&mut self,
@@ -205,84 +197,65 @@ where
// Read exhaustively all pages until we read all batch_size
values/levels
// or there are no more values/levels to read.
- while max(values_read, levels_read) < batch_size {
+ while levels_read < batch_size {
if !self.has_next()? {
break;
}
// Batch size for the current iteration
- let iter_batch_size = {
- // Compute approximate value based on values decoded so far
- let mut adjusted_size = min(
- batch_size,
- (self.num_buffered_values - self.num_decoded_values) as
usize,
- );
-
- // Adjust batch size by taking into account how much data there
- // to read. As batch_size is also smaller than value and level
- // slices (if available), this ensures that available space is
not
- // exceeded.
- adjusted_size = min(adjusted_size, batch_size - values_read);
- adjusted_size = min(adjusted_size, batch_size - levels_read);
-
- adjusted_size
- };
+ let iter_batch_size = (batch_size - levels_read)
+ .min((self.num_buffered_values - self.num_decoded_values) as
usize);
// If the field is required and non-repeated, there are no
definition levels
- let (num_def_levels, null_count) = match def_levels.as_mut() {
- Some(levels) if self.descr.max_def_level() > 0 => {
+ let null_count = match self.descr.max_def_level() > 0 {
+ true => {
+ let levels = def_levels
+ .as_mut()
+ .ok_or_else(|| general_err!("must specify definition
levels"))?;
+
let num_def_levels = self
.def_level_decoder
.as_mut()
.expect("def_level_decoder be set")
- .read(*levels, levels_read..levels_read +
iter_batch_size)?;
+ .read(levels, levels_read..levels_read +
iter_batch_size)?;
+
+ if num_def_levels != iter_batch_size {
+ return Err(general_err!("insufficient definition
levels read from column - expected {}, got {}", iter_batch_size,
num_def_levels));
+ }
- let null_count = levels.count_nulls(
+ levels.count_nulls(
levels_read..levels_read + num_def_levels,
self.descr.max_def_level(),
- );
- (num_def_levels, null_count)
+ )
}
- _ => (0, 0),
+ false => 0,
};
- let num_rep_levels = match rep_levels.as_mut() {
- Some(levels) if self.descr.max_rep_level() > 0 => self
+ if self.descr.max_rep_level() > 0 {
+ let levels = rep_levels
+ .as_mut()
+ .ok_or_else(|| general_err!("must specify repetition
levels"))?;
+
+ let rep_levels = self
.rep_level_decoder
.as_mut()
.expect("rep_level_decoder be set")
- .read(levels, levels_read..levels_read + iter_batch_size)?,
- _ => 0,
- };
+ .read(levels, levels_read..levels_read + iter_batch_size)?;
- // At this point we have read values, definition and repetition
levels.
- // If both definition and repetition levels are defined, their
counts
- // should be equal. Values count is always less or equal to
definition levels.
- if num_def_levels != 0
- && num_rep_levels != 0
- && num_rep_levels != num_def_levels
- {
- return Err(general_err!(
- "inconsistent number of levels read - def: {}, rep: {}",
- num_def_levels,
- num_rep_levels
- ));
+ if rep_levels != iter_batch_size {
+ return Err(general_err!("insufficient repetition levels
read from column - expected {}, got {}", iter_batch_size, rep_levels));
+ }
}
- // Note that if field is not required, but no definition levels
are provided,
- // we would read values of batch size and (if provided, of course)
repetition
- // levels of batch size - [!] they will not be synced, because
only definition
- // levels enforce number of non-null values to read.
-
let values_to_read = iter_batch_size - null_count;
let curr_values_read = self
.values_decoder
.read(values, values_read..values_read + values_to_read)?;
- if num_def_levels != 0 && curr_values_read != num_def_levels -
null_count {
+ if curr_values_read != values_to_read {
return Err(general_err!(
"insufficient values read from column - expected: {}, got:
{}",
- num_def_levels - null_count,
+ values_to_read,
curr_values_read
));
}
@@ -290,9 +263,8 @@ where
// Update all "return" counters and internal state.
// This is to account for when def or rep levels are not provided
- let curr_levels_read = max(num_def_levels, num_rep_levels);
- self.num_decoded_values += max(curr_levels_read, curr_values_read)
as u32;
- levels_read += curr_levels_read;
+ self.num_decoded_values += iter_batch_size as u32;
+ levels_read += iter_batch_size;
values_read += curr_values_read;
}
@@ -302,8 +274,7 @@ where
/// Reads a new page and set up the decoders for levels, values or
dictionary.
/// Returns false if there's no page left.
fn read_new_page(&mut self) -> Result<bool> {
- #[allow(while_true)]
- while true {
+ loop {
match self.page_reader.get_next_page()? {
// No more page to read
None => return Ok(false),
@@ -433,8 +404,6 @@ where
}
}
}
-
- Ok(true)
}
#[inline]
@@ -484,12 +453,12 @@ mod tests {
use super::*;
use rand::distributions::uniform::SampleUniform;
- use std::{collections::VecDeque, sync::Arc, vec::IntoIter};
+ use std::{collections::VecDeque, sync::Arc};
use crate::basic::Type as PhysicalType;
- use crate::column::page::Page;
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as
SchemaType};
use crate::util::test_common::make_pages;
+ use crate::util::test_common::page_util::InMemoryPageReader;
const NUM_LEVELS: usize = 128;
const NUM_PAGES: usize = 2;
@@ -1036,7 +1005,7 @@ mod tests {
} else {
0
};
- let max_rep_level = if def_levels.is_some() {
+ let max_rep_level = if rep_levels.is_some() {
MAX_REP_LEVEL
} else {
0
@@ -1055,8 +1024,8 @@ mod tests {
NUM_PAGES,
NUM_LEVELS,
batch_size,
- std::i32::MIN,
- std::i32::MAX,
+ i32::MIN,
+ i32::MAX,
values,
def_levels,
rep_levels,
@@ -1235,7 +1204,8 @@ mod tests {
use_v2,
);
let max_def_level = desc.max_def_level();
- let page_reader = TestPageReader::new(Vec::from(pages));
+ let max_rep_level = desc.max_rep_level();
+ let page_reader = InMemoryPageReader::new(pages);
let column_reader: ColumnReader =
get_column_reader(desc, Box::new(page_reader));
let mut typed_column_reader =
get_typed_column_reader::<T>(column_reader);
@@ -1276,7 +1246,8 @@ mod tests {
"values content doesn't match"
);
- if let Some(ref levels) = def_levels {
+ if max_def_level > 0 {
+ let levels = def_levels.as_ref().unwrap();
assert!(
levels.len() >= curr_levels_read,
"def_levels.len() >= levels_read"
@@ -1288,7 +1259,8 @@ mod tests {
);
}
- if let Some(ref levels) = rep_levels {
+ if max_rep_level > 0 {
+ let levels = rep_levels.as_ref().unwrap();
assert!(
levels.len() >= curr_levels_read,
"rep_levels.len() >= levels_read"
@@ -1300,44 +1272,10 @@ mod tests {
);
}
- if def_levels.is_none() && rep_levels.is_none() {
- assert!(
- curr_levels_read == 0,
- "expected to read 0 levels, found {}",
- curr_levels_read
- );
- } else if def_levels.is_some() && max_def_level > 0 {
- assert!(
- curr_levels_read >= curr_values_read,
- "expected levels read to be greater than values read"
- );
- }
- }
- }
-
- struct TestPageReader {
- pages: IntoIter<Page>,
- }
-
- impl TestPageReader {
- pub fn new(pages: Vec<Page>) -> Self {
- Self {
- pages: pages.into_iter(),
- }
- }
- }
-
- impl PageReader for TestPageReader {
- fn get_next_page(&mut self) -> Result<Option<Page>> {
- Ok(self.pages.next())
- }
- }
-
- impl Iterator for TestPageReader {
- type Item = Result<Page>;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.get_next_page().transpose()
+ assert!(
+ curr_levels_read >= curr_values_read,
+ "expected levels read to be greater than values read"
+ );
}
}
}
diff --git a/parquet/src/util/test_common/page_util.rs
b/parquet/src/util/test_common/page_util.rs
index ffa559f3f..3719d280a 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -100,11 +100,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
}
fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
- assert!(
- self.num_values == def_levels.len() as u32,
- "Must call `add_rep_levels() first!`"
- );
-
+ self.num_values = def_levels.len() as u32;
self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
}