This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e664208b7 Add parquet ObjectStore integration (#3370)
e664208b7 is described below
commit e664208b79b638536ae296212223537c3cd37acb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Dec 19 21:01:40 2022 +0000
Add parquet ObjectStore integration (#3370)
* Add parquet ObjectStore integration
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* Add tests
* Fix merge conflict
Co-authored-by: Andrew Lamb <[email protected]>
---
object_store/src/lib.rs | 15 +-
parquet/Cargo.toml | 3 +
parquet/src/arrow/async_reader/metadata.rs | 159 +++++++++++++++++++++
.../arrow/{async_reader.rs => async_reader/mod.rs} | 8 ++
parquet/src/arrow/async_reader/store.rs | 158 ++++++++++++++++++++
5 files changed, 340 insertions(+), 3 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 85e8737b7..6078c1c93 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -33,9 +33,18 @@
//!
//! # Create an [`ObjectStore`] implementation:
//!
-//! * [Google Cloud Storage](https://cloud.google.com/storage/):
[`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)
-//! * [Amazon S3](https://aws.amazon.com/s3/):
[`AmazonS3Builder`](aws::AmazonS3Builder)
-//! * [Azure Blob
Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/)::
[`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)
+#![cfg_attr(
+    feature = "gcp",
+    doc = "* [Google Cloud Storage](https://cloud.google.com/storage/): [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
+)]
+#![cfg_attr(
+    feature = "aws",
+    doc = "* [Amazon S3](https://aws.amazon.com/s3/): [`AmazonS3Builder`](aws::AmazonS3Builder)"
+)]
+#![cfg_attr(
+    feature = "azure",
+    doc = "* [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/): [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
+)]
//! * In Memory: [`InMemory`](memory::InMemory)
//! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
//!
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index cde46b98b..22dbc7e22 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -43,6 +43,7 @@ arrow-data = { version = "29.0.0", path = "../arrow-data",
default-features = fa
arrow-schema = { version = "29.0.0", path = "../arrow-schema",
default-features = false, optional = true }
arrow-select = { version = "29.0.0", path = "../arrow-select",
default-features = false, optional = true }
arrow-ipc = { version = "29.0.0", path = "../arrow-ipc", default-features =
false, optional = true }
+object_store = { version = "0.5", path = "../object_store", default-features =
false, optional = true }
bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
@@ -96,6 +97,8 @@ test_common = ["arrow/test_utils"]
experimental = []
# Enable async APIs
async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
[[example]]
name = "read_parquet"
diff --git a/parquet/src/arrow/async_reader/metadata.rs
b/parquet/src/arrow/async_reader/metadata.rs
new file mode 100644
index 000000000..9c96d0650
--- /dev/null
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::metadata::ParquetMetaData;
+use bytes::{BufMut, Bytes, BytesMut};
+use std::future::Future;
+use std::ops::Range;
+
+/// Fetches parquet metadata
+///
+/// Parameters:
+/// * fetch: an async function that can fetch byte ranges
+/// * file_size: the total size of the parquet file
+/// * footer_size_hint: footer prefetch size (see comments below)
+///
+/// The length of the parquet footer, which contains file metadata, is not
+/// known up front. Therefore this function will first issue a request to read
+/// the last 8 bytes to determine the footer's precise length, before
+/// issuing a second request to fetch the metadata bytes
+///
+/// If a hint is set, this method will read the specified number of bytes
+/// in the first request, instead of 8, and only issue a second request
+/// if additional bytes are needed. This can therefore eliminate a
+/// potentially costly additional fetch operation
+pub async fn fetch_parquet_metadata<F, Fut>(
+ mut fetch: F,
+ file_size: usize,
+ footer_size_hint: Option<usize>,
+) -> Result<ParquetMetaData>
+where
+ F: FnMut(Range<usize>) -> Fut,
+ Fut: Future<Output = Result<Bytes>>,
+{
+ if file_size < 8 {
+ return Err(ParquetError::EOF(format!(
+ "file size of {} is less than footer",
+ file_size
+ )));
+ }
+
+ // If a size hint is provided, read more than the minimum size
+ // to try and avoid a second fetch.
+ let footer_start = if let Some(size_hint) = footer_size_hint {
+ file_size.saturating_sub(size_hint)
+ } else {
+ file_size - 8
+ };
+
+ let suffix = fetch(footer_start..file_size).await?;
+ let suffix_len = suffix.len();
+
+ let mut footer = [0; 8];
+ footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+ let length = decode_footer(&footer)?;
+
+ if file_size < length + 8 {
+ return Err(ParquetError::EOF(format!(
+ "file size of {} is less than footer + metadata {}",
+ file_size,
+ length + 8
+ )));
+ }
+
+    // Did not fetch the entire file metadata in the initial read, need to make a second request
+ if length > suffix_len - 8 {
+ let metadata_start = file_size - length - 8;
+ let remaining_metadata = fetch(metadata_start..footer_start).await?;
+
+ let mut metadata = BytesMut::with_capacity(length);
+
+ metadata.put(remaining_metadata.as_ref());
+ metadata.put(&suffix[..suffix_len - 8]);
+
+ Ok(decode_metadata(metadata.as_ref())?)
+ } else {
+ let metadata_start = file_size - length - 8;
+
+ Ok(decode_metadata(
+ &suffix[metadata_start - footer_start..suffix_len - 8],
+ )?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::file::reader::{FileReader, Length, SerializedFileReader};
+ use crate::util::test_common::file_util::get_test_file;
+ use std::fs::File;
+ use std::io::{Read, Seek, SeekFrom};
+
+ fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+ file.seek(SeekFrom::Start(range.start as _))?;
+ let len = range.end - range.start;
+ let mut buf = Vec::with_capacity(len);
+ file.take(len as _).read_to_end(&mut buf)?;
+ Ok(buf.into())
+ }
+
+ #[tokio::test]
+ async fn test_simple() {
+ let mut file = get_test_file("nulls.snappy.parquet");
+ let len = file.len() as usize;
+
+ let reader =
SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
+ let expected = reader.metadata().file_metadata().schema();
+
+ let mut fetch = |range| futures::future::ready(read_range(&mut file,
range));
+ let actual = fetch_parquet_metadata(&mut fetch, len,
None).await.unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+
+ // Metadata hint too small
+ let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+
+ // Metadata hint too large
+ let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+
+ // Metadata hint exactly correct
+ let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+
+ let err = fetch_parquet_metadata(&mut fetch, 4, None)
+ .await
+ .unwrap_err()
+ .to_string();
+ assert_eq!(err, "EOF: file size of 4 is less than footer");
+
+ let err = fetch_parquet_metadata(&mut fetch, 20, None)
+ .await
+ .unwrap_err()
+ .to_string();
+ assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
+ }
+}
diff --git a/parquet/src/arrow/async_reader.rs
b/parquet/src/arrow/async_reader/mod.rs
similarity index 99%
rename from parquet/src/arrow/async_reader.rs
rename to parquet/src/arrow/async_reader/mod.rs
index 4285a1c17..cbaa2bf6b 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -116,6 +116,14 @@ use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+mod metadata;
+pub use metadata::*;
+
+#[cfg(feature = "object_store")]
+mod store;
+#[cfg(feature = "object_store")]
+pub use store::*;
+
/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read
parquet files
pub trait AsyncFileReader: Send {
/// Retrieve the bytes in `range`
diff --git a/parquet/src/arrow/async_reader/store.rs
b/parquet/src/arrow/async_reader/store.rs
new file mode 100644
index 000000000..716b641cd
--- /dev/null
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::ParquetMetaData;
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+pub struct ParquetObjectReader {
+ store: Arc<dyn ObjectStore>,
+ meta: ObjectMeta,
+ metadata_size_hint: Option<usize>,
+}
+
+impl ParquetObjectReader {
+ /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`]
and [`ObjectMeta`]
+ ///
+ /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or
[`ObjectStore::head`]
+ pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
+ Self {
+ store,
+ meta,
+ metadata_size_hint: None,
+ }
+ }
+
+ /// Provide a hint as to the size of the parquet file's footer, see
[fetch_parquet_metadata]
+ pub fn with_footer_size_hint(self, hint: usize) -> Self {
+ Self {
+ metadata_size_hint: Some(hint),
+ ..self
+ }
+ }
+}
+
+impl AsyncFileReader for ParquetObjectReader {
+ fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_,
Result<Bytes>> {
+ self.store
+ .get_range(&self.meta.location, range)
+ .map_err(|e| {
+ ParquetError::General(format!("AsyncChunkReader::get_bytes
error: {}", e))
+ })
+ .boxed()
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<usize>>,
+ ) -> BoxFuture<'_, Result<Vec<Bytes>>>
+ where
+ Self: Send,
+ {
+ async move {
+ self.store
+ .get_ranges(&self.meta.location, &ranges)
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "ParquetObjectReader::get_byte_ranges error: {}",
+ e
+ ))
+ })
+ }
+ .boxed()
+ }
+
+ fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+ Box::pin(async move {
+ let metadata = fetch_parquet_metadata(
+ |range| {
+ self.store
+ .get_range(&self.meta.location, range)
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "ParquetObjectReader::get_metadata error: {}",
+ e
+ ))
+ })
+ },
+ self.meta.size,
+ self.metadata_size_hint,
+ )
+ .await?;
+ Ok(Arc::new(metadata))
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use futures::TryStreamExt;
+
+ use arrow::util::test_util::parquet_test_data;
+ use object_store::local::LocalFileSystem;
+ use object_store::path::Path;
+ use object_store::ObjectStore;
+
+ use crate::arrow::async_reader::ParquetObjectReader;
+ use crate::arrow::ParquetRecordBatchStreamBuilder;
+
+ #[tokio::test]
+ async fn test_simple() {
+ let res = parquet_test_data();
+ let store = LocalFileSystem::new_with_prefix(res).unwrap();
+
+ let mut meta = store
+ .head(&Path::from("alltypes_plain.parquet"))
+ .await
+ .unwrap();
+
+ let store = Arc::new(store) as Arc<dyn ObjectStore>;
+ let object_reader = ParquetObjectReader::new(Arc::clone(&store),
meta.clone());
+ let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
+ .await
+ .unwrap();
+ let batches: Vec<_> =
builder.build().unwrap().try_collect().await.unwrap();
+
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 8);
+
+ meta.location = Path::from("I don't exist.parquet");
+
+ let object_reader = ParquetObjectReader::new(store, meta);
+ // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
+ match ParquetRecordBatchStreamBuilder::new(object_reader).await {
+ Ok(_) => panic!("expected failure"),
+ Err(e) => {
+ let err = e.to_string();
+ assert!(err.contains("Parquet error:
ParquetObjectReader::get_metadata error: Object at location") &&
err.contains("not found: No such file or directory (os error 2)"), "{}", err);
+ }
+ }
+ }
+}