etseidl commented on code in PR #8080:
URL: https://github.com/apache/arrow-rs/pull/8080#discussion_r2277889858
########## parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push-API-based version of the [`ParquetMetaDataReader`],
+/// which decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, and
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder the byte ranges it requests,
+/// as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #     let mut buffer = vec![0];
+/// #     let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #     let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// #     writer.write(&batch).unwrap();
+/// #     writer.close().unwrap();
+/// #     Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #     let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #     file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // Try to decode the metadata. If more data is needed, the decoder will
+/// // tell you which ranges it needs.
+/// loop {
+///     match decoder.try_decode() {
+///         Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+///         Ok(DecodeResult::NeedsData(ranges)) => {
+///             // The decoder needs more data.
+///             //
+///             // In this example, we call a function that returns the bytes for each given range.
+///             // In a real application, you would likely read the data from a file or network.
+///             let data = ranges.iter().map(|range| get_range(range)).collect();
+///             // Push the data into the decoder and try to decode again on the next iteration.
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+///         Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
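+///
+/// As a sketch (this helper is not part of the crate's API), the same
+/// request/push loop can be wrapped around any blocking source that
+/// implements `std::io::Read` and `std::io::Seek`, such as a `std::fs::File`,
+/// relying on the `From<std::io::Error>` conversion for `ParquetError` just
+/// like the async example below:
+///
+/// ```rust
+/// # use std::io::{Read, Seek, SeekFrom};
+/// # use bytes::Bytes;
+/// # use parquet::DecodeResult;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// fn decode_metadata_blocking(
+///     file_len: u64,
+///     mut source: impl Read + Seek,
+/// ) -> Result<ParquetMetaData, ParquetError> {
+///     let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
+///     loop {
+///         match decoder.try_decode()? {
+///             // decode successful
+///             DecodeResult::Data(metadata) => return Ok(metadata),
+///             DecodeResult::NeedsData(ranges) => {
+///                 // read each requested byte range from the blocking source
+///                 let mut data = Vec::with_capacity(ranges.len());
+///                 for range in &ranges {
+///                     let mut buffer = vec![0; (range.end - range.start) as usize];
+///                     source.seek(SeekFrom::Start(range.start))?;
+///                     source.read_exact(&mut buffer)?;
+///                     data.push(Bytes::from(buffer));
+///                 }
+///                 // push the fetched bytes and try decoding again
+///                 decoder.push_ranges(ranges, data)?;
+///             }
+///             DecodeResult::Finished => unreachable!("metadata already returned"),
+///         }
+///     }
+/// }
+/// ```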
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] requests only the exact byte
+/// ranges it needs. This minimizes the number of bytes read; however, it
+/// requires at least two IO operations to read the metadata: one to read the
+/// footer and one to read the metadata itself.
+///
+/// If the file has a "Page Index" (see [Self::with_page_indexes]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per-operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and push it
+/// into the decoder before calling [`Self::try_decode`]. If you do not push
+/// enough bytes, the decoder will return the ranges that are still needed.
+///
+/// This approach can also be used when you already have the entire file in
+/// memory for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #     let mut buffer = vec![0];
+/// #     let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #     let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// #     writer.write(&batch).unwrap();
+/// #     writer.close().unwrap();
+/// #     Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // For this example we "prefetch" all the bytes, which we have in memory,
+/// // but in a real application you would likely read a chunk from the end,
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note that in a real
+/// // application, unless you can guarantee the pushed data is enough to decode
+/// // the metadata, you still need to call `try_decode` in a loop until it
+/// // returns `DecodeResult::Data`, as shown in the previous example.
+/// match decoder.try_decode() {
+///     Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+///     other => { panic!("expected DecodeResult::Data, got: {other:?}") }
+/// }
+/// # }
+/// ```
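+///
+/// As a sketch of the "chunk from the end" approach described above (the 1MB
+/// size and the `read_suffix` callback, which returns the last `n` bytes of
+/// the file, are illustrative assumptions, not part of this API):
+///
+/// ```rust
+/// # use bytes::Bytes;
+/// # use parquet::DecodeResult;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// # fn decode_with_prefetch(
+/// #     file_len: u64,
+/// #     read_suffix: impl Fn(u64) -> Bytes,
+/// # ) -> Result<ParquetMetaData, ParquetError> {
+/// // Prefetch the last 1MB of the file (or the whole file, if smaller),
+/// // which usually covers both the footer and the metadata in one IO operation.
+/// let prefetch_len = file_len.min(1024 * 1024);
+/// let suffix = read_suffix(prefetch_len);
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
+/// decoder.push_ranges(vec![file_len - prefetch_len..file_len], vec![suffix])?;
+/// match decoder.try_decode()? {
+///     DecodeResult::Data(metadata) => Ok(metadata),
+///     DecodeResult::NeedsData(ranges) => {
+///         // the prefetch was too small: fetch the remaining ranges and
+///         // push them as in the first example
+///         todo!("fetch {ranges:?} and continue the decode loop")
+///     }
+///     DecodeResult::Finished => unreachable!("metadata already returned"),
+/// }
+/// # }
+/// ```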
+///
+/// # Example using [`AsyncRead`]
+///
+/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that
+/// can provide byte ranges, including async IO sources. However, it does not
+/// implement async IO itself. To use async IO, you simply write an async wrapper
+/// around it that reads the required byte ranges and pushes them into the
+/// decoder.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// // This function decodes Parquet metadata from anything that implements
+/// // [`AsyncRead`] and [`AsyncSeek`], such as a `tokio::fs::File`.
+/// async fn decode_metadata(
+///     file_len: u64,
+///     mut async_source: impl AsyncRead + AsyncSeek + Unpin
+/// ) -> Result<ParquetMetaData, ParquetError> {
+///     // We need a ParquetMetaDataPushDecoder to decode the metadata.
+///     let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+///     loop {
+///         match decoder.try_decode() {
+///             Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+///             Ok(DecodeResult::NeedsData(ranges)) => {
+///                 // The decoder needs more data.
+///                 //
+///                 // In this example we use the AsyncRead and AsyncSeek traits to read the
+///                 // required ranges from the async source.
+///                 let mut data = Vec::with_capacity(ranges.len());
+///                 for range in &ranges {
+///                     let mut buffer = vec![0; (range.end - range.start) as usize];
+///                     async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
+///                     async_source.read_exact(&mut buffer).await?;
+///                     data.push(Bytes::from(buffer));
+///                 }
+///                 // Push the data into the decoder and try to decode again on the next iteration.
+///                 decoder.push_ranges(ranges, data).unwrap();
+///             }
+///             Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+///             Err(e) => return Err(e),
+///         }
+///     }
+/// }
+/// ```
+/// [`AsyncRead`]: tokio::io::AsyncRead
+#[derive(Debug)]
+pub struct ParquetMetaDataPushDecoder {
+    done: bool,
+    metadata_reader: ParquetMetaDataReader,

Review Comment:
   I wholeheartedly endorse this idea. 😄

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org