This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f2d9ac14b feat(parquet): Implement AsyncFileWriter for `object_store::buffered::BufWriter` (#6013)
7f2d9ac14b is described below

commit 7f2d9ac14b1b5b846feb130f5cbdfd64e6616cb9
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 6 18:29:44 2024 +0800

    feat(parquet): Implement AsyncFileWriter for `object_store::buffered::BufWriter` (#6013)
    
    * feat(parquet): Implement AsyncFileWriter for object_store::BufWriter
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Bump object_store
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Address comments
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add comments
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make it better to read
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix docs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/Cargo.toml                      |   4 +-
 parquet/src/arrow/async_writer/mod.rs   |  10 +-
 parquet/src/arrow/async_writer/store.rs | 157 ++++++++++++++++++++++++++++++++
 3 files changed, 168 insertions(+), 3 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index f959827599..bc59c2f3fa 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -44,7 +44,7 @@ arrow-schema = { workspace = true, optional = true }
 arrow-select = { workspace = true, optional = true }
 arrow-ipc = { workspace = true, optional = true }
 # Intentionally not a path dependency as object_store is released separately
-object_store = { version = "0.10.0", default-features = false, optional = true }
+object_store = { version = "0.10.2", default-features = false, optional = true }
 
 bytes = { version = "1.1", default-features = false, features = ["std"] }
 thrift = { version = "0.17", default-features = false }
@@ -82,7 +82,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
-object_store = { version = "0.10.0", default-features = false, features = ["azure"] }
+object_store = { version = "0.10.2", default-features = false, features = ["azure"] }
 
 # TODO: temporary to fix parquet wasm build
 # upstream issue: https://github.com/gyscos/zstd-rs/issues/269
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 274d8fef89..50bb5c0463 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -22,6 +22,7 @@
 //! read the documentation there before using this API.
 //!
 //! Here is an example for using [`AsyncArrowWriter`]:
+//!
 //! ```
 //! # #[tokio::main(flavor="current_thread")]
 //! # async fn main() {
@@ -49,6 +50,13 @@
 //! assert_eq!(to_write, read);
 //! # }
 //! ```
+//!
+//! [`ParquetObjectWriter`] provides a native implementation of [`AsyncFileWriter`] for [`object_store`].
+
+#[cfg(feature = "object_store")]
+mod store;
+#[cfg(feature = "object_store")]
+pub use store::*;
 
 use crate::{
     arrow::arrow_writer::ArrowWriterOptions,
@@ -65,7 +73,7 @@ use futures::FutureExt;
 use std::mem;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
-/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files
+/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.
 pub trait AsyncFileWriter: Send {
     /// Write the provided bytes to the underlying writer
     ///
diff --git a/parquet/src/arrow/async_writer/store.rs b/parquet/src/arrow/async_writer/store.rs
new file mode 100644
index 0000000000..ad09eae499
--- /dev/null
+++ b/parquet/src/arrow/async_writer/store.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use std::sync::Arc;
+
+use crate::arrow::async_writer::AsyncFileWriter;
+use crate::errors::{ParquetError, Result};
+use object_store::buffered::BufWriter;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use tokio::io::AsyncWriteExt;
+
+/// [`ParquetObjectWriter`] for writing parquet files to an [`ObjectStore`].
+///
+/// ```
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use object_store::memory::InMemory;
+/// # use object_store::path::Path;
+/// # use object_store::ObjectStore;
+/// # use std::sync::Arc;
+///
+/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+/// # use parquet::arrow::async_writer::ParquetObjectWriter;
+/// # use parquet::arrow::AsyncArrowWriter;
+///
+/// # #[tokio::main(flavor="current_thread")]
+/// # async fn main() {
+///     let store = Arc::new(InMemory::new());
+///
+///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+///
+///     let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
+///     let mut writer =
+///         AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
+///     writer.write(&to_write).await.unwrap();
+///     writer.close().await.unwrap();
+///
+///     let buffer = store
+///         .get(&Path::from("test"))
+///         .await
+///         .unwrap()
+///         .bytes()
+///         .await
+///         .unwrap();
+///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+///         .unwrap()
+///         .build()
+///         .unwrap();
+///     let read = reader.next().unwrap().unwrap();
+///
+///     assert_eq!(to_write, read);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct ParquetObjectWriter {
+    w: BufWriter,
+}
+
+impl ParquetObjectWriter {
+    /// Create a new [`ParquetObjectWriter`] that writes to the specified path in the given store.
+    ///
+    /// To configure the writer behavior, please build [`BufWriter`] and then use [`Self::from_buf_writer`].
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+        Self::from_buf_writer(BufWriter::new(store, path))
+    }
+
+    /// Construct a new [`ParquetObjectWriter`] from an existing [`BufWriter`].
+    pub fn from_buf_writer(w: BufWriter) -> Self {
+        Self { w }
+    }
+
+    /// Consume the writer and return the underlying BufWriter.
+    pub fn into_inner(self) -> BufWriter {
+        self.w
+    }
+}
+
+impl AsyncFileWriter for ParquetObjectWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async {
+            self.w
+                .put(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async {
+            self.w
+                .shutdown()
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+}
+impl From<BufWriter> for ParquetObjectWriter {
+    fn from(w: BufWriter) -> Self {
+        Self::from_buf_writer(w)
+    }
+}
+#[cfg(test)]
+mod tests {
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    use object_store::memory::InMemory;
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use crate::arrow::AsyncArrowWriter;
+
+    #[tokio::test]
+    async fn test_async_writer() {
+        let store = Arc::new(InMemory::new());
+
+        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+
+        let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
+        let mut writer =
+            AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
+        writer.write(&to_write).await.unwrap();
+        writer.close().await.unwrap();
+
+        let buffer = store
+            .get(&Path::from("test"))
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+            .unwrap()
+            .build()
+            .unwrap();
+        let read = reader.next().unwrap().unwrap();
+
+        assert_eq!(to_write, read);
+    }
+}

Reply via email to