This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d4d11fe7a4 Assume Pages Delimit Records When Offset Index Loaded (#4921) (#4943)
d4d11fe7a4 is described below
commit d4d11fe7a47b529429020848f2ac0f63659500d6
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Oct 17 22:09:12 2023 +0100
Assume Pages Delimit Records When Offset Index Loaded (#4921) (#4943)
* Assume records not split across pages (#4921)
* More test
* Add PageReader::at_record_boundary
* Fix flush partial
---
parquet/src/arrow/array_reader/mod.rs | 2 +-
parquet/src/arrow/async_reader/mod.rs | 96 ++++++++++++++++++++++++++++++++++-
parquet/src/column/page.rs | 14 +++++
parquet/src/column/reader.rs | 8 +--
parquet/src/column/reader/decoder.rs | 7 +++
parquet/src/file/serialized_reader.rs | 9 ++++
6 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 625ac034ef..a4ee504059 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
Ok(records_read)
}
-/// Uses `record_reader` to skip up to `batch_size` records from`pages`
+/// Uses `record_reader` to skip up to `batch_size` records from `pages`
///
/// Returns the number of records skipped, which can be less than `batch_size` if
/// pages is exhausted
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 4b3eebf2e6..875fff4dac 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
+ use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
- use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
- use futures::TryStreamExt;
+ use arrow_array::{
+ Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+ };
+ use arrow_schema::{DataType, Field, Schema};
+ use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use std::sync::Mutex;
+ use tempfile::tempfile;
#[derive(Clone)]
struct TestReader {
@@ -1677,4 +1682,91 @@ mod tests {
assert!(sbbf.check(&"Hello"));
assert!(!sbbf.check(&"Hello_Not_Exists"));
}
+
+ #[tokio::test]
+ async fn test_nested_skip() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("col_1", DataType::UInt64, false),
+ Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
+ ]));
+
+ // Default writer properties
+ let props = WriterProperties::builder()
+ .set_data_page_row_count_limit(256)
+ .set_write_batch_size(256)
+ .set_max_row_group_size(1024);
+
+ // Write data
+ let mut file = tempfile().unwrap();
+ let mut writer =
+ ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
+
+ let mut builder = ListBuilder::new(StringBuilder::new());
+ for id in 0..1024 {
+ match id % 3 {
+ 0 => builder
+ .append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
+ 1 => builder.append_value([Some(format!("id_{id}"))]),
+ _ => builder.append_null(),
+ }
+ }
+ let refs = vec![
+ Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
+ Arc::new(builder.finish()) as ArrayRef,
+ ];
+
+ let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let selections = [
+ RowSelection::from(vec![
+ RowSelector::skip(313),
+ RowSelector::select(1),
+ RowSelector::skip(709),
+ RowSelector::select(1),
+ ]),
+ RowSelection::from(vec![
+ RowSelector::skip(255),
+ RowSelector::select(1),
+ RowSelector::skip(767),
+ RowSelector::select(1),
+ ]),
+ RowSelection::from(vec![
+ RowSelector::select(255),
+ RowSelector::skip(1),
+ RowSelector::select(767),
+ RowSelector::skip(1),
+ ]),
+ RowSelection::from(vec![
+ RowSelector::skip(254),
+ RowSelector::select(1),
+ RowSelector::select(1),
+ RowSelector::skip(767),
+ RowSelector::select(1),
+ ]),
+ ];
+
+ for selection in selections {
+ let expected = selection.row_count();
+ // Read data
+ let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
+ tokio::fs::File::from_std(file.try_clone().unwrap()),
+ ArrowReaderOptions::new().with_page_index(true),
+ )
+ .await
+ .unwrap();
+
+ reader = reader.with_row_selection(selection);
+
+ let mut stream = reader.build().unwrap();
+
+ let mut total_rows = 0;
+ while let Some(rb) = stream.next().await {
+ let rb = rb.unwrap();
+ total_rows += rb.num_rows();
+ }
+ assert_eq!(total_rows, expected);
+ }
+ }
}
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index ec9af2aa27..933e423862 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -320,6 +320,20 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// Skips reading the next page, returns an error if no
/// column index information
fn skip_next_page(&mut self) -> Result<()>;
+
+ /// Returns `true` if the next page can be assumed to contain the start of a new record
+ ///
+ /// Prior to parquet V2 the specification was ambiguous as to whether a single record
+ /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
+ /// this in certain situations. However, correctly interpreting the offset index relies on
+ /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
+ /// to signal this to the calling context
+ ///
+ /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
+ /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
+ fn at_record_boundary(&mut self) -> Result<bool> {
+ Ok(self.peek_next_page()?.is_none())
+ }
}
/// API for writing pages in a column chunk.
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3ce00622e9..52ad4d644c 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -269,7 +269,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
- records_read += 1;
+ records_read += reader.flush_partial() as usize;
}
(records_read, levels_read)
}
@@ -380,7 +380,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
- records_read += 1;
+ records_read += decoder.flush_partial() as usize;
}
(records_read, levels_read)
@@ -491,7 +491,7 @@ where
offset += bytes_read;
self.has_record_delimiter =
- self.page_reader.peek_next_page()?.is_none();
+ self.page_reader.at_record_boundary()?;
self.rep_level_decoder
.as_mut()
@@ -548,7 +548,7 @@ where
// across multiple pages, however, the parquet writer
// used to do this so we preserve backwards compatibility
self.has_record_delimiter =
- self.page_reader.peek_next_page()?.is_none();
+ self.page_reader.at_record_boundary()?;
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 369b335dc9..27ffb7637e 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -102,6 +102,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
num_records: usize,
num_levels: usize,
) -> Result<(usize, usize)>;
+
+ /// Flush any partially read or skipped record
+ fn flush_partial(&mut self) -> bool;
}
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -519,6 +522,10 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
}
Ok((total_records_read, total_levels_read))
}
+
+ fn flush_partial(&mut self) -> bool {
+ std::mem::take(&mut self.has_partial)
+ }
}
#[cfg(test)]
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4bc484144a..b60d30ffea 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -770,6 +770,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}
}
}
+
+ fn at_record_boundary(&mut self) -> Result<bool> {
+ match &mut self.state {
+ SerializedPageReaderState::Values { .. } => {
+ Ok(self.peek_next_page()?.is_none())
+ }
+ SerializedPageReaderState::Pages { .. } => Ok(true),
+ }
+ }
}
#[cfg(test)]