This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new bbb47a690 Expose page-level arrow reader API (#4298) (#4307)
bbb47a690 is described below
commit bbb47a69023094809b69110c79a8f18d191e54f1
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 31 11:49:48 2023 +0100
Expose page-level arrow reader API (#4298) (#4307)
* Expose page-level arrow reader API (#4298)
* Make scan_ranges public
* Review feedback
---
parquet/src/arrow/array_reader/builder.rs | 20 +++++-----
parquet/src/arrow/array_reader/list_array.rs | 4 +-
parquet/src/arrow/array_reader/mod.rs | 26 ++++--------
parquet/src/arrow/arrow_reader/mod.rs | 39 +++++++++++++-----
parquet/src/arrow/arrow_reader/selection.rs | 11 +++--
parquet/src/arrow/async_reader/mod.rs | 16 +++-----
parquet/src/arrow/mod.rs | 3 +-
parquet/src/arrow/schema/complex.rs | 12 +++---
parquet/src/arrow/schema/mod.rs | 60 +++++++++++++++++++++++-----
9 files changed, 122 insertions(+), 69 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 5e0d05e89..bb3f40335 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -24,7 +24,7 @@ use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
- PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
+ PrimitiveArrayReader, RowGroups, StructArrayReader,
};
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
@@ -39,7 +39,7 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
pub fn build_array_reader(
field: Option<&ParquetField>,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| build_reader(field, mask, row_groups).transpose())
@@ -52,7 +52,7 @@ pub fn build_array_reader(
fn build_reader(
field: &ParquetField,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { .. } => {
@@ -75,7 +75,7 @@ fn build_reader(
fn build_map_reader(
field: &ParquetField,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 2);
@@ -127,7 +127,7 @@ fn build_list_reader(
field: &ParquetField,
mask: &ProjectionMask,
is_large: bool,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);
@@ -173,7 +173,7 @@ fn build_list_reader(
fn build_fixed_size_list_reader(
field: &ParquetField,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);
@@ -210,7 +210,7 @@ fn build_fixed_size_list_reader(
fn build_primitive_reader(
field: &ParquetField,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let (col_idx, primitive_type) = match &field.field_type {
ParquetFieldType::Primitive {
@@ -301,7 +301,7 @@ fn build_primitive_reader(
fn build_struct_reader(
field: &ParquetField,
mask: &ProjectionMask,
- row_groups: &dyn RowGroupCollection,
+ row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
@@ -338,7 +338,7 @@ fn build_struct_reader(
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::schema::parquet_to_array_schema_and_fields;
+ use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::util::test_common::file_util::get_test_file;
use arrow::datatypes::Field;
@@ -352,7 +352,7 @@ mod tests {
let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
- let (_, fields) = parquet_to_array_schema_and_fields(
+ let (_, fields) = parquet_to_arrow_schema_and_fields(
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 932034417..7c66c5c23 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -250,7 +250,7 @@ mod tests {
use crate::arrow::array_reader::build_array_reader;
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
- use crate::arrow::schema::parquet_to_array_schema_and_fields;
+ use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
use crate::file::properties::WriterProperties;
use crate::file::reader::{FileReader, SerializedFileReader};
@@ -566,7 +566,7 @@ mod tests {
let file_metadata = file_reader.metadata().file_metadata();
let schema = file_metadata.schema_descr();
let mask = ProjectionMask::leaves(schema, vec![0]);
- let (_, fields) = parquet_to_array_schema_and_fields(
+ let (_, fields) = parquet_to_arrow_schema_and_fields(
schema,
ProjectionMask::all(),
file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 823084b43..1e781fb73 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -28,7 +28,6 @@ use crate::arrow::record_reader::GenericRecordReader;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::SchemaDescPtr;
mod builder;
mod byte_array;
@@ -100,22 +99,15 @@ pub trait ArrayReader: Send {
}
/// A collection of row groups
-pub trait RowGroupCollection {
- /// Get schema of parquet file.
- fn schema(&self) -> SchemaDescPtr;
-
+pub trait RowGroups {
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;
- /// Returns an iterator over the column chunks for particular column
+ /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
-impl RowGroupCollection for Arc<dyn FileReader> {
- fn schema(&self) -> SchemaDescPtr {
- self.metadata().file_metadata().schema_descr_ptr()
- }
-
+impl RowGroups for Arc<dyn FileReader> {
fn num_rows(&self) -> usize {
self.metadata().file_metadata().num_rows() as usize
}
@@ -126,26 +118,22 @@ impl RowGroupCollection for Arc<dyn FileReader> {
}
}
-pub(crate) struct FileReaderRowGroupCollection {
+pub(crate) struct FileReaderRowGroups {
/// The underling file reader
reader: Arc<dyn FileReader>,
/// Optional list of row group indices to scan
row_groups: Option<Vec<usize>>,
}
-impl FileReaderRowGroupCollection {
- /// Creates a new [`RowGroupCollection`] from a `FileReader` and an optional
+impl FileReaderRowGroups {
+ /// Creates a new [`RowGroups`] from a `FileReader` and an optional
/// list of row group indexes to scan
pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) ->
Self {
Self { reader, row_groups }
}
}
-impl RowGroupCollection for FileReaderRowGroupCollection {
- fn schema(&self) -> SchemaDescPtr {
- self.reader.metadata().file_metadata().schema_descr_ptr()
- }
-
+impl RowGroups for FileReaderRowGroups {
fn num_rows(&self) -> usize {
match &self.row_groups {
None => self.reader.metadata().file_metadata().num_rows() as usize,
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index f3e178bdf..9cb09c9a5 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,12 +26,9 @@ use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
-use crate::arrow::array_reader::{
- build_array_reader, ArrayReader, FileReaderRowGroupCollection,
RowGroupCollection,
-};
-use crate::arrow::schema::parquet_to_array_schema_and_fields;
-use crate::arrow::schema::ParquetField;
-use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{build_array_reader, ArrayReader,
FileReaderRowGroups};
+use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
+use crate::arrow::{FieldLevels, ProjectionMask};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::{ChunkReader, SerializedFileReader};
@@ -41,6 +38,7 @@ use crate::schema::types::SchemaDescriptor;
mod filter;
mod selection;
+pub use crate::arrow::array_reader::RowGroups;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
@@ -87,7 +85,7 @@ impl<T> ArrowReaderBuilder<T> {
false => metadata.file_metadata().key_value_metadata(),
};
- let (schema, fields) = parquet_to_array_schema_and_fields(
+ let (schema, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
kv_metadata,
@@ -269,8 +267,7 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
///
/// Note: this will eagerly evaluate any `RowFilter` before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
- let reader =
- FileReaderRowGroupCollection::new(Arc::new(self.input.0),
self.row_groups);
+ let reader = FileReaderRowGroups::new(Arc::new(self.input.0),
self.row_groups);
let mut filter = self.filter;
let mut selection = self.selection;
@@ -420,6 +417,30 @@ impl ParquetRecordBatchReader {
.build()
}
+ /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
+ ///
+ /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
+ /// higher-level interface for reading parquet data from a file
+ pub fn try_new_with_row_groups(
+ levels: &FieldLevels,
+ row_groups: &dyn RowGroups,
+ batch_size: usize,
+ selection: Option<RowSelection>,
+ ) -> Result<Self> {
+ let array_reader = build_array_reader(
+ levels.levels.as_ref(),
+ &ProjectionMask::all(),
+ row_groups,
+ )?;
+
+ Ok(Self {
+ batch_size,
+ array_reader,
+ schema: Arc::new(Schema::new(levels.fields.clone())),
+ selection: selection.map(|s| s.trim().into()),
+ })
+ }
+
/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
/// all rows will be returned
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 76f950620..a558f893c 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -173,9 +173,14 @@ impl RowSelection {
}
}
- /// Given an offset index, return the offset ranges for all data pages selected by `self`
- #[cfg(any(test, feature = "async"))]
- pub(crate) fn scan_ranges(
+ /// Given an offset index, return the byte ranges for all data pages selected by `self`
+ ///
+ /// This is useful for determining what byte ranges to fetch from underlying storage
+ ///
+ /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
+ /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
+ /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
+ pub fn scan_ranges(
&self,
page_locations: &[crate::format::PageLocation],
) -> Vec<Range<usize>> {
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c11033eae..f17fb0751 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -94,12 +94,11 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
-use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
ArrowReaderOptions,
ParquetRecordBatchReader, RowFilter, RowSelection,
};
-use crate::arrow::schema::ParquetField;
use crate::arrow::ProjectionMask;
use crate::column::page::{PageIterator, PageReader};
@@ -112,14 +111,13 @@ use crate::format::PageLocation;
use crate::file::FOOTER_SIZE;
-use crate::schema::types::SchemaDescPtr;
-
mod metadata;
pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
+use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;
@@ -648,11 +646,7 @@ impl<'a> InMemoryRowGroup<'a> {
}
}
-impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
- fn schema(&self) -> SchemaDescPtr {
- self.metadata.schema_descr_ptr()
- }
-
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
}
@@ -756,7 +750,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
};
- use crate::arrow::schema::parquet_to_array_schema_and_fields;
+ use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::footer::parse_metadata;
use crate::file::page_index::index_reader;
@@ -1401,7 +1395,7 @@ mod tests {
};
let requests = async_reader.requests.clone();
- let (_, fields) = parquet_to_array_schema_and_fields(
+ let (_, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
None,
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index e5211ec23..aad4925c7 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,7 +123,8 @@ pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
pub use self::schema::{
- arrow_to_parquet_schema, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns,
+ arrow_to_parquet_schema, parquet_to_arrow_field_levels,
parquet_to_arrow_schema,
+ parquet_to_arrow_schema_by_columns, FieldLevels,
};
/// Schema metadata key used to store serialized Arrow IPC schema
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index c1699aafc..0d19875d9 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -24,7 +24,7 @@ use crate::basic::{ConvertedType, Repetition};
use crate::errors::ParquetError;
use crate::errors::Result;
use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
-use arrow_schema::{DataType, Field, Schema, SchemaBuilder};
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
fn get_repetition(t: &Type) -> Repetition {
let info = t.get_basic_info();
@@ -34,7 +34,8 @@ fn get_repetition(t: &Type) -> Repetition {
}
}
-/// Representation of a parquet file, in terms of arrow schema elements
+/// Representation of a parquet schema element, in terms of arrow schema elements
+#[derive(Debug, Clone)]
pub struct ParquetField {
/// The level which represents an insertion into the current list
/// i.e. guaranteed to be > 0 for a list type
@@ -82,6 +83,7 @@ impl ParquetField {
}
}
+#[derive(Debug, Clone)]
pub enum ParquetFieldType {
Primitive {
/// The index of the column in parquet
@@ -554,13 +556,13 @@ fn convert_field(
/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
-/// [`Schema`] embedded in the parquet metadata
+/// [`Fields`] embedded in the parquet metadata
///
/// Note: This does not support out of order column projection
pub fn convert_schema(
schema: &SchemaDescriptor,
mask: ProjectionMask,
- embedded_arrow_schema: Option<&Schema>,
+ embedded_arrow_schema: Option<&Fields>,
) -> Result<Option<ParquetField>> {
let mut visitor = Visitor {
next_col_idx: 0,
@@ -570,7 +572,7 @@ pub fn convert_schema(
let context = VisitorContext {
rep_level: 0,
def_level: 0,
- data_type: embedded_arrow_schema.map(|s|
DataType::Struct(s.fields().clone())),
+ data_type: embedded_arrow_schema.map(|fields|
DataType::Struct(fields.clone())),
};
visitor.dispatch(&schema.root_schema_ptr(), context)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index a80d4add3..3b9691044 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -45,7 +45,8 @@ mod primitive;
use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};
-/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata
+///
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise
pub fn parquet_to_arrow_schema(
@@ -66,11 +67,11 @@ pub fn parquet_to_arrow_schema_by_columns(
mask: ProjectionMask,
key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
- Ok(parquet_to_array_schema_and_fields(parquet_schema, mask,
key_value_metadata)?.0)
+ Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask,
key_value_metadata)?.0)
}
/// Extracts the arrow metadata
-pub(crate) fn parquet_to_array_schema_and_fields(
+pub(crate) fn parquet_to_arrow_schema_and_fields(
parquet_schema: &SchemaDescriptor,
mask: ProjectionMask,
key_value_metadata: Option<&Vec<KeyValue>>,
@@ -88,15 +89,56 @@ pub(crate) fn parquet_to_array_schema_and_fields(
});
}
- match complex::convert_schema(parquet_schema, mask,
maybe_schema.as_ref())? {
+ let hint = maybe_schema.as_ref().map(|s| s.fields());
+ let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask,
hint)?;
+ let schema = Schema::new_with_metadata(field_levels.fields, metadata);
+ Ok((schema, field_levels.levels))
+}
+
+/// Schema information necessary to decode a parquet file as arrow [`Fields`]
+///
+/// In particular this stores the dremel-level information necessary to correctly
+/// interpret the encoded definition and repetition levels
+///
+/// Note: this is an opaque container intended to be used with lower-level APIs
+/// within this crate
+#[derive(Debug, Clone)]
+pub struct FieldLevels {
+ pub(crate) fields: Fields,
+ pub(crate) levels: Option<ParquetField>,
+}
+
+/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
+///
+/// Columns not included within [`ProjectionMask`] will be ignored.
+///
+/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
+/// will be used, otherwise the default arrow type for the given parquet column type will be used.
+///
+/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
+/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
+/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
+/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
+///
+/// Note: this is a low-level API, most users will want to make use of the higher-level
+/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
+pub fn parquet_to_arrow_field_levels(
+ schema: &SchemaDescriptor,
+ mask: ProjectionMask,
+ hint: Option<&Fields>,
+) -> Result<FieldLevels> {
+ match complex::convert_schema(schema, mask, hint)? {
Some(field) => match &field.arrow_type {
- DataType::Struct(fields) => Ok((
- Schema::new_with_metadata(fields.clone(), metadata),
- Some(field),
- )),
+ DataType::Struct(fields) => Ok(FieldLevels {
+ fields: fields.clone(),
+ levels: Some(field),
+ }),
_ => unreachable!(),
},
- None => Ok((Schema::new_with_metadata(Fields::empty(), metadata),
None)),
+ None => Ok(FieldLevels {
+ fields: Fields::empty(),
+ levels: None,
+ }),
}
}