This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d10d962ac Add integration test for scan rows with selection (#2158)
d10d962ac is described below

commit d10d962ac85a9675c511ff8f783d1b455588d220
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Jul 27 17:27:03 2022 +0800

    Add integration test for scan rows with selection (#2158)
    
    * add test
    
    * fix some skip bug.
    
    * add it.
    
    * fix skip in head.
    
    * refine test case
    
    * fix fmt.
    
    * fix clippy
    
    * fix comment.
    
    * fix comment
---
 parquet/src/arrow/array_reader/byte_array.rs       |   3 +-
 .../arrow/array_reader/byte_array_dictionary.rs    |   3 +-
 .../src/arrow/array_reader/complex_object_array.rs |   8 +-
 parquet/src/arrow/array_reader/mod.rs              |  27 +++
 parquet/src/arrow/array_reader/null_array.rs       |   3 +-
 parquet/src/arrow/array_reader/primitive_array.rs  |   3 +-
 parquet/src/arrow/arrow_reader.rs                  | 223 ++++++++++++++++++++-
 parquet/src/arrow/record_reader/mod.rs             |  12 +-
 parquet/src/column/reader.rs                       |   6 +
 9 files changed, 276 insertions(+), 12 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index 34b38e1be..a29888f70 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
@@ -120,6 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for 
ByteArrayReader<I> {
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 486dfe211..eba9e578f 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, 
ByteArrayDecoderPlain};
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::buffer::{
     dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
 };
@@ -181,6 +181,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs 
b/parquet/src/arrow/array_reader/complex_object_array.rs
index 6e7585ff9..1390866cf 100644
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -166,7 +166,13 @@ where
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
         match self.column_reader.as_mut() {
             Some(reader) => reader.skip_records(num_records),
-            None => Ok(0),
+            None => {
+                if self.next_column_reader()? {
+                    
self.column_reader.as_mut().unwrap().skip_records(num_records)
+                }else {
+                    Ok(0)
+                }
+            }
         }
     }
 
diff --git a/parquet/src/arrow/array_reader/mod.rs 
b/parquet/src/arrow/array_reader/mod.rs
index e30c33bba..a9d8cc0fa 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -144,3 +144,30 @@ where
     }
     Ok(records_read)
 }
+
+/// Uses `pages` to set up `record_reader`'s `column_reader`.
+///
+/// If we skip records before any read operation,
+/// we need to set `column_reader` via `set_page_reader`
+/// so that `def_level_decoder` and `rep_level_decoder` are constructed.
+fn set_column_reader<V, CV>(
+    record_reader: &mut GenericRecordReader<V, CV>,
+    pages: &mut dyn PageIterator,
+) -> Result<bool>
+where
+    V: ValuesBuffer + Default,
+    CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+    return if record_reader.column_reader().is_none() {
+        // If we skip records before any read operation,
+        // we need to set `column_reader` via `set_page_reader`
+        if let Some(page_reader) = pages.next() {
+            record_reader.set_page_reader(page_reader?)?;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    } else {
+        Ok(true)
+    };
+}
diff --git a/parquet/src/arrow/array_reader/null_array.rs 
b/parquet/src/arrow/array_reader/null_array.rs
index b207d8b2c..a8c50b87f 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::column::page::PageIterator;
@@ -97,6 +97,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
index 0488e254c..700b12b0a 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -222,6 +222,7 @@ where
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
         self.record_reader.skip_records(num_records)
     }
 
diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index f0129b780..8b4641d68 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
 
 /// Arrow reader api.
@@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
         chunk_reader: R,
         options: ArrowReaderOptions,
     ) -> Result<Self> {
-        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+        let file_reader = if options.selection.is_some() {
+            let options = ReadOptionsBuilder::new().with_page_index().build();
+            Arc::new(SerializedFileReader::new_with_options(
+                chunk_reader,
+                options,
+            )?)
+        } else {
+            Arc::new(SerializedFileReader::new(chunk_reader)?)
+        };
         Ok(Self::new_with_options(file_reader, options))
     }
 
@@ -298,12 +307,15 @@ impl Iterator for ParquetRecordBatchReader {
                     continue;
                 }
 
+                // try to read record
                 let to_read = match 
front.row_count.checked_sub(self.batch_size) {
-                    Some(remaining) => {
-                        selection.push_front(RowSelection::skip(remaining));
+                    Some(remaining) if remaining != 0 => {
+                        // if the page row count is less than batch_size we
+                        // must set the batch size to the page row count.
+                        // the `remaining != 0` check avoids an infinite loop
+                        selection.push_front(RowSelection::select(remaining));
                         self.batch_size
                     }
-                    None => front.row_count,
+                    _ => front.row_count,
                 };
 
                 break to_read;
@@ -390,6 +402,7 @@ mod tests {
 
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+        ParquetRecordBatchReader, RowSelection,
     };
     use crate::arrow::buffer::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, 
FromConverter,
@@ -1591,4 +1604,206 @@ mod tests {
         test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
         test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
     }
+
+    #[test]
+    fn test_scan_row_with_selection() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let test_file = File::open(&path).unwrap();
+
+        // total row count 7300
+        // 1. test selection len more than one page row count
+        let batch_size = 1000;
+        let expected_data = create_expect_batch(&test_file, batch_size);
+
+        let selections = create_test_selection(batch_size, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+        let mut total_row_count = 0;
+        let mut index = 0;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == 300);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 4000);
+
+        let selections = create_test_selection(batch_size, 7300, true);
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+        let mut total_row_count = 0;
+        let mut index = 1;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            // the last batch will be 300
+            assert!(num == batch_size || num == 300);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3300);
+
+        // 2. test selection len less than one page row count
+        let batch_size = 20;
+        let expected_data = create_expect_batch(&test_file, batch_size);
+        let selections = create_test_selection(batch_size, 7300, false);
+
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+        let mut total_row_count = 0;
+        let mut index = 0;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert_eq!(num, batch_size);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3660);
+
+        let selections = create_test_selection(batch_size, 7300, true);
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+        let mut total_row_count = 0;
+        let mut index = 1;
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            assert_eq!(batch, expected_data.get(index).unwrap().clone());
+            index += 2;
+            let num = batch.num_rows();
+            assert_eq!(num, batch_size);
+            total_row_count += num;
+        }
+        assert_eq!(total_row_count, 3640);
+
+        // 3. test selection_len less than batch_size
+        let batch_size = 20;
+        let selection_len = 5;
+        let expected_data_batch = create_expect_batch(&test_file, batch_size);
+        let expected_data_selection = create_expect_batch(&test_file, 
selection_len);
+        let selections = create_test_selection(selection_len, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+
+        let mut total_row_count = 0;
+
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == selection_len);
+            if num == batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch
+                        .get(total_row_count / batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += batch_size;
+            } else if num == selection_len {
+                assert_eq!(
+                    batch,
+                    expected_data_selection
+                        .get(total_row_count / selection_len)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += selection_len;
+            }
+            // add skip offset
+            total_row_count += selection_len;
+        }
+
+        // 4. test selection_len more than batch_size
+        // If batch_size < selection_len will divide selection(50, read) ->
+        // selection(20, read), selection(20, read), selection(10, read)
+        let batch_size = 20;
+        let selection_len = 50;
+        let another_batch_size = 10;
+        let expected_data_batch = create_expect_batch(&test_file, batch_size);
+        let expected_data_batch2 = create_expect_batch(&test_file, 
another_batch_size);
+        let selections = create_test_selection(selection_len, 7300, false);
+        let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
+
+        let mut total_row_count = 0;
+
+        for batch in skip_reader {
+            let batch = batch.unwrap();
+            let num = batch.num_rows();
+            assert!(num == batch_size || num == another_batch_size);
+            if num == batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch
+                        .get(total_row_count / batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += batch_size;
+            } else if num == another_batch_size {
+                assert_eq!(
+                    batch,
+                    expected_data_batch2
+                        .get(total_row_count / another_batch_size)
+                        .unwrap()
+                        .clone()
+                );
+                total_row_count += 10;
+                // add skip offset
+                total_row_count += selection_len;
+            }
+        }
+
+        fn create_skip_reader(
+            test_file: &File,
+            batch_size: usize,
+            selections: Vec<RowSelection>,
+        ) -> ParquetRecordBatchReader {
+            let arrow_reader_options =
+                ArrowReaderOptions::new().with_row_selection(selections);
+
+            let mut skip_arrow_reader = 
ParquetFileArrowReader::try_new_with_options(
+                test_file.try_clone().unwrap(),
+                arrow_reader_options,
+            )
+            .unwrap();
+            skip_arrow_reader.get_record_reader(batch_size).unwrap()
+        }
+
+        fn create_test_selection(
+            step_len: usize,
+            total_len: usize,
+            skip_first: bool,
+        ) -> Vec<RowSelection> {
+            let mut remaining = total_len;
+            let mut skip = skip_first;
+            let mut vec = vec![];
+            while remaining != 0 {
+                let step = if remaining > step_len {
+                    step_len
+                } else {
+                    remaining
+                };
+                vec.push(RowSelection {
+                    row_count: step,
+                    skip,
+                });
+                remaining -= step;
+                skip = !skip;
+            }
+            vec
+        }
+
+        fn create_expect_batch(test_file: &File, batch_size: usize) -> 
Vec<RecordBatch> {
+            let mut serial_arrow_reader =
+                
ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
+            let serial_reader =
+                serial_arrow_reader.get_record_reader(batch_size).unwrap();
+            let mut expected_data = vec![];
+            for batch in serial_reader {
+                expected_data.push(batch.unwrap());
+            }
+            expected_data
+        }
+    }
 }
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index 04499997e..b68f59d51 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -45,6 +45,9 @@ pub(crate) const MIN_BATCH_SIZE: usize = 1024;
 pub type RecordReader<T> =
     GenericRecordReader<ScalarBuffer<<T as DataType>::T>, 
ColumnValueDecoderImpl<T>>;
 
+pub(crate) type ColumnReader<CV> =
+    GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, 
CV>;
+
 /// A generic stateful column reader that delimits semantic records
 ///
 /// This type is hidden from the docs, and relies on private traits with no
@@ -56,9 +59,7 @@ pub struct GenericRecordReader<V, CV> {
     records: V,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<ScalarBuffer<i16>>,
-    column_reader: Option<
-        GenericColumnReader<ColumnLevelDecoderImpl, 
DefinitionLevelBufferDecoder, CV>,
-    >,
+    column_reader: Option<ColumnReader<CV>>,
 
     /// Number of records accumulated in records
     num_records: usize,
@@ -278,6 +279,11 @@ where
             .map(|levels| levels.split_bitmask(self.num_values))
     }
 
+    /// Returns column reader.
+    pub(crate) fn column_reader(&self) -> Option<&ColumnReader<CV>> {
+        self.column_reader.as_ref()
+    }
+
     /// Try to read one batch of data.
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
         let rep_levels = self
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index ea00bcf1b..8e0fa5a4d 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -316,9 +316,15 @@ where
                     self.page_reader.skip_next_page()?;
                     remaining -= metadata.num_rows;
                     continue;
+                };
+                // self.num_buffered_values == self.num_decoded_values means
+                // we need to read a new page and set up the level decoders
+                if !self.read_new_page()? {
+                    return Ok(num_records - remaining);
                 }
             }
 
+            // start skipping values at the page level
             let to_read = remaining
                 .min((self.num_buffered_values - self.num_decoded_values) as 
usize);
 

Reply via email to