This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 7f2d9ac14b feat(parquet): Implement AsyncFileWriter for
`object_store::buffered::BufWriter` (#6013)
7f2d9ac14b is described below
commit 7f2d9ac14b1b5b846feb130f5cbdfd64e6616cb9
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 6 18:29:44 2024 +0800
feat(parquet): Implement AsyncFileWriter for
`object_store::buffered::BufWriter` (#6013)
* feat(parquet): Implement AsyncFileWriter for obejct_store::BufWriter
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Bump object_store
Signed-off-by: Xuanwo <[email protected]>
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* Address comments
Signed-off-by: Xuanwo <[email protected]>
* Add comments
Signed-off-by: Xuanwo <[email protected]>
* Make it better to read
Signed-off-by: Xuanwo <[email protected]>
* Fix docs
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/Cargo.toml | 4 +-
parquet/src/arrow/async_writer/mod.rs | 10 +-
parquet/src/arrow/async_writer/store.rs | 157 ++++++++++++++++++++++++++++++++
3 files changed, 168 insertions(+), 3 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index f959827599..bc59c2f3fa 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -44,7 +44,7 @@ arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
# Intentionally not a path dependency as object_store is released separately
-object_store = { version = "0.10.0", default-features = false, optional = true
}
+object_store = { version = "0.10.2", default-features = false, optional = true
}
bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
@@ -82,7 +82,7 @@ serde_json = { version = "1.0", features = ["std"],
default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint",
"json"] }
tokio = { version = "1.0", default-features = false, features = ["macros",
"rt", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std",
"std_rng"] }
-object_store = { version = "0.10.0", default-features = false, features =
["azure"] }
+object_store = { version = "0.10.2", default-features = false, features =
["azure"] }
# TODO: temporary to fix parquet wasm build
# upstream issue: https://github.com/gyscos/zstd-rs/issues/269
diff --git a/parquet/src/arrow/async_writer/mod.rs
b/parquet/src/arrow/async_writer/mod.rs
index 274d8fef89..50bb5c0463 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -22,6 +22,7 @@
//! read the documentation there before using this API.
//!
//! Here is an example for using [`AsyncArrowWriter`]:
+//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
@@ -49,6 +50,13 @@
//! assert_eq!(to_write, read);
//! # }
//! ```
+//!
+//! [`object_store`] provides its native implementation of
[`AsyncFileWriter`] via [`ParquetObjectWriter`].
+
+#[cfg(feature = "object_store")]
+mod store;
+#[cfg(feature = "object_store")]
+pub use store::*;
use crate::{
arrow::arrow_writer::ArrowWriterOptions,
@@ -65,7 +73,7 @@ use futures::FutureExt;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};
-/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet
files
+/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet
files.
pub trait AsyncFileWriter: Send {
/// Write the provided bytes to the underlying writer
///
diff --git a/parquet/src/arrow/async_writer/store.rs
b/parquet/src/arrow/async_writer/store.rs
new file mode 100644
index 0000000000..ad09eae499
--- /dev/null
+++ b/parquet/src/arrow/async_writer/store.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use std::sync::Arc;
+
+use crate::arrow::async_writer::AsyncFileWriter;
+use crate::errors::{ParquetError, Result};
+use object_store::buffered::BufWriter;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use tokio::io::AsyncWriteExt;
+
+/// [`ParquetObjectWriter`] for writing parquet to an [`ObjectStore`]
+///
+/// ```
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use object_store::memory::InMemory;
+/// # use object_store::path::Path;
+/// # use object_store::ObjectStore;
+/// # use std::sync::Arc;
+///
+/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+/// # use parquet::arrow::async_writer::ParquetObjectWriter;
+/// # use parquet::arrow::AsyncArrowWriter;
+///
+/// # #[tokio::main(flavor="current_thread")]
+/// # async fn main() {
+/// let store = Arc::new(InMemory::new());
+///
+/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as
ArrayRef;
+/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+///
+/// let object_store_writer = ParquetObjectWriter::new(store.clone(),
Path::from("test"));
+/// let mut writer =
+/// AsyncArrowWriter::try_new(object_store_writer, to_write.schema(),
None).unwrap();
+/// writer.write(&to_write).await.unwrap();
+/// writer.close().await.unwrap();
+///
+/// let buffer = store
+/// .get(&Path::from("test"))
+/// .await
+/// .unwrap()
+/// .bytes()
+/// .await
+/// .unwrap();
+/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+/// .unwrap()
+/// .build()
+/// .unwrap();
+/// let read = reader.next().unwrap().unwrap();
+///
+/// assert_eq!(to_write, read);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct ParquetObjectWriter {
+ // Wrapped object_store `BufWriter`; `write` and `complete` delegate to it.
+ w: BufWriter,
+}
+
+impl ParquetObjectWriter {
+ /// Create a new [`ParquetObjectWriter`] that writes to the specified path
in the given store.
+ ///
+ /// To configure the writer behavior, please build [`BufWriter`] and then
use [`Self::from_buf_writer`]
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self::from_buf_writer(BufWriter::new(store, path))
+ }
+
+ /// Construct a new [`ParquetObjectWriter`] from an existing [`BufWriter`].
+ pub fn from_buf_writer(w: BufWriter) -> Self {
+ Self { w }
+ }
+
+ /// Consume the writer and return the underlying [`BufWriter`].
+ pub fn into_inner(self) -> BufWriter {
+ self.w
+ }
+}
+
+impl AsyncFileWriter for ParquetObjectWriter {
+ /// Pass `bs` to the wrapped [`BufWriter`] via `put` and await the result.
+ ///
+ /// Any error from `put` is boxed into [`ParquetError::External`].
+ fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+ Box::pin(async {
+ self.w
+ .put(bs)
+ .await
+ .map_err(|err| ParquetError::External(Box::new(err)))
+ })
+ }
+
+ /// Call `shutdown` on the wrapped [`BufWriter`] and await the result.
+ ///
+ /// Any error from `shutdown` is boxed into [`ParquetError::External`].
+ fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+ Box::pin(async {
+ self.w
+ .shutdown()
+ .await
+ .map_err(|err| ParquetError::External(Box::new(err)))
+ })
+ }
+}
+/// Wrap an existing [`BufWriter`]; forwards to [`ParquetObjectWriter::from_buf_writer`].
+impl From<BufWriter> for ParquetObjectWriter {
+ fn from(w: BufWriter) -> Self {
+ Self::from_buf_writer(w)
+ }
+}
+#[cfg(test)]
+mod tests {
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+ use object_store::memory::InMemory;
+ use std::sync::Arc;
+
+ use super::*;
+ use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ use crate::arrow::AsyncArrowWriter;
+
+ // Round trip: write one Int64 batch through `ParquetObjectWriter` into an
+ // `InMemory` store, read the stored object back, and expect the same batch.
+ #[tokio::test]
+ async fn test_async_writer() {
+ let store = Arc::new(InMemory::new());
+
+ let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as
ArrayRef;
+ let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+
+ let object_store_writer = ParquetObjectWriter::new(store.clone(),
Path::from("test"));
+ let mut writer =
+ AsyncArrowWriter::try_new(object_store_writer, to_write.schema(),
None).unwrap();
+ writer.write(&to_write).await.unwrap();
+ writer.close().await.unwrap();
+
+ // Fetch the bytes stored under the same path the writer was given.
+ let buffer = store
+ .get(&Path::from("test"))
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+ .unwrap()
+ .build()
+ .unwrap();
+ let read = reader.next().unwrap().unwrap();
+
+ assert_eq!(to_write, read);
+ }
+}