This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d10d962ac Add integration test for scan rows with selection (#2158)
d10d962ac is described below
commit d10d962ac85a9675c511ff8f783d1b455588d220
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Jul 27 17:27:03 2022 +0800
Add integration test for scan rows with selection (#2158)
* add test
* fix some skip bug.
* add it.
* fix skip in head.
* refine test case
* fix fmt.
* fix clippy
* fix comment.
* fix comment
---
parquet/src/arrow/array_reader/byte_array.rs | 3 +-
.../arrow/array_reader/byte_array_dictionary.rs | 3 +-
.../src/arrow/array_reader/complex_object_array.rs | 8 +-
parquet/src/arrow/array_reader/mod.rs | 27 +++
parquet/src/arrow/array_reader/null_array.rs | 3 +-
parquet/src/arrow/array_reader/primitive_array.rs | 3 +-
parquet/src/arrow/arrow_reader.rs | 223 ++++++++++++++++++++-
parquet/src/arrow/record_reader/mod.rs | 12 +-
parquet/src/column/reader.rs | 6 +
9 files changed, 276 insertions(+), 12 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 34b38e1be..a29888f70 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
@@ -120,6 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for
ByteArrayReader<I> {
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 486dfe211..eba9e578f 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
use crate::arrow::array_reader::byte_array::{ByteArrayDecoder,
ByteArrayDecoderPlain};
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::{
dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
};
@@ -181,6 +181,7 @@ where
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs
b/parquet/src/arrow/array_reader/complex_object_array.rs
index 6e7585ff9..1390866cf 100644
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -166,7 +166,13 @@ where
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
- None => Ok(0),
+ None => {
+ if self.next_column_reader()? {
+
self.column_reader.as_mut().unwrap().skip_records(num_records)
+ }else {
+ Ok(0)
+ }
+ }
}
}
diff --git a/parquet/src/arrow/array_reader/mod.rs
b/parquet/src/arrow/array_reader/mod.rs
index e30c33bba..a9d8cc0fa 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -144,3 +144,30 @@ where
}
Ok(records_read)
}
+
+/// Uses `pages` to set up `record_reader`'s `column_reader`
+///
+/// If we skip records before any read operation,
+/// we need to set `column_reader` via `set_page_reader`
+/// to construct `def_level_decoder` and `rep_level_decoder`.
+fn set_column_reader<V, CV>(
+ record_reader: &mut GenericRecordReader<V, CV>,
+ pages: &mut dyn PageIterator,
+) -> Result<bool>
+where
+ V: ValuesBuffer + Default,
+ CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+ return if record_reader.column_reader().is_none() {
+ // If we skip records before any read operation
+ // we need to set `column_reader` via `set_page_reader`
+ if let Some(page_reader) = pages.next() {
+ record_reader.set_page_reader(page_reader?)?;
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ } else {
+ Ok(true)
+ };
+}
diff --git a/parquet/src/arrow/array_reader/null_array.rs
b/parquet/src/arrow/array_reader/null_array.rs
index b207d8b2c..a8c50b87f 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::column::page::PageIterator;
@@ -97,6 +97,7 @@ where
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index 0488e254c..700b12b0a 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -222,6 +222,7 @@ where
}
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}
diff --git a/parquet/src/arrow/arrow_reader.rs
b/parquet/src/arrow/arrow_reader.rs
index f0129b780..8b4641d68 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;
/// Arrow reader api.
@@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
chunk_reader: R,
options: ArrowReaderOptions,
) -> Result<Self> {
- let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+ let file_reader = if options.selection.is_some() {
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ Arc::new(SerializedFileReader::new_with_options(
+ chunk_reader,
+ options,
+ )?)
+ } else {
+ Arc::new(SerializedFileReader::new(chunk_reader)?)
+ };
Ok(Self::new_with_options(file_reader, options))
}
@@ -298,12 +307,15 @@ impl Iterator for ParquetRecordBatchReader {
continue;
}
+ // try to read record
let to_read = match
front.row_count.checked_sub(self.batch_size) {
- Some(remaining) => {
- selection.push_front(RowSelection::skip(remaining));
+ Some(remaining) if remaining != 0 => {
+ // if the page row count is less than batch_size we must set the
batch size to the page row count.
+ // add a check to avoid an infinite loop
+ selection.push_front(RowSelection::select(remaining));
self.batch_size
}
- None => front.row_count,
+ _ => front.row_count,
};
break to_read;
@@ -390,6 +402,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+ ParquetRecordBatchReader, RowSelection,
};
use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter,
FromConverter,
@@ -1591,4 +1604,206 @@ mod tests {
test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}
+
+ #[test]
+ fn test_scan_row_with_selection() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+ let test_file = File::open(&path).unwrap();
+
+ // total row count 7300
+ // 1. test selection len more than one page row count
+ let batch_size = 1000;
+ let expected_data = create_expect_batch(&test_file, batch_size);
+
+ let selections = create_test_selection(batch_size, 7300, false);
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+ let mut total_row_count = 0;
+ let mut index = 0;
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ assert_eq!(batch, expected_data.get(index).unwrap().clone());
+ index += 2;
+ let num = batch.num_rows();
+ assert!(num == batch_size || num == 300);
+ total_row_count += num;
+ }
+ assert_eq!(total_row_count, 4000);
+
+ let selections = create_test_selection(batch_size, 7300, true);
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+ let mut total_row_count = 0;
+ let mut index = 1;
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ assert_eq!(batch, expected_data.get(index).unwrap().clone());
+ index += 2;
+ let num = batch.num_rows();
+ // the last batch will be 300
+ assert!(num == batch_size || num == 300);
+ total_row_count += num;
+ }
+ assert_eq!(total_row_count, 3300);
+
+ // 2. test selection len less than one page row count
+ let batch_size = 20;
+ let expected_data = create_expect_batch(&test_file, batch_size);
+ let selections = create_test_selection(batch_size, 7300, false);
+
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+ let mut total_row_count = 0;
+ let mut index = 0;
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ assert_eq!(batch, expected_data.get(index).unwrap().clone());
+ index += 2;
+ let num = batch.num_rows();
+ assert_eq!(num, batch_size);
+ total_row_count += num;
+ }
+ assert_eq!(total_row_count, 3660);
+
+ let selections = create_test_selection(batch_size, 7300, true);
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+ let mut total_row_count = 0;
+ let mut index = 1;
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ assert_eq!(batch, expected_data.get(index).unwrap().clone());
+ index += 2;
+ let num = batch.num_rows();
+ assert_eq!(num, batch_size);
+ total_row_count += num;
+ }
+ assert_eq!(total_row_count, 3640);
+
+ // 3. test selection_len less than batch_size
+ let batch_size = 20;
+ let selection_len = 5;
+ let expected_data_batch = create_expect_batch(&test_file, batch_size);
+ let expected_data_selection = create_expect_batch(&test_file,
selection_len);
+ let selections = create_test_selection(selection_len, 7300, false);
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+
+ let mut total_row_count = 0;
+
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ let num = batch.num_rows();
+ assert!(num == batch_size || num == selection_len);
+ if num == batch_size {
+ assert_eq!(
+ batch,
+ expected_data_batch
+ .get(total_row_count / batch_size)
+ .unwrap()
+ .clone()
+ );
+ total_row_count += batch_size;
+ } else if num == selection_len {
+ assert_eq!(
+ batch,
+ expected_data_selection
+ .get(total_row_count / selection_len)
+ .unwrap()
+ .clone()
+ );
+ total_row_count += selection_len;
+ }
+ // add skip offset
+ total_row_count += selection_len;
+ }
+
+ // 4. test selection_len more than batch_size
+ // If batch_size < selection_len will divide selection(50, read) ->
+ // selection(20, read), selection(20, read), selection(10, read)
+ let batch_size = 20;
+ let selection_len = 50;
+ let another_batch_size = 10;
+ let expected_data_batch = create_expect_batch(&test_file, batch_size);
+ let expected_data_batch2 = create_expect_batch(&test_file,
another_batch_size);
+ let selections = create_test_selection(selection_len, 7300, false);
+ let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
+
+ let mut total_row_count = 0;
+
+ for batch in skip_reader {
+ let batch = batch.unwrap();
+ let num = batch.num_rows();
+ assert!(num == batch_size || num == another_batch_size);
+ if num == batch_size {
+ assert_eq!(
+ batch,
+ expected_data_batch
+ .get(total_row_count / batch_size)
+ .unwrap()
+ .clone()
+ );
+ total_row_count += batch_size;
+ } else if num == another_batch_size {
+ assert_eq!(
+ batch,
+ expected_data_batch2
+ .get(total_row_count / another_batch_size)
+ .unwrap()
+ .clone()
+ );
+ total_row_count += 10;
+ // add skip offset
+ total_row_count += selection_len;
+ }
+ }
+
+ fn create_skip_reader(
+ test_file: &File,
+ batch_size: usize,
+ selections: Vec<RowSelection>,
+ ) -> ParquetRecordBatchReader {
+ let arrow_reader_options =
+ ArrowReaderOptions::new().with_row_selection(selections);
+
+ let mut skip_arrow_reader =
ParquetFileArrowReader::try_new_with_options(
+ test_file.try_clone().unwrap(),
+ arrow_reader_options,
+ )
+ .unwrap();
+ skip_arrow_reader.get_record_reader(batch_size).unwrap()
+ }
+
+ fn create_test_selection(
+ step_len: usize,
+ total_len: usize,
+ skip_first: bool,
+ ) -> Vec<RowSelection> {
+ let mut remaining = total_len;
+ let mut skip = skip_first;
+ let mut vec = vec![];
+ while remaining != 0 {
+ let step = if remaining > step_len {
+ step_len
+ } else {
+ remaining
+ };
+ vec.push(RowSelection {
+ row_count: step,
+ skip,
+ });
+ remaining -= step;
+ skip = !skip;
+ }
+ vec
+ }
+
+ fn create_expect_batch(test_file: &File, batch_size: usize) ->
Vec<RecordBatch> {
+ let mut serial_arrow_reader =
+
ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
+ let serial_reader =
+ serial_arrow_reader.get_record_reader(batch_size).unwrap();
+ let mut expected_data = vec![];
+ for batch in serial_reader {
+ expected_data.push(batch.unwrap());
+ }
+ expected_data
+ }
+ }
}
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index 04499997e..b68f59d51 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -45,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024;
pub type RecordReader<T> =
GenericRecordReader<ScalarBuffer<<T as DataType>::T>,
ColumnValueDecoderImpl<T>>;
+pub(crate) type ColumnReader<CV> =
+ GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder,
CV>;
+
/// A generic stateful column reader that delimits semantic records
///
/// This type is hidden from the docs, and relies on private traits with no
@@ -56,9 +59,7 @@ pub struct GenericRecordReader<V, CV> {
records: V,
def_levels: Option<DefinitionLevelBuffer>,
rep_levels: Option<ScalarBuffer<i16>>,
- column_reader: Option<
- GenericColumnReader<ColumnLevelDecoderImpl,
DefinitionLevelBufferDecoder, CV>,
- >,
+ column_reader: Option<ColumnReader<CV>>,
/// Number of records accumulated in records
num_records: usize,
@@ -278,6 +279,11 @@ where
.map(|levels| levels.split_bitmask(self.num_values))
}
+ /// Returns column reader.
+ pub(crate) fn column_reader(&self) -> Option<&ColumnReader<CV>> {
+ self.column_reader.as_ref()
+ }
+
/// Try to read one batch of data.
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
let rep_levels = self
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index ea00bcf1b..8e0fa5a4d 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -316,9 +316,15 @@ where
self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
continue;
+ };
+ // because self.num_buffered_values == self.num_decoded_values
means
+ // we need to read a new page and set up the decoders for levels
+ if !self.read_new_page()? {
+ return Ok(num_records - remaining);
}
}
+ // start skipping values at the page level
let to_read = remaining
.min((self.num_buffered_values - self.num_decoded_values) as
usize);