alamb commented on code in PR #6431: URL: https://github.com/apache/arrow-rs/pull/6431#discussion_r1772011081
########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. 
+/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. + pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. 
+ /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. + pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present. + pub fn has_metadata(&self) -> bool { + self.metadata.is_some() + } + + /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place. + pub fn finish(&mut self) -> Result<ParquetMetaData> { + self.metadata + .take() + .ok_or_else(|| general_err!("could not parse parquet metadata")) + } + + /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass. + /// + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete Review Comment: It would be really neat if @adriangb could comment / provide an example of using this API even when we don't have the entire file (aka faking out the offsets). 
This could definitely be done as a follow-on PR. ########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. 
+/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. + pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. 
+ /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. + pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present. + pub fn has_metadata(&self) -> bool { + self.metadata.is_some() + } + + /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place. + pub fn finish(&mut self) -> Result<ParquetMetaData> { + self.metadata + .take() + .ok_or_else(|| general_err!("could not parse parquet metadata")) + } + + /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass. + /// + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. + /// + /// This call will consume `self`. 
+ /// + /// Example + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// // read parquet metadata including page indexes + /// let file = open_parquet_file("some_path.parquet"); + /// let metadata = ParquetMetaDataReader::new() + /// .with_page_indexes(true) + /// .parse(&file).unwrap(); + /// ``` + pub fn parse<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> { Review Comment: I found the fact that `parse` consumes `self` slightly confusing, given that `try_parse` does not. Maybe we could call it `build()` or `parse_and_finish()` to reflect that it consumes `self` and returns the finished `ParquetMetaData` 🤔 ########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. +/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. 
+ pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. + /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. 
+ pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present. + pub fn has_metadata(&self) -> bool { + self.metadata.is_some() + } + + /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place. + pub fn finish(&mut self) -> Result<ParquetMetaData> { + self.metadata + .take() + .ok_or_else(|| general_err!("could not parse parquet metadata")) + } + + /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass. + /// + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. + /// + /// This call will consume `self`. + /// + /// Example + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// // read parquet metadata including page indexes + /// let file = open_parquet_file("some_path.parquet"); + /// let metadata = ParquetMetaDataReader::new() + /// .with_page_indexes(true) + /// .parse(&file).unwrap(); + /// ``` + pub fn parse<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> { + self.try_parse(reader)?; + self.finish() + } + + /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`]. + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. 
+ pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> { + self.try_parse_sized(reader, reader.len() as usize) + } + + /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader` + /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary + /// when the page indexes are desired. `reader` must have access to the Parquet footer. + /// + /// Using this function also allows for retrying with a larger buffer. + /// + /// Example + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # use parquet::errors::ParquetError; + /// # use crate::parquet::file::reader::Length; + /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); } + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// let file = open_parquet_file("some_path.parquet"); + /// let len = file.len() as usize; + /// let bytes = get_bytes(&file, 1000..len); + /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true); + /// match reader.try_parse_sized(&bytes, len) { + /// Ok(_) => (), + /// Err(ParquetError::IndexOutOfBound(needed, _)) => { + /// let bytes = get_bytes(&file, len - needed..len); + /// reader.try_parse_sized(&bytes, len).unwrap(); + /// } + /// _ => panic!("unexpected error") + /// } + /// let metadata = reader.finish().unwrap(); + /// ``` + pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> { + self.metadata = match Self::parse_metadata(reader) { + Ok(metadata) => Some(metadata), + // FIXME: throughout this module ParquetError::IndexOutOfBound is used to indicate the + // need for more data. This is not it's intended use. The plan is to add a NeedMoreData + // value to the enum, but this would be a breaking change. This will be done as + // 54.0.0 draws nearer. 
+ Err(ParquetError::IndexOutOfBound(needed, _)) => { + // If reader is the same length as `file_size` then presumably there is no more to + // read, so return an EOF error. + if file_size == reader.len() as usize || needed > file_size { + return Err(eof_err!( + "Parquet file too small. Size is {} but need {}", + file_size, + needed + )); + } else { + // Ask for a larger buffer + return Err(ParquetError::IndexOutOfBound(needed, file_size)); + } + } + Err(e) => return Err(e), + }; + + // we can return if page indexes aren't requested + if !self.column_index && !self.offset_index { + return Ok(()); + } + + self.read_page_indexes_sized(reader, file_size) + } + + /// Read the page index structures when a [`ParquetMetaData`] has already been obtained. + /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. + pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> { + self.read_page_indexes_sized(reader, reader.len() as usize) + } + + /// Read the page index structures when a [`ParquetMetaData`] has already been obtained. + /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is + /// a [`Bytes`] struct containing the tail of the file). + /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. + pub fn read_page_indexes_sized<R: ChunkReader>( + &mut self, + reader: &R, + file_size: usize, + ) -> Result<()> { + if self.metadata.is_none() { + return Err(general_err!("Footer metadata is not present")); Review Comment: ```suggestion return Err(general_err!("Tried to read page indexes without ParquetMetaData metadata")); ``` ########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. 
+/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. + pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. 
+ /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. + pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present. + pub fn has_metadata(&self) -> bool { + self.metadata.is_some() + } + + /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place. + pub fn finish(&mut self) -> Result<ParquetMetaData> { + self.metadata + .take() + .ok_or_else(|| general_err!("could not parse parquet metadata")) + } + + /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass. + /// + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. + /// + /// This call will consume `self`. 
+ /// + /// Example + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// // read parquet metadata including page indexes + /// let file = open_parquet_file("some_path.parquet"); + /// let metadata = ParquetMetaDataReader::new() + /// .with_page_indexes(true) + /// .parse(&file).unwrap(); + /// ``` + pub fn parse<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> { + self.try_parse(reader)?; + self.finish() + } + + /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`]. + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. + pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> { + self.try_parse_sized(reader, reader.len() as usize) + } + + /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader` + /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary + /// when the page indexes are desired. `reader` must have access to the Parquet footer. + /// + /// Using this function also allows for retrying with a larger buffer. Review Comment: I think it would help here to document which errors are returned (specifically, how "needs more buffer" is communicated). I see it is partly covered in the example. ########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. 
+/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. + pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. 
+ /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. + pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present. + pub fn has_metadata(&self) -> bool { + self.metadata.is_some() + } + + /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place. + pub fn finish(&mut self) -> Result<ParquetMetaData> { + self.metadata + .take() + .ok_or_else(|| general_err!("could not parse parquet metadata")) + } + + /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass. + /// + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete + /// the request, and must include the Parquet footer. If page indexes are desired, the buffer + /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. + /// + /// This call will consume `self`. 
+ /// + /// Example + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// // read parquet metadata including page indexes + /// let file = open_parquet_file("some_path.parquet"); + /// let metadata = ParquetMetaDataReader::new() + /// .with_page_indexes(true) + /// .parse(&file).unwrap(); + /// ``` + pub fn parse<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> { + self.try_parse(reader)?; + self.finish() + } + + /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`]. + /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete Review Comment: Adding whitespace makes it render more nicely in cargo docs ```suggestion /// /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete ``` ########## parquet/src/file/metadata/reader.rs: ########## @@ -0,0 +1,954 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{io::Read, ops::Range, sync::Arc}; + +use bytes::Bytes; + +use crate::basic::ColumnOrder; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; +use crate::file::reader::ChunkReader; +use crate::file::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +#[cfg(feature = "async")] +use crate::arrow::async_reader::MetadataFetch; + +/// Reads the [`ParquetMetaData`] from a byte stream. +/// +/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of +/// the Parquet metadata. +/// +/// # Example +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaDataReader; +/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } +/// // read parquet metadata including page indexes +/// let file = open_parquet_file("some_path.parquet"); +/// let mut reader = ParquetMetaDataReader::new() +/// .with_page_indexes(true); +/// reader.try_parse(&file).unwrap(); +/// let metadata = reader.finish().unwrap(); +/// assert!(metadata.column_index().is_some()); +/// assert!(metadata.offset_index().is_some()); +/// ``` +#[derive(Default)] +pub struct ParquetMetaDataReader { + metadata: Option<ParquetMetaData>, + column_index: bool, + offset_index: bool, + prefetch_hint: Option<usize>, +} + +impl ParquetMetaDataReader { + /// Create a new [`ParquetMetaDataReader`] + pub fn new() -> Self { + Default::default() + } + + /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct + /// obtained via other means. 
+ pub fn new_with_metadata(metadata: ParquetMetaData) -> Self { + Self { + metadata: Some(metadata), + ..Default::default() + } + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to: + /// `self.with_column_indexes(val).with_offset_indexes(val)` + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(self, val: bool) -> Self { + self.with_column_indexes(val).with_offset_indexes(val) + } + + /// Enable or disable reading the Parquet [ColumnIndex] structure. + /// + /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_column_indexes(mut self, val: bool) -> Self { + self.column_index = val; + self + } + + /// Enable or disable reading the Parquet [OffsetIndex] structure. + /// + /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_offset_indexes(mut self, val: bool) -> Self { + self.offset_index = val; + self + } + + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. + /// Only used for the asynchronous [`Self::try_load()`] method. + /// + /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the + /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes. + /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes + /// needed to decode the page index structures, if they have been requested. To avoid + /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed + /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and + /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result + /// in extra fetches being performed. 
+ pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self { + self.prefetch_hint = prefetch; + self + } + + /// Indicates whether a [`ParquetMetaData`] is present.

Review Comment:
```suggestion
    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
