alamb commented on code in PR #6392:
URL: https://github.com/apache/arrow-rs/pull/6392#discussion_r1767234846
##########
parquet/src/errors.rs:
##########
@@ -61,6 +66,8 @@ impl std::fmt::Display for ParquetError {
write!(fmt, "Index {index} out of bound: {bound}")
}
ParquetError::External(e) => write!(fmt, "External: {e}"),
+ ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
Review Comment:
A nitpick is that these seem pretty similar. I wonder if it would make sense
to combine them somehow 🤔
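One hypothetical shape for combining them (a sketch of the reviewer's idea, not what the PR does; the `ReadError` name and the range payload are illustrative assumptions) is a single variant carrying the byte range still required, so the "need N more bytes" and "need a larger range" cases share one representation:

```rust
/// Sketch: one variant covering both "need more data" cases by carrying
/// the absolute file range that is still required.
#[derive(Debug, PartialEq)]
enum ReadError {
    NeedMoreData(std::ops::Range<usize>),
}

impl std::fmt::Display for ReadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReadError::NeedMoreData(r) => write!(f, "NeedMoreData: {}..{}", r.start, r.end),
        }
    }
}

fn main() {
    // A caller that can retry knows exactly which bytes to fetch next.
    let e = ReadError::NeedMoreData(100..228);
    assert_eq!(e.to_string(), "NeedMoreData: 100..228");
    println!("{e}");
}
```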
##########
parquet/src/file/metadata/reader.rs:
##########
@@ -0,0 +1,687 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{io::Read, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+
+use crate::basic::ColumnOrder;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::reader::ChunkReader;
+use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
+use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+use crate::schema::types;
+use crate::schema::types::SchemaDescriptor;
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+
+#[cfg(feature = "async")]
+use crate::arrow::async_reader::MetadataFetch;
+
+/// Reads the [`ParquetMetaData`] from the footer of a Parquet file.
+///
+/// This function is a wrapper around [`ParquetMetaDataReader`]. The input, which must implement
+/// [`ChunkReader`], may be a [`std::fs::File`] or [`Bytes`]. In the latter case, the passed in
+/// buffer must contain the contents of the entire file if any of the Parquet [Page Index]
+/// structures are to be populated (controlled via the `column_index` and `offset_index` arguments).
+///
+/// [Page Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+pub fn parquet_metadata_from_file<R: ChunkReader>(
Review Comment:
A nitpick is maybe we can call this `parquet_metadata_from_reader`.
Also I wonder if instead of a new API it would make sense to always use
`ParquetMetaDataReader` directly. That would certainly be more verbose, but
it also might be more explicit.
For the common case that the wrapping code won't retry (aka all the
callsites of `parquet_metadata_from_file`), we could also add some sort of
consuming API that combines `try_parse` and `finish` to make it less
verbose. Something like
```rust
let metadata = ParquetMetaDataReader::new()
    .with_column_indexes(column_index)
    .with_offset_indexes(offset_index)
    .parse(file)?;
```
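The shape of that suggested consuming API can be sketched with a self-contained mock (the `MetaReader` type and its string "metadata" payload are stand-ins, not the real crate types): `parse` takes `self` by value and chains `try_parse` with `finish`, so non-retrying callers need only one call.

```rust
// Mock of the builder, to show the consuming `parse` shape only.
struct MetaReader {
    column_index: bool,
    offset_index: bool,
    parsed: Option<String>,
}

impl MetaReader {
    fn new() -> Self {
        Self { column_index: false, offset_index: false, parsed: None }
    }
    fn with_column_indexes(mut self, v: bool) -> Self { self.column_index = v; self }
    fn with_offset_indexes(mut self, v: bool) -> Self { self.offset_index = v; self }
    fn try_parse(&mut self, file: &str) -> Result<(), String> {
        // Stand-in for reading the footer from `file`.
        self.parsed = Some(format!("metadata of {file}"));
        Ok(())
    }
    fn finish(&mut self) -> Result<String, String> {
        self.parsed.take().ok_or_else(|| "no metadata".to_string())
    }
    /// The suggested consuming API: try_parse + finish in one call.
    fn parse(mut self, file: &str) -> Result<String, String> {
        self.try_parse(file)?;
        self.finish()
    }
}

fn main() {
    let metadata = MetaReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse("some_path.parquet")
        .unwrap();
    assert_eq!(metadata, "metadata of some_path.parquet");
    println!("{metadata}");
}
```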
##########
parquet/src/file/metadata/reader.rs:
##########
@@ -0,0 +1,687 @@
+pub fn parquet_metadata_from_file<R: ChunkReader>(
+ file: &R,
+ column_index: bool,
+ offset_index: bool,
+) -> Result<ParquetMetaData> {
+ let mut reader = ParquetMetaDataReader::new()
+ .with_column_indexes(column_index)
+ .with_offset_indexes(offset_index);
+ reader.try_parse(file)?;
+ reader.finish()
+}
+
+/// Reads the [`ParquetMetaData`] from a byte stream.
+///
+/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
+/// the Parquet metadata.
+///
+/// # Example
+/// ```no_run
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+/// // read parquet metadata including page indexes
+/// let file = open_parquet_file("some_path.parquet");
+/// let mut reader = ParquetMetaDataReader::new()
+/// .with_page_indexes(true);
+/// reader.try_parse(&file).unwrap();
+/// let metadata = reader.finish().unwrap();
+/// assert!(metadata.column_index().is_some());
+/// assert!(metadata.offset_index().is_some());
+/// ```
+#[derive(Default)]
+pub struct ParquetMetaDataReader {
+ metadata: Option<ParquetMetaData>,
+ column_index: bool,
+ offset_index: bool,
+ prefetch_hint: Option<usize>,
+}
+
+impl ParquetMetaDataReader {
+ /// Create a new [`ParquetMetaDataReader`]
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
+ /// obtained via other means. Primarily intended for use with [`Self::load_page_index()`].
+ pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
+ Self {
+ metadata: Some(metadata),
+ ..Default::default()
+ }
+ }
+
+ /// Enable or disable reading the page index structures described in
+ /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
+ /// `self.with_column_indexes(val).with_offset_indexes(val)`
+ ///
+ /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+ pub fn with_page_indexes(self, val: bool) -> Self {
+ self.with_column_indexes(val).with_offset_indexes(val)
+ }
+
+ /// Enable or disable reading the Parquet [ColumnIndex] structure.
+ ///
+ /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+ pub fn with_column_indexes(mut self, val: bool) -> Self {
+ self.column_index = val;
+ self
+ }
+
+ /// Enable or disable reading the Parquet [OffsetIndex] structure.
+ ///
+ /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+ pub fn with_offset_indexes(mut self, val: bool) -> Self {
+ self.offset_index = val;
+ self
+ }
+
+ /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
+ /// Only used for the asynchronous [`Self::try_load()`] method.
+ ///
+ /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
+ /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
+ /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
+ /// needed to decode the page index structures, if they have been requested. To avoid
+ /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
+ /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
+ /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
+ /// in extra fetches being performed.
+ pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
+ self.prefetch_hint = prefetch;
+ self
+ }
+
+ /// Return the parsed [`ParquetMetaData`] struct.
+ pub fn finish(&mut self) -> Result<ParquetMetaData> {
+ self.metadata
+ .take()
+ .ok_or_else(|| general_err!("could not parse parquet metadata"))
+ }
+
+ /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
+ /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
+ /// the request. If page indexes are desired, the buffer must contain the entire file, or
+ /// [`Self::try_parse_range()`] should be used.
+ pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
+ self.try_parse_range(reader, 0..reader.len() as usize)
+ }
+
+ /// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are
+ /// available. `file_range.end` must point to the end of the file.
+ pub fn try_parse_range<R: ChunkReader>(
+ &mut self,
+ reader: &R,
+ file_range: Range<usize>,
+ ) -> Result<()> {
+ self.metadata = match Self::parse_metadata(reader) {
+ Ok(metadata) => Some(metadata),
+ Err(ParquetError::NeedMoreData(needed)) => {
+ // If the provided range starts at 0, then presumably there is no more to read,
+ // so return an EOF error.
+ if file_range.start == 0 {
+ return Err(eof_err!(
+ "Parquet file too small. Size is {} but need {needed}",
+ reader.len()
+ ));
+ } else {
+ return Err(ParquetError::NeedLargerRange(
+ file_range.end - needed..file_range.end,
+ ));
+ }
+ }
+ Err(e) => return Err(e),
+ };
+
+ // we can return if page indexes aren't requested
+ if !self.column_index && !self.offset_index {
+ return Ok(());
+ }
+
+ // TODO(ets): what is the correct behavior for missing page indexes? MetadataLoader would
+ // leave them as `None`, while the parser in `index_reader::read_columns_indexes` returns a
+ // vector of empty vectors.
+ // I think it's best to leave them as `None`.
+
+ // Get bounds needed for page indexes (if any are present in the file).
+ let Some(range) = self.range_for_page_index() else {
+ return Ok(());
+ };
+
+ // Check to see if needed range is within `file_range`. Checking `range.end` seems
+ // redundant, but it guards against `range_for_page_index()` returning garbage.
+ if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
+ return Err(ParquetError::NeedLargerRange(range.start..file_range.end));
+ }
+
+ let bytes_needed = range.end - range.start;
+ let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
+ let offset = range.start;
+
+ self.parse_column_index(&bytes, offset)?;
+ self.parse_offset_index(&bytes, offset)?;
+
+ Ok(())
+ }
+
+ /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
+ /// given a [`MetadataFetch`]. The file size must be known to use this function.
Review Comment:
It might also be good to note here that `try_load` will attempt to minimize
the number of calls to `fetch` by prefetching, but may still make multiple
requests depending on how the data is laid out.
As an aside (and not changed in this PR), I found the use of `MetadataFetch`
as basically an async version of `ChunkReader` confusing when trying to
understand this API.
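The fetch-count behavior under discussion can be modeled with a toy fetcher that counts range requests. Everything below is illustrative (a fake footer layout of `[metadata][4-byte LE length][4-byte magic]` and invented `CountingFetch`/`load` names), not the real crate: with no hint the loader needs a footer fetch plus a metadata fetch, while a large-enough prefetch window covers both in one request.

```rust
// Toy model of footer loading: counts how many range requests are issued.
struct CountingFetch {
    file: Vec<u8>,
    calls: usize,
}

impl CountingFetch {
    fn fetch(&mut self, range: std::ops::Range<usize>) -> Vec<u8> {
        self.calls += 1;
        self.file[range].to_vec()
    }
}

/// Read the metadata length from the first 4 bytes of the 8-byte footer.
fn metadata_len(footer: &[u8]) -> usize {
    u32::from_le_bytes(footer[..4].try_into().unwrap()) as usize
}

/// Return the metadata bytes, fetching either the bare 8-byte footer or a
/// larger prefetch window first.
fn load(fetch: &mut CountingFetch, file_size: usize, prefetch: Option<usize>) -> Vec<u8> {
    let tail = prefetch.unwrap_or(8).min(file_size);
    let buf = fetch.fetch(file_size - tail..file_size);
    let meta_len = metadata_len(&buf[buf.len() - 8..]);
    let needed = meta_len + 8;
    if tail >= needed {
        // Prefetch window already covers the metadata: one request total.
        buf[buf.len() - needed..buf.len() - 8].to_vec()
    } else {
        // Window too small: a second request for the metadata bytes.
        fetch.fetch(file_size - needed..file_size - 8)
    }
}

fn main() {
    let file = {
        let mut v = b"somedata".to_vec();         // row group bytes (8)
        v.extend_from_slice(b"METADATA");         // footer metadata (8)
        v.extend_from_slice(&8u32.to_le_bytes()); // metadata length
        v.extend_from_slice(b"PAR1");             // magic
        v
    };
    let size = file.len();

    // No hint: footer fetch + metadata fetch = 2 requests.
    let mut f = CountingFetch { file: file.clone(), calls: 0 };
    assert_eq!(load(&mut f, size, None), b"METADATA");
    assert_eq!(f.calls, 2);

    // Hint covering metadata + footer: 1 request.
    let mut f = CountingFetch { file, calls: 0 };
    assert_eq!(load(&mut f, size, Some(16)), b"METADATA");
    assert_eq!(f.calls, 1);
}
```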
##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -237,8 +236,10 @@ where
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
- let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
- Ok(loader.finish())
+ // TODO(ets): should add option to read page index to this function
Review Comment:
An alternative perhaps would be to deprecate `fetch_parquet_metadata`
entirely and suggest people use `ParquetMetaDataReader`, which is more
complete and full featured -- I think we could deprecate this function in a
minor release (we can't remove it until a major release)
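Mechanically, that deprecation path could look like the sketch below (the function body, return value, and note text are all hypothetical): the `#[deprecated]` attribute keeps existing callers compiling, with a warning, until the symbol is actually removed in a major release.

```rust
// Hedged sketch of deprecating the old entry point in a minor release.
#[deprecated(note = "use ParquetMetaDataReader instead")]
fn fetch_parquet_metadata() -> &'static str {
    // Stand-in body; the real function fetches and parses the footer.
    "metadata"
}

fn main() {
    // Callers still compile, but get a deprecation warning at build time
    // until the function is removed in a major release.
    #[allow(deprecated)]
    let md = fetch_parquet_metadata();
    assert_eq!(md, "metadata");
    println!("{md}");
}
```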
##########
parquet/src/file/metadata/reader.rs:
##########
@@ -0,0 +1,687 @@
+ /// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are
+ /// available. `file_range.end` must point to the end of the file.
Review Comment:
I was a little confused about how `file_range` works in this case (given
that it seems to me that `ChunkReader` would in theory allow reading
arbitrary ranges).
Is the idea that `try_parse_range` limits the requests to the `reader` so
they are only within `file_range`?
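The offset translation the question touches on (`range.start - file_range.start` in the PR's `try_parse_range`) can be illustrated standalone (the `read_absolute` helper is hypothetical): the reader's buffer holds only the bytes of `file_range`, so an absolute file offset must be shifted by `file_range.start` before indexing, and anything outside the range cannot be served.

```rust
/// Read an absolute file range `abs` from a buffer that only holds the
/// bytes of `file_range`. Returns None when `abs` falls outside the
/// available range (the case the PR signals with NeedLargerRange).
fn read_absolute(
    buf: &[u8],
    file_range: std::ops::Range<usize>,
    abs: std::ops::Range<usize>,
) -> Option<&[u8]> {
    if abs.start < file_range.start || abs.end > file_range.end {
        return None;
    }
    // Shift absolute offsets into buffer-relative offsets.
    Some(&buf[abs.start - file_range.start..abs.end - file_range.start])
}

fn main() {
    // Suppose we only fetched bytes 100..120 of a 120-byte file.
    let buf: Vec<u8> = (0..20).collect();
    assert_eq!(read_absolute(&buf, 100..120, 105..108), Some(&buf[5..8]));
    // A request before the fetched range cannot be satisfied.
    assert!(read_absolute(&buf, 100..120, 90..108).is_none());
}
```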
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]