alamb commented on code in PR #8080: URL: https://github.com/apache/arrow-rs/pull/8080#discussion_r2260409228
########## parquet/src/errors.rs: ########## @@ -52,6 +52,9 @@ pub enum ParquetError { /// Returned when a function needs more data to complete properly. The `usize` field indicates /// the total number of bytes required, not the number of additional bytes. NeedMoreData(usize), + /// Returned when a function needs more data to complete properly. Review Comment: This is the one part of this PR I am not sure about. Since `ParquetError` is (now) marked as `#[non_exhaustive]`, I don't think this is technically a breaking change. However, it would be really nice to avoid a new enum variant -- I will comment about this later in the PR ########## parquet/src/util/push_buffers.rs: ########## @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::reader::{ChunkReader, Length}; +use bytes::Bytes; +use std::fmt::Display; +use std::ops::Range; + +/// Holds multiple buffers of data that have been requested by the ParquetDecoder +/// +/// This is the in-memory buffer for the ParquetDecoder +/// +/// Features it has: +/// 1. Zero copy as much as possible +/// 2. 
Keeps non contiguous ranges of bytes +#[derive(Debug, Clone)] +pub(crate) struct PushBuffers { + /// the virtual "offset" of this buffers (added to any request) + offset: u64, + /// The total length of the file being decoded + file_len: u64, + /// The ranges of data that are available for decoding (not adjusted for offset) + ranges: Vec<Range<u64>>, + /// The buffers of data that can be used to decode the Parquet file + buffers: Vec<Bytes>, +} + +impl Display for PushBuffers { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "Buffers (offset: {}, file_len: {})", + self.offset, self.file_len + )?; + writeln!(f, "Available Ranges (w/ offset):")?; + for range in &self.ranges { + writeln!( + f, + " {}..{} ({}..{}): {} bytes", + range.start, + range.end, + range.start + self.offset, + range.end + self.offset, + range.end - range.start + )?; + } + + Ok(()) + } +} + +impl PushBuffers { + /// Create a new Buffers instance with the given file length + pub fn new(file_len: u64) -> Self { + Self { + offset: 0, + file_len, + ranges: Vec::new(), + buffers: Vec::new(), + } + } + + /// Push all the ranges and buffers + pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) { + assert_eq!( + ranges.len(), + buffers.len(), + "Number of ranges must match number of buffers" + ); + for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) { + self.push_range(range, buffer); + } + } + + /// Push a new range and its associated buffer + pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) { + assert_eq!( + (range.end - range.start) as usize, + buffer.len(), + "Range length must match buffer length" + ); + self.ranges.push(range); + self.buffers.push(buffer); + } + + /// Returns true if the Buffers contains data for the given range + pub fn has_range(&self, range: &Range<u64>) -> bool { + self.ranges + .iter() + .any(|r| r.start <= range.start && r.end >= range.end) + } + + fn iter(&self) -> impl 
Iterator<Item = (&Range<u64>, &Bytes)> { + self.ranges.iter().zip(self.buffers.iter()) + } + + /// return the file length of the Parquet file being read + pub fn file_len(&self) -> u64 { + self.file_len + } + + /// Specify a new offset + pub fn with_offset(mut self, offset: u64) -> Self { + self.offset = offset; + self + } +} + +impl Length for PushBuffers { + fn len(&self) -> u64 { + self.file_len + } +} + +/// less efficinet implementation of Read for Buffers +impl std::io::Read for PushBuffers { Review Comment: This `std::io::Read` impl is needed so the current thrift decoder can read bytes. I suspect that when @etseidl is done with his custom thrift decoder we can remove this impl - https://github.com/apache/arrow-rs/issues/5854 ########## parquet/src/file/metadata/push_decoder.rs: ########## @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use crate::DecodeResult; + +/// A push decoder for [`ParquetMetaData`]. +/// +/// This structure implements a push API based version of the [`ParquetMetaDataReader`], which +/// decouples the IO from the metadata decoding logic. 
+/// +/// You can use this decoder to customize your IO operations, as shown in the +/// examples below for minimizing bytes read, prefetching data, or +/// using async IO. +/// +/// # Example +/// +/// The most basic usage is to feed the decoder with the necessary byte ranges +/// as requested as shown below. +/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> { +/// # let file_bytes = { +/// # let mut buffer = vec![0]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # // mimic IO by returning a function that returns the bytes for a given range +/// # let get_range = |range: &Range<u64>| -> Bytes { +/// # let start = range.start as usize; +/// # let end = range.end as usize; +/// # file_bytes.slice(start..end) +/// # }; +/// # +/// # let file_len = file_bytes.len() as u64; +/// // The `ParquetMetaDataPushDecoder` needs to know the file length. +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges +/// loop { +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful +/// Ok(DecodeResult::NeedsData(ranges)) => { +/// // The decoder needs more data +/// // +/// // In this example, we call a function that returns the bytes for each given range. +/// // In a real application, you would likely read the data from a file or network. 
+/// let data = ranges.iter().map(|range| get_range(range)).collect(); +/// // Push the data into the decoder and try to decode again on the next iteration. +/// decoder.push_ranges(ranges, data).unwrap(); +/// } +/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") } +/// Err(e) => return Err(e), +/// } +/// } +/// # } +/// ``` +/// +/// # Example with "prefetching" +/// +/// By default, the [`ParquetMetaDataPushDecoder`] will requests only the exact byte +/// ranges it needs. This minimizes the number of bytes read, however it +/// requires at least two IO operations to read the metadata - one to read the +/// footer and then one to read the metadata. +/// +/// If the file has a "Page Index" (see [Self::with_page_indexes]), three +/// IO operations are required to read the metadata, as the page index is +/// not part of the normal metadata footer. +/// +/// To reduce the number of IO operations in systems with high per operation +/// overhead (e.g. cloud storage), you can "prefetch" the data and then push +/// the data into the decoder before calling [`Self::try_decode`]. If you do +/// not push enough bytes, the decoder will return the ranges that are still +/// needed. +/// +/// This approach can also be used when you the entire file already in memory +/// for other reasons. 
+/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> { +/// # let file_bytes = { +/// # let mut buffer = vec![0]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # +/// let file_len = file_bytes.len() as u64; +/// // for this example, we "prefetch" all the bytes which we have in memory, +/// // but in a real application, you would likely read a chunk from the end +/// // for example 1MB. +/// let prefetched_bytes = file_bytes.clone(); +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// // push the prefetched bytes into the decoder +/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap(); +/// // The decoder will now be able to decode the metadata. Note in a real application, +/// // unless you can guarantee that the pushed data is enough to decode the metadata, +/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data` +/// // as shown in the previous example +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful] +/// other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") } +/// } +/// # } +/// ``` +/// +/// # Example using [`AsyncRead`] +/// +/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can +/// provide byte ranges, including async IO sources. However, it does not +/// implement async IO itself. 
To use async IO, you simply write an async +/// wrapper around it that reads the required byte ranges and pushes them into the +/// decoder. +/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// // this function decodes Parquet Metadata from anything that implements +/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File +/// async fn decode_metadata( +/// file_len: u64, +/// mut async_source: impl AsyncRead + AsyncSeek + Unpin +/// ) -> Result<ParquetMetaData, ParquetError> { +/// // We need a ParquetMetaDataPushDecoder to decode the metadata. +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// loop { +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful +/// Ok(DecodeResult::NeedsData(ranges)) => { +/// // The decoder needs more data +/// // +/// // In this example we use the AsyncRead and AsyncSeek traits to read the +/// // required ranges from the async source. +/// let mut data = Vec::with_capacity(ranges.len()); +/// for range in &ranges { +/// let mut buffer = vec![0; (range.end - range.start) as usize]; +/// async_source.seek(std::io::SeekFrom::Start(range.start)).await?; +/// async_source.read_exact(&mut buffer).await?; +/// data.push(Bytes::from(buffer)); +/// } +/// // Push the data into the decoder and try to decode again on the next iteration. 
+/// decoder.push_ranges(ranges, data).unwrap(); +/// } +/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") } +/// Err(e) => return Err(e), +/// } +/// } +/// } +/// ``` +/// [`AsyncRead`]: tokio::io::AsyncRead +#[derive(Debug)] +pub struct ParquetMetaDataPushDecoder { + done: bool, + metadata_reader: ParquetMetaDataReader, + buffers: crate::util::push_buffers::PushBuffers, +} + +impl ParquetMetaDataPushDecoder { + /// Create a new `ParquetMetaDataPushDecoder` with the given file length. + /// + /// By default, this will read page indexes and column indexes. See + /// [`ParquetMetaDataPushDecoder::with_page_indexes`] for more detail. + /// + /// See examples on [`ParquetMetaDataPushDecoder`]. + pub fn try_new(file_len: u64) -> std::result::Result<Self, ParquetError> { + if file_len < 8 { + return Err(ParquetError::General(format!( + "Parquet files are at least 8 bytes long, but file length is {file_len}" + ))); + }; + + let metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true); + + Ok(Self { + done: false, + metadata_reader, + buffers: crate::util::push_buffers::PushBuffers::new(file_len), + }) + } + + /// Enable or disable reading the page index structures described in + /// "[Parquet page index]: Layout to Support Page Skipping". + /// + /// Defaults to `true` + /// + /// This requires + /// 1. The Parquet file to have been written with page indexes + /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer) + /// + /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md + pub fn with_page_indexes(mut self, val: bool) -> Self { + self.metadata_reader = self.metadata_reader.with_page_indexes(val); + self + } + + /// Push the data into the decoder's buffer. + /// + /// The decoder does not immediately attempt to decode the metadata + /// after pushing data. 
Instead, it accumulates the pushed data until you + /// call [`Self::try_decode`]. + /// + /// # Determining required data: + /// + /// To determine what ranges are required to decode the metadata, you can + /// either: + /// + /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see + /// example on [`Self`]) + /// + /// 2. Speculatively push any data that you have available, which may + /// include more than the footer data or requested bytes. + /// + /// Speculatively pushing data can be used when "prefetching" data. See + /// example on [`Self`] + pub fn push_ranges( + &mut self, + ranges: Vec<std::ops::Range<u64>>, + buffers: Vec<bytes::Bytes>, + ) -> std::result::Result<(), String> { + if self.done { + return Err( + "ParquetMetaDataPushDecoder: cannot push data after decoding is finished" + .to_string(), + ); + } + self.buffers.push_ranges(ranges, buffers); + Ok(()) + } + + /// Try to decode the metadata from the pushed data, returning the + /// decoded metadata or an error if not enough data is available. + pub fn try_decode( + &mut self, + ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> { + if self.done { + return Ok(DecodeResult::Finished); + } + + // need to have the last 8 bytes of the file to decode the metadata + let file_len = self.buffers.file_len(); + if !self.buffers.has_range(&(file_len - 8..file_len)) { + #[expect(clippy::single_range_in_vec_init)] + return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len])); + } + + // Try to parse the metadata from the buffers we have. + // If we don't have enough data, it will return a `ParquetError::NeedMoreData` + // with the number of bytes needed to complete the metadata parsing. 
+ // If we have enough data, it will return `Ok(())` and we can + let maybe_metadata = self + .metadata_reader + .try_parse_sized(&self.buffers, self.buffers.file_len()); + + match maybe_metadata { + Ok(()) => { + // Metadata successfully parsed, proceed to decode the row groups + let metadata = self.metadata_reader.finish()?; + self.done = true; + Ok(DecodeResult::Data(metadata)) + } + + Err(ParquetError::NeedMoreData(needed)) => { + let needed = needed as u64; + let Some(start_offset) = file_len.checked_sub(needed) else { + return Err(ParquetError::General(format!( + "Parquet metadata reader needs at least {needed} bytes, but file length is only {file_len}" + ))); + }; + let needed_range = start_offset..start_offset + needed; + // needs `needed_range` bytes at the end of the file + Ok(DecodeResult::NeedsData(vec![needed_range])) + } + Err(ParquetError::NeedMoreDataRange(range)) => Ok(DecodeResult::NeedsData(vec![range])), + + Err(e) => Err(e), // some other error, pass back + } + } +} + +// These tests use the arrow writer to create a parquet file in memory +// so they need the arrow feature and the test feature +#[cfg(all(test, feature = "arrow"))] +mod tests { + use super::*; + use crate::arrow::ArrowWriter; + use crate::file::properties::WriterProperties; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray}; + use bytes::Bytes; + use std::fmt::Debug; + use std::ops::Range; + use std::sync::{Arc, LazyLock}; + + /// It is possible to decode the metadata from the entire file at once before being asked Review Comment: I am also quite pleased with these tests as I think it makes clear what "IO" is happening compared to different operations ########## parquet/src/lib.rs: ########## @@ -179,3 +181,18 @@ pub mod record; pub mod schema; pub mod thrift; + +/// What data is needed to read the next item from a decoder. Review Comment: This is a new public API for returning results or requesting more data. 
It is also used in the parquet push decoder -- you can see it all wired up here: https://github.com/apache/arrow-rs/pull/7997 ########## parquet/src/file/metadata/push_decoder.rs: ########## @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use crate::DecodeResult; + +/// A push decoder for [`ParquetMetaData`]. Review Comment: I am quite pleased with this API and I think the examples show the main IO patterns that we would want: 1. Feed exactly the byte ranges needed 2. "prefetch" a bunch of the data to avoid multiple IOs 3. Read via tokio's `AsyncRead` trait (`tokio::io::AsyncRead`), which has been asked for several times. ########## parquet/src/file/metadata/push_decoder.rs: ########## @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use crate::DecodeResult; + +/// A push decoder for [`ParquetMetaData`]. +/// +/// This structure implements a push API based version of the [`ParquetMetaDataReader`], which +/// decouples the IO from the metadata decoding logic. +/// +/// You can use this decoder to customize your IO operations, as shown in the +/// examples below for minimizing bytes read, prefetching data, or +/// using async IO. +/// +/// # Example +/// +/// The most basic usage is to feed the decoder with the necessary byte ranges +/// as requested as shown below. 
+/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> { +/// # let file_bytes = { +/// # let mut buffer = vec![0]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # // mimic IO by returning a function that returns the bytes for a given range +/// # let get_range = |range: &Range<u64>| -> Bytes { +/// # let start = range.start as usize; +/// # let end = range.end as usize; +/// # file_bytes.slice(start..end) +/// # }; +/// # +/// # let file_len = file_bytes.len() as u64; +/// // The `ParquetMetaDataPushDecoder` needs to know the file length. +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges +/// loop { Review Comment: I am probably too exuberant, but I am really pleased with this API and how it works ########## parquet/src/util/push_buffers.rs: ########## @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::reader::{ChunkReader, Length}; +use bytes::Bytes; +use std::fmt::Display; +use std::ops::Range; + +/// Holds multiple buffers of data that have been requested by the ParquetDecoder Review Comment: This is the in memory buffer that holds the data needed for decoding. It is also used by the parquet push decoder in https://github.com/apache/arrow-rs/pull/7997 ########## parquet/src/file/metadata/push_decoder.rs: ########## @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::ParquetError; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use crate::DecodeResult; + +/// A push decoder for [`ParquetMetaData`]. 
+/// +/// This structure implements a push API based version of the [`ParquetMetaDataReader`], which +/// decouples the IO from the metadata decoding logic. +/// +/// You can use this decoder to customize your IO operations, as shown in the +/// examples below for minimizing bytes read, prefetching data, or +/// using async IO. +/// +/// # Example +/// +/// The most basic usage is to feed the decoder with the necessary byte ranges +/// as requested as shown below. +/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> { +/// # let file_bytes = { +/// # let mut buffer = vec![0]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # // mimic IO by returning a function that returns the bytes for a given range +/// # let get_range = |range: &Range<u64>| -> Bytes { +/// # let start = range.start as usize; +/// # let end = range.end as usize; +/// # file_bytes.slice(start..end) +/// # }; +/// # +/// # let file_len = file_bytes.len() as u64; +/// // The `ParquetMetaDataPushDecoder` needs to know the file length. +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// // try to decode the metadata. 
If more data is needed, the decoder will tell you what ranges +/// loop { +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful +/// Ok(DecodeResult::NeedsData(ranges)) => { +/// // The decoder needs more data +/// // +/// // In this example, we call a function that returns the bytes for each given range. +/// // In a real application, you would likely read the data from a file or network. +/// let data = ranges.iter().map(|range| get_range(range)).collect(); +/// // Push the data into the decoder and try to decode again on the next iteration. +/// decoder.push_ranges(ranges, data).unwrap(); +/// } +/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") } +/// Err(e) => return Err(e), +/// } +/// } +/// # } +/// ``` +/// +/// # Example with "prefetching" +/// +/// By default, the [`ParquetMetaDataPushDecoder`] will requests only the exact byte +/// ranges it needs. This minimizes the number of bytes read, however it +/// requires at least two IO operations to read the metadata - one to read the +/// footer and then one to read the metadata. +/// +/// If the file has a "Page Index" (see [Self::with_page_indexes]), three +/// IO operations are required to read the metadata, as the page index is +/// not part of the normal metadata footer. +/// +/// To reduce the number of IO operations in systems with high per operation +/// overhead (e.g. cloud storage), you can "prefetch" the data and then push +/// the data into the decoder before calling [`Self::try_decode`]. If you do +/// not push enough bytes, the decoder will return the ranges that are still +/// needed. +/// +/// This approach can also be used when you the entire file already in memory +/// for other reasons. 
+/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> { +/// # let file_bytes = { +/// # let mut buffer = vec![0]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # +/// let file_len = file_bytes.len() as u64; +/// // for this example, we "prefetch" all the bytes which we have in memory, +/// // but in a real application, you would likely read a chunk from the end +/// // for example 1MB. +/// let prefetched_bytes = file_bytes.clone(); +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// // push the prefetched bytes into the decoder +/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap(); +/// // The decoder will now be able to decode the metadata. Note in a real application, +/// // unless you can guarantee that the pushed data is enough to decode the metadata, +/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data` +/// // as shown in the previous example +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful] +/// other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") } +/// } +/// # } +/// ``` +/// +/// # Example using [`AsyncRead`] +/// +/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can +/// provide byte ranges, including async IO sources. However, it does not +/// implement async IO itself. 
To use async IO, you simply write an async +/// wrapper around it that reads the required byte ranges and pushes them into the +/// decoder. +/// +/// ```rust +/// # use std::ops::Range; +/// # use bytes::Bytes; +/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::errors::ParquetError; +/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; +/// # +/// // this function decodes Parquet Metadata from anything that implements +/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File +/// async fn decode_metadata( +/// file_len: u64, +/// mut async_source: impl AsyncRead + AsyncSeek + Unpin +/// ) -> Result<ParquetMetaData, ParquetError> { +/// // We need a ParquetMetaDataPushDecoder to decode the metadata. +/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap(); +/// loop { +/// match decoder.try_decode() { +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful +/// Ok(DecodeResult::NeedsData(ranges)) => { +/// // The decoder needs more data +/// // +/// // In this example we use the AsyncRead and AsyncSeek traits to read the +/// // required ranges from the async source. +/// let mut data = Vec::with_capacity(ranges.len()); +/// for range in &ranges { +/// let mut buffer = vec![0; (range.end - range.start) as usize]; +/// async_source.seek(std::io::SeekFrom::Start(range.start)).await?; +/// async_source.read_exact(&mut buffer).await?; +/// data.push(Bytes::from(buffer)); +/// } +/// // Push the data into the decoder and try to decode again on the next iteration. 
+/// decoder.push_ranges(ranges, data).unwrap(); +/// } +/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") } +/// Err(e) => return Err(e), +/// } +/// } +/// } +/// ``` +/// [`AsyncRead`]: tokio::io::AsyncRead +#[derive(Debug)] +pub struct ParquetMetaDataPushDecoder { + done: bool, + metadata_reader: ParquetMetaDataReader, Review Comment: Internally the current implementation just calls out to the existing `ParquetMetaDataReader`. However, I think long term it would make more sense to reverse the logic: have the decoding machinery live in this decoder, and refactor `ParquetMetaDataReader` to use the push decoder. Given this PR is already quite large, I figured this would be a good follow-on PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org