This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e664208b7 Add parquet ObjectStore integration (#3370)
e664208b7 is described below

commit e664208b79b638536ae296212223537c3cd37acb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Dec 19 21:01:40 2022 +0000

    Add parquet ObjectStore integration (#3370)
    
    * Add parquet ObjectStore integration
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Add tests
    
    * Fix merge conflict
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 object_store/src/lib.rs                            |  15 +-
 parquet/Cargo.toml                                 |   3 +
 parquet/src/arrow/async_reader/metadata.rs         | 159 +++++++++++++++++++++
 .../arrow/{async_reader.rs => async_reader/mod.rs} |   8 ++
 parquet/src/arrow/async_reader/store.rs            | 158 ++++++++++++++++++++
 5 files changed, 340 insertions(+), 3 deletions(-)

diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 85e8737b7..6078c1c93 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -33,9 +33,18 @@
 //!
 //! # Create an [`ObjectStore`] implementation:
 //!
-//! * [Google Cloud Storage](https://cloud.google.com/storage/): [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)
-//! * [Amazon S3](https://aws.amazon.com/s3/): [`AmazonS3Builder`](aws::AmazonS3Builder)
-//! * [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/):: [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)
+#![cfg_attr(
+    feature = "gcp",
+    doc = "* [Google Cloud Storage](https://cloud.google.com/storage/): [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
+)]
+#![cfg_attr(
+    feature = "aws",
+    doc = "* [Amazon S3](https://aws.amazon.com/s3/): [`AmazonS3Builder`](aws::AmazonS3Builder)"
+)]
+#![cfg_attr(
+    feature = "azure",
+    doc = "* [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/): [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
+)]
 //! * In Memory: [`InMemory`](memory::InMemory)
 //! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
 //!
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index cde46b98b..22dbc7e22 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -43,6 +43,7 @@ arrow-data = { version = "29.0.0", path = "../arrow-data", default-features = fa
 arrow-schema = { version = "29.0.0", path = "../arrow-schema", default-features = false, optional = true }
 arrow-select = { version = "29.0.0", path = "../arrow-select", default-features = false, optional = true }
 arrow-ipc = { version = "29.0.0", path = "../arrow-ipc", default-features = false, optional = true }
+object_store = { version = "0.5", path = "../object_store", default-features = false, optional = true }
 
 bytes = { version = "1.1", default-features = false, features = ["std"] }
 thrift = { version = "0.17", default-features = false }
@@ -96,6 +97,8 @@ test_common = ["arrow/test_utils"]
 experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
 
 [[example]]
 name = "read_parquet"
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
new file mode 100644
index 000000000..9c96d0650
--- /dev/null
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::metadata::ParquetMetaData;
+use bytes::{BufMut, Bytes, BytesMut};
+use std::future::Future;
+use std::ops::Range;
+
+/// Fetches parquet metadata
+///
+/// Parameters:
+/// * fetch: an async function that can fetch byte ranges
+/// * file_size: the total size of the parquet file
+/// * footer_size_hint: footer prefetch size (see comments below)
+///
+/// The length of the parquet footer, which contains file metadata, is not
+/// known up front. Therefore this function will first issue a request to read
+/// the last 8 bytes to determine the footer's precise length, before
+/// issuing a second request to fetch the metadata bytes
+///
+/// If a hint is set, this method will read the specified number of bytes
+/// in the first request, instead of 8, and only issue a second request
+/// if additional bytes are needed. This can therefore eliminate a
+/// potentially costly additional fetch operation
+pub async fn fetch_parquet_metadata<F, Fut>(
+    mut fetch: F,
+    file_size: usize,
+    footer_size_hint: Option<usize>,
+) -> Result<ParquetMetaData>
+where
+    F: FnMut(Range<usize>) -> Fut,
+    Fut: Future<Output = Result<Bytes>>,
+{
+    if file_size < 8 {
+        return Err(ParquetError::EOF(format!(
+            "file size of {} is less than footer",
+            file_size
+        )));
+    }
+
+    // If a size hint is provided, read more than the minimum size
+    // to try and avoid a second fetch.
+    let footer_start = if let Some(size_hint) = footer_size_hint {
+        file_size.saturating_sub(size_hint)
+    } else {
+        file_size - 8
+    };
+
+    let suffix = fetch(footer_start..file_size).await?;
+    let suffix_len = suffix.len();
+
+    let mut footer = [0; 8];
+    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+    let length = decode_footer(&footer)?;
+
+    if file_size < length + 8 {
+        return Err(ParquetError::EOF(format!(
+            "file size of {} is less than footer + metadata {}",
+            file_size,
+            length + 8
+        )));
+    }
+
+    // Did not fetch the entire file metadata in the initial read, need to make a second request
+    if length > suffix_len - 8 {
+        let metadata_start = file_size - length - 8;
+        let remaining_metadata = fetch(metadata_start..footer_start).await?;
+
+        let mut metadata = BytesMut::with_capacity(length);
+
+        metadata.put(remaining_metadata.as_ref());
+        metadata.put(&suffix[..suffix_len - 8]);
+
+        Ok(decode_metadata(metadata.as_ref())?)
+    } else {
+        let metadata_start = file_size - length - 8;
+
+        Ok(decode_metadata(
+            &suffix[metadata_start - footer_start..suffix_len - 8],
+        )?)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::file::reader::{FileReader, Length, SerializedFileReader};
+    use crate::util::test_common::file_util::get_test_file;
+    use std::fs::File;
+    use std::io::{Read, Seek, SeekFrom};
+
+    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+        file.seek(SeekFrom::Start(range.start as _))?;
+        let len = range.end - range.start;
+        let mut buf = Vec::with_capacity(len);
+        file.take(len as _).read_to_end(&mut buf)?;
+        Ok(buf.into())
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        let mut file = get_test_file("nulls.snappy.parquet");
+        let len = file.len() as usize;
+
+        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
+        let expected = reader.metadata().file_metadata().schema();
+
+        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
+        let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+
+        // Metadata hint too small
+        let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+
+        // Metadata hint too large
+        let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+
+        // Metadata hint exactly correct
+        let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+
+        let err = fetch_parquet_metadata(&mut fetch, 4, None)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert_eq!(err, "EOF: file size of 4 is less than footer");
+
+        let err = fetch_parquet_metadata(&mut fetch, 20, None)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
+    }
+}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader/mod.rs
similarity index 99%
rename from parquet/src/arrow/async_reader.rs
rename to parquet/src/arrow/async_reader/mod.rs
index 4285a1c17..cbaa2bf6b 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -116,6 +116,14 @@ use crate::file::FOOTER_SIZE;
 
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 
+mod metadata;
+pub use metadata::*;
+
+#[cfg(feature = "object_store")]
+mod store;
+#[cfg(feature = "object_store")]
+pub use store::*;
+
 /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
 pub trait AsyncFileReader: Send {
     /// Retrieve the bytes in `range`
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
new file mode 100644
index 000000000..716b641cd
--- /dev/null
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::ParquetMetaData;
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+pub struct ParquetObjectReader {
+    store: Arc<dyn ObjectStore>,
+    meta: ObjectMeta,
+    metadata_size_hint: Option<usize>,
+}
+
+impl ParquetObjectReader {
+    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
+    ///
+    /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
+    pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
+        Self {
+            store,
+            meta,
+            metadata_size_hint: None,
+        }
+    }
+
+    /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata]
+    pub fn with_footer_size_hint(self, hint: usize) -> Self {
+        Self {
+            metadata_size_hint: Some(hint),
+            ..self
+        }
+    }
+}
+
+impl AsyncFileReader for ParquetObjectReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.store
+            .get_range(&self.meta.location, range)
+            .map_err(|e| {
+                ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+            })
+            .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        async move {
+            self.store
+                .get_ranges(&self.meta.location, &ranges)
+                .await
+                .map_err(|e| {
+                    ParquetError::General(format!(
+                        "ParquetObjectReader::get_byte_ranges error: {}",
+                        e
+                    ))
+                })
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let metadata = fetch_parquet_metadata(
+                |range| {
+                    self.store
+                        .get_range(&self.meta.location, range)
+                        .map_err(|e| {
+                            ParquetError::General(format!(
+                                "ParquetObjectReader::get_metadata error: {}",
+                                e
+                            ))
+                        })
+                },
+                self.meta.size,
+                self.metadata_size_hint,
+            )
+            .await?;
+            Ok(Arc::new(metadata))
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use futures::TryStreamExt;
+
+    use arrow::util::test_util::parquet_test_data;
+    use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use object_store::ObjectStore;
+
+    use crate::arrow::async_reader::ParquetObjectReader;
+    use crate::arrow::ParquetRecordBatchStreamBuilder;
+
+    #[tokio::test]
+    async fn test_simple() {
+        let res = parquet_test_data();
+        let store = LocalFileSystem::new_with_prefix(res).unwrap();
+
+        let mut meta = store
+            .head(&Path::from("alltypes_plain.parquet"))
+            .await
+            .unwrap();
+
+        let store = Arc::new(store) as Arc<dyn ObjectStore>;
+        let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone());
+        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
+            .await
+            .unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        meta.location = Path::from("I don't exist.parquet");
+
+        let object_reader = ParquetObjectReader::new(store, meta);
+        // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
+        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
+            Ok(_) => panic!("expected failure"),
+            Err(e) => {
+                let err = e.to_string();
+                assert!(err.contains("Parquet error: ParquetObjectReader::get_metadata error: Object at location") && err.contains("not found: No such file or directory (os error 2)"), "{}", err);
+            }
+        }
+    }
+}

Reply via email to