alamb commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1195683538


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load 
[`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to 
read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes

Review Comment:
   👍 



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,252 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load 
[`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {

Review Comment:
   I think it is worth adding a comment in the code explaining the rationale 
for keeping the struct crate-private, so that future readers know why
   
   ```suggestion
   /// 
   /// crate private until the interface is stabilized
   pub(crate) struct MetadataLoader<F> {
   ```



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -155,4 +322,53 @@ mod tests {
             .to_string();
         assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
     }
+
+    #[tokio::test]
+    async fn test_page_index() {
+        let mut file = get_test_file("alltypes_tiny_pages.parquet");
+        let len = file.len() as usize;
+        let fetch_count = AtomicUsize::new(0);
+        let mut fetch = |range| {
+            fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_range(&mut file, range))
+        };
+
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && 
metadata.column_index().is_some());
+
+        // Prefetch just footer exactly
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, 
Some(1729)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && 
metadata.column_index().is_some());
+
+        // Prefetch more than footer but not enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, 
Some(130649)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && 
metadata.column_index().is_some());
+
+        // Prefetch exactly enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, 
Some(130650)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

Review Comment:
   🎉 



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load 
[`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to 
read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore 
significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to 
make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = 
fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = 
remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;
+
+            let slice = &suffix[metadata_start..suffix_len - 8];
+            (
+                read_metadata(slice)?,
+                Some((footer_start, suffix.slice(..metadata_start))),
+            )
+        };
+
+        Ok(Self {
+            fetch,
+            metadata,
+            remainder,
+        })
+    }
+
+    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
+        Self {
+            fetch,
+            metadata,
+            remainder: None,
+        }
     }
 
-    // Did not fetch the entire file metadata in the initial read, need to 
make a second request
-    if length > suffix_len - 8 {
-        let metadata_start = file_size - length - 8;
-        let remaining_metadata = fetch(metadata_start..footer_start).await?;
+    /// Loads the page index, if any
+    ///
+    /// * `column_index`: if true will load column index
+    /// * `offset_index`: if true will load offset index
+    pub async fn load_page_index(
+        &mut self,
+        column_index: bool,
+        offset_index: bool,
+    ) -> Result<()> {
+        if !column_index && !offset_index {
+            return Ok(());
+        }
 
-        let mut metadata = BytesMut::with_capacity(length);
+        let mut range = None;
+        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
+            range = acc_range(range, c.column_index_range());
+            range = acc_range(range, c.offset_index_range());
+        }
+        let range = match range {
+            None => return Ok(()),
+            Some(range) => range,
+        };
 
-        metadata.put(remaining_metadata.as_ref());
-        metadata.put(&suffix[..suffix_len - 8]);
+        let data = match &self.remainder {
+            Some((remainder_start, remainder)) if *remainder_start <= 
range.start => {
+                let offset = range.start - *remainder_start;
+                remainder.slice(offset..range.end - *remainder_start + offset)
+            }
+            // Note: this will potentially fetch data already in remainder, 
this keeps things simple
+            _ => self.fetch.fetch(range.start..range.end).await?,
+        };
 
-        Ok(decode_metadata(metadata.as_ref())?)
-    } else {
-        let metadata_start = file_size - length - 8;
+        // Sanity check
+        assert_eq!(data.len(), range.end - range.start);
+        let offset = range.start;
+
+        if column_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.column_index_range() {
+                            Some(r) => decode_column_index(
+                                &data[r.start - offset..r.end - offset],
+                                c.column_type(),
+                            ),
+                            None => Ok(Index::NONE),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_column_index(Some(index));
+        }
+
+        if offset_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.offset_index_range() {
+                            Some(r) => decode_offset_index(
+                                &data[r.start - offset..r.end - offset],
+                            ),
+                            None => Err(general_err!("missing offset index")),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_offset_index(Some(index));
+        }
+
+        Ok(())
+    }
 
-        Ok(decode_metadata(
-            &suffix[metadata_start - footer_start..suffix_len - 8],
-        )?)
+    /// Returns the finished [`ParquetMetaData`]
+    pub fn finish(self) -> ParquetMetaData {
+        self.metadata
     }
 }
 
+struct MetadataFetchFn<F>(F);
+
+impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+where
+    F: FnMut(Range<usize>) -> Fut + Send,
+    Fut: Future<Output = Result<Bytes>> + Send,
+{
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move { self.0(range).await }.boxed()
+    }
+}
+
+/// Fetches parquet metadata

Review Comment:
   Given this is the public interface, I think we should have the doc 
comments on it as well (even if they are redundant with what is on 
`MetadataFetch`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to