This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d11fe7a4 Assume Pages Delimit Records When Offset Index Loaded (#4921) (#4943)
d4d11fe7a4 is described below

commit d4d11fe7a47b529429020848f2ac0f63659500d6
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Oct 17 22:09:12 2023 +0100

    Assume Pages Delimit Records When Offset Index Loaded (#4921) (#4943)
    
    * Assume records not split across pages (#4921)
    
    * More test
    
    * Add PageReader::at_record_boundary
    
    * Fix flush partial
---
 parquet/src/arrow/array_reader/mod.rs |  2 +-
 parquet/src/arrow/async_reader/mod.rs | 96 ++++++++++++++++++++++++++++++++++-
 parquet/src/column/page.rs            | 14 +++++
 parquet/src/column/reader.rs          |  8 +--
 parquet/src/column/reader/decoder.rs  |  7 +++
 parquet/src/file/serialized_reader.rs |  9 ++++
 6 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 625ac034ef..a4ee504059 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
     Ok(records_read)
 }
 
-/// Uses `record_reader` to skip up to `batch_size` records from`pages`
+/// Uses `record_reader` to skip up to `batch_size` records from `pages`
 ///
 /// Returns the number of records skipped, which can be less than `batch_size` if
 /// pages is exhausted
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 4b3eebf2e6..875fff4dac 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
-    use futures::TryStreamExt;
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+    };
+    use arrow_schema::{DataType, Field, Schema};
+    use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
+    use tempfile::tempfile;
 
     #[derive(Clone)]
     struct TestReader {
@@ -1677,4 +1682,91 @@ mod tests {
         assert!(sbbf.check(&"Hello"));
         assert!(!sbbf.check(&"Hello_Not_Exists"));
     }
+
+    #[tokio::test]
+    async fn test_nested_skip() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_1", DataType::UInt64, false),
+            Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
+        ]));
+
+        // Default writer properties
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(256)
+            .set_write_batch_size(256)
+            .set_max_row_group_size(1024);
+
+        // Write data
+        let mut file = tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
+
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        for id in 0..1024 {
+            match id % 3 {
+                0 => builder
+                    .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
+                1 => builder.append_value([Some(format!("id_{id}"))]),
+                _ => builder.append_null(),
+            }
+        }
+        let refs = vec![
+            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
+            Arc::new(builder.finish()) as ArrayRef,
+        ];
+
+        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let selections = [
+            RowSelection::from(vec![
+                RowSelector::skip(313),
+                RowSelector::select(1),
+                RowSelector::skip(709),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(255),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::select(255),
+                RowSelector::skip(1),
+                RowSelector::select(767),
+                RowSelector::skip(1),
+            ]),
+            RowSelection::from(vec![
+                RowSelector::skip(254),
+                RowSelector::select(1),
+                RowSelector::select(1),
+                RowSelector::skip(767),
+                RowSelector::select(1),
+            ]),
+        ];
+
+        for selection in selections {
+            let expected = selection.row_count();
+            // Read data
+            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
+                tokio::fs::File::from_std(file.try_clone().unwrap()),
+                ArrowReaderOptions::new().with_page_index(true),
+            )
+            .await
+            .unwrap();
+
+            reader = reader.with_row_selection(selection);
+
+            let mut stream = reader.build().unwrap();
+
+            let mut total_rows = 0;
+            while let Some(rb) = stream.next().await {
+                let rb = rb.unwrap();
+                total_rows += rb.num_rows();
+            }
+            assert_eq!(total_rows, expected);
+        }
+    }
 }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index ec9af2aa27..933e423862 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -320,6 +320,20 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// Skips reading the next page, returns an error if no
     /// column index information
     fn skip_next_page(&mut self) -> Result<()>;
+
+    /// Returns `true` if the next page can be assumed to contain the start of a new record
+    ///
+    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
+    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
+    /// this in certain situations. However, correctly interpreting the offset index relies on
+    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
+    /// to signal this to the calling context
+    ///
+    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
+    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
+    fn at_record_boundary(&mut self) -> Result<bool> {
+        Ok(self.peek_next_page()?.is_none())
+    }
 }
 
 /// API for writing pages in a column chunk.
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3ce00622e9..52ad4d644c 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -269,7 +269,7 @@ where
                         // Reached end of page, which implies records_read < remaining_records
                         // as otherwise would have stopped reading before reaching the end
                         assert!(records_read < remaining_records); // Sanity check
-                        records_read += 1;
+                        records_read += reader.flush_partial() as usize;
                     }
                     (records_read, levels_read)
                 }
@@ -380,7 +380,7 @@ where
                         // Reached end of page, which implies records_read < remaining_records
                         // as otherwise would have stopped reading before reaching the end
                         assert!(records_read < remaining_records); // Sanity check
-                        records_read += 1;
+                        records_read += decoder.flush_partial() as usize;
                     }
 
                     (records_read, levels_read)
@@ -491,7 +491,7 @@ where
                                 offset += bytes_read;
 
                                 self.has_record_delimiter =
-                                    self.page_reader.peek_next_page()?.is_none();
+                                    self.page_reader.at_record_boundary()?;
 
                                 self.rep_level_decoder
                                     .as_mut()
@@ -548,7 +548,7 @@ where
                                 // across multiple pages, however, the parquet writer
                                 // used to do this so we preserve backwards compatibility
compatibility
                                 self.has_record_delimiter =
-                                    self.page_reader.peek_next_page()?.is_none();
+                                    self.page_reader.at_record_boundary()?;
 
                                 self.rep_level_decoder.as_mut().unwrap().set_data(
                                     Encoding::RLE,
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 369b335dc9..27ffb7637e 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -102,6 +102,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
         num_records: usize,
         num_levels: usize,
     ) -> Result<(usize, usize)>;
+
+    /// Flush any partially read or skipped record
+    fn flush_partial(&mut self) -> bool;
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -519,6 +522,10 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
         }
         Ok((total_records_read, total_levels_read))
     }
+
+    fn flush_partial(&mut self) -> bool {
+        std::mem::take(&mut self.has_partial)
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4bc484144a..b60d30ffea 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -770,6 +770,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
             }
         }
     }
+
+    fn at_record_boundary(&mut self) -> Result<bool> {
+        match &mut self.state {
+            SerializedPageReaderState::Values { .. } => {
+                Ok(self.peek_next_page()?.is_none())
+            }
+            SerializedPageReaderState::Pages { .. } => Ok(true),
+        }
+    }
 }
 
 #[cfg(test)]

Reply via email to