etseidl commented on code in PR #8080:
URL: https://github.com/apache/arrow-rs/pull/8080#discussion_r2289390515


##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will 
tell you what ranges
+/// loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example, we call a function that returns the bytes 
for each given range.
+///           // In a real application, you would likely read the data from a 
file or network.
+///           let data = ranges.iter().map(|range| get_range(range)).collect();
+///           // Push the data into the decoder and try to decode again on the 
next iteration.
+///           decoder.push_ranges(ranges, data).unwrap();
+///        }
+///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in 
previous match arm") }
+///        Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the 
exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory

Review Comment:
   ```suggestion
   /// This approach can also be used when you have the entire file already in 
memory
   ```



##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will 
tell you what ranges
+/// loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example, we call a function that returns the bytes 
for each given range.
+///           // In a real application, you would likely read the data from a 
file or network.
+///           let data = ranges.iter().map(|range| get_range(range)).collect();
+///           // Push the data into the decoder and try to decode again on the 
next iteration.
+///           decoder.push_ranges(ranges, data).unwrap();
+///        }
+///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in 
previous match arm") }
+///        Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the 
exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // for this example, we "prefetch" all the bytes which we have in memory,
+/// // but in a real application, you would likely read a chunk from the end
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note in a real 
application,
+/// // unless you can guarantee that the pushed data is enough to decode the 
metadata,
+/// // you still need to call `try_decode` in a loop until it returns 
`DecodeResult::Data`
+/// // as shown in  the previous example
+///  match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        other @ _ => { panic!("expected DecodeResult::Data, got: 
{other:?}") }
+///    }

Review Comment:
   ```suggestion
   ///      Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
   ///      other @ _ => { panic!("expected DecodeResult::Data, got: 
{other:?}") }
   ///  }
   ```



##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will 
tell you what ranges
+/// loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example, we call a function that returns the bytes 
for each given range.
+///           // In a real application, you would likely read the data from a 
file or network.
+///           let data = ranges.iter().map(|range| get_range(range)).collect();
+///           // Push the data into the decoder and try to decode again on the 
next iteration.
+///           decoder.push_ranges(ranges, data).unwrap();
+///        }
+///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in 
previous match arm") }
+///        Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the 
exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // for this example, we "prefetch" all the bytes which we have in memory,

Review Comment:
   ```suggestion
   /// // For this example, we "prefetch" all the bytes which we have in memory,
   ```



##########
parquet/src/util/push_buffers.rs:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::reader::{ChunkReader, Length};
+use bytes::Bytes;
+use std::fmt::Display;
+use std::ops::Range;
+
+/// Holds multiple buffers of data
+///
+/// This is the in-memory buffer for the ParquetDecoder and 
ParquetMetadataDecoders
+///
+/// Features:
+/// 1. Zero copy
+/// 2. non contiguous ranges of bytes
+///
+/// # Non Coalescing

Review Comment:
   ❤️ 



##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will 
tell you what ranges
+/// loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example, we call a function that returns the bytes 
for each given range.
+///           // In a real application, you would likely read the data from a 
file or network.
+///           let data = ranges.iter().map(|range| get_range(range)).collect();
+///           // Push the data into the decoder and try to decode again on the 
next iteration.
+///           decoder.push_ranges(ranges, data).unwrap();
+///        }
+///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in 
previous match arm") }
+///        Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the 
exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // for this example, we "prefetch" all the bytes which we have in memory,
+/// // but in a real application, you would likely read a chunk from the end
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note in a real 
application,
+/// // unless you can guarantee that the pushed data is enough to decode the 
metadata,
+/// // you still need to call `try_decode` in a loop until it returns 
`DecodeResult::Data`
+/// // as shown in  the previous example
+///  match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        other @ _ => { panic!("expected DecodeResult::Data, got: 
{other:?}") }
+///    }
+/// # }
+/// ```
+///
+/// # Example using [`AsyncRead`]
+///
+/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source 
that can
+/// provide byte ranges, including async IO sources. However, it does not
+/// implement async IO itself. To use async IO, you simply write an async
+/// wrapper around it that reads the required byte ranges and pushes them into 
the
+/// decoder.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// // this function decodes Parquet Metadata from anything that implements
+/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
+/// async fn decode_metadata(
+///   file_len: u64,
+///   mut async_source: impl AsyncRead + AsyncSeek + Unpin
+/// ) -> Result<ParquetMetaData, ParquetError> {
+///   // We need a ParquetMetaDataPushDecoder to decode the metadata.
+///   let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+///   loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example we use the AsyncRead and AsyncSeek traits to 
read the
+///           // required ranges from the async source.
+///           let mut data = Vec::with_capacity(ranges.len());
+///           for range in &ranges {
+///             let mut buffer = vec![0; (range.end - range.start) as usize];
+///             
async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
+///             async_source.read_exact(&mut buffer).await?;
+///            data.push(Bytes::from(buffer));

Review Comment:
   ```suggestion
   ///             data.push(Bytes::from(buffer));
   ```



##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given 
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #    let start = range.start as usize;
+/// #     let end = range.end as usize;
+/// #    file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will 
tell you what ranges
+/// loop {
+///     match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        Ok(DecodeResult::NeedsData(ranges)) => {
+///           // The decoder needs more data
+///           //
+///           // In this example, we call a function that returns the bytes 
for each given range.
+///           // In a real application, you would likely read the data from a 
file or network.
+///           let data = ranges.iter().map(|range| get_range(range)).collect();
+///           // Push the data into the decoder and try to decode again on the 
next iteration.
+///           decoder.push_ranges(ranges, data).unwrap();
+///        }
+///        Ok(DecodeResult::Finished) => { unreachable!("returned metadata in 
previous match arm") }
+///        Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the 
exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // for this example, we "prefetch" all the bytes which we have in memory,
+/// // but in a real application, you would likely read a chunk from the end
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note in a real 
application,
+/// // unless you can guarantee that the pushed data is enough to decode the 
metadata,
+/// // you still need to call `try_decode` in a loop until it returns 
`DecodeResult::Data`
+/// // as shown in  the previous example
+///  match decoder.try_decode() {
+///        Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // 
decode successful
+///        other @ _ => { panic!("expected DecodeResult::Data, got: 
{other:?}") }
+///    }
+/// # }
+/// ```
+///
+/// # Example using [`AsyncRead`]
+///
+/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source 
that can
+/// provide byte ranges, including async IO sources. However, it does not
+/// implement async IO itself. To use async IO, you simply write an async
+/// wrapper around it that reads the required byte ranges and pushes them into 
the
+/// decoder.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, 
ParquetMetaDataPushDecoder};
+/// #
+/// // this function decodes Parquet Metadata from anything that implements

Review Comment:
   ```suggestion
   /// // This function decodes Parquet Metadata from anything that implements
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to