This is an automated email from the ASF dual-hosted git repository.

kangkang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b37191ddf2 feat: introduce opendal `AsyncWriter` for parquet 
integrations  (#4958)
b37191ddf2 is described below

commit b37191ddf2343e74d34dd71b259f0d81d40739b1
Author: Weny Xu <[email protected]>
AuthorDate: Tue Aug 6 00:26:57 2024 +0800

    feat: introduce opendal `AsyncWriter` for parquet integrations  (#4958)
    
    * chore: init parquet crate
    
    * feat: implement the `OpendalAsyncWriter`
    
    * chore: apply suggestions from CR
    
    * chore: remove arrow dep from default
    
    * chore(ci): add ci for opendal_parquet
    
    * test: add test for async writer
    
    * chore: remove arrow dep
    
    * chore(ci): add doc test
    
    * Update .github/workflows/ci_integration_parquet.yml
    
    * chore(ci): run cargo test
    
    ---------
    
    Co-authored-by: Xuanwo <[email protected]>
---
 .github/workflows/ci_integration_parquet.yml  |  51 ++++++++
 .github/workflows/docs.yml                    |  24 ++++
 integrations/parquet/.gitignore               |   1 +
 integrations/parquet/Cargo.toml               |  52 ++++++++
 integrations/parquet/examples/async_writer.rs |  45 +++++++
 integrations/parquet/src/async_writer.rs      | 172 ++++++++++++++++++++++++++
 integrations/parquet/src/lib.rs               |  72 +++++++++++
 7 files changed, 417 insertions(+)

diff --git a/.github/workflows/ci_integration_parquet.yml 
b/.github/workflows/ci_integration_parquet.yml
new file mode 100644
index 0000000000..421faa418d
--- /dev/null
+++ b/.github/workflows/ci_integration_parquet.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Integration Parquet CI
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+    paths:
+      - "integrations/parquet/**"
+      - "core/**"
+      - ".github/workflows/ci_integration_parquet.yml"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  check_clippy:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Cargo clippy
+        working-directory: integrations/parquet
+        run: cargo clippy --all-targets --all-features -- -D warnings
+
+      - name: Cargo test
+        working-directory: integrations/parquet
+        run: cargo test
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index fbf561af63..7b9d9e0dba 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -406,6 +406,29 @@ jobs:
           name: virtiofs-opendal-docs
           path: ./integrations/virtiofs/target/doc
 
+  build-parquet-opendal-doc:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Setup Rust Nightly
+        run: |
+          rustup toolchain install ${{ env.RUST_DOC_TOOLCHAIN }}
+
+      - name: Build parquet-opendal doc
+        working-directory: "integrations/parquet"
+        run: cargo +${{ env.RUST_DOC_TOOLCHAIN }} doc --lib --no-deps 
--all-features
+
+      - name: Upload docs
+        uses: actions/upload-artifact@v3
+        with:
+          name: object-parquet-docs
+          path: ./integrations/parquet/target/doc
+
   build-website:
     runs-on: ubuntu-latest
     needs:
@@ -423,6 +446,7 @@ jobs:
       - build-fuse3-opendal-doc
       - build-unftp-sbe-opendal-doc
       - build-virtiofs-opendal-doc
+      - build-parquet-opendal-doc
 
     steps:
       - uses: actions/checkout@v4
diff --git a/integrations/parquet/.gitignore b/integrations/parquet/.gitignore
new file mode 100644
index 0000000000..03314f77b5
--- /dev/null
+++ b/integrations/parquet/.gitignore
@@ -0,0 +1 @@
+Cargo.lock
diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
new file mode 100644
index 0000000000..1efe19b75a
--- /dev/null
+++ b/integrations/parquet/Cargo.toml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+description = "parquet Integration for Apache OpenDAL"
+name = "parquet_opendal"
+
+authors = ["Apache OpenDAL <[email protected]>"]
+edition = "2021"
+homepage = "https://opendal.apache.org/"
+license = "Apache-2.0"
+repository = "https://github.com/apache/opendal"
+rust-version = "1.75"
+version = "0.0.1"
+
+[dependencies]
+async-trait = "0.1"
+bytes = "1"
+futures = "0.3"
+opendal = { version = "0.48.0", path = "../../core" }
+parquet = { version = "52.0", default-features = false, features = [
+  "async",
+  "arrow",
+] }
+
+[dev-dependencies]
+opendal = { version = "0.48.0", path = "../../core", features = [
+  "services-memory",
+  "services-s3",
+] }
+rand = "0.8.5"
+tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
+arrow = { version = "52.0" }
+
+
+[[example]]
+name = "async_writer"
+path = "examples/async_writer.rs"
diff --git a/integrations/parquet/examples/async_writer.rs 
b/integrations/parquet/examples/async_writer.rs
new file mode 100644
index 0000000000..9f16f69eac
--- /dev/null
+++ b/integrations/parquet/examples/async_writer.rs
@@ -0,0 +1,45 @@
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+
+use opendal::{services::S3Config, Operator};
+use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, 
AsyncArrowWriter};
+use parquet_opendal::AsyncWriter;
+
+#[tokio::main]
+async fn main() {
+    let mut cfg = S3Config::default();
+    cfg.access_key_id = Some("my_access_key".to_string());
+    cfg.secret_access_key = Some("my_secret_key".to_string());
+    cfg.endpoint = Some("my_endpoint".to_string());
+    cfg.region = Some("my_region".to_string());
+    cfg.bucket = "my_bucket".to_string();
+
+    // Create a new operator
+    let operator = Operator::from_config(cfg).unwrap().finish();
+    let path = "/path/to/file.parquet";
+
+    // Create an async writer
+    let writer = AsyncWriter::new(
+        operator
+            .writer_with(path)
+            .chunk(32 * 1024 * 1024)
+            .concurrent(8)
+            .await
+            .unwrap(),
+    );
+
+    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), 
None).unwrap();
+    writer.write(&to_write).await.unwrap();
+    writer.close().await.unwrap();
+
+    let buffer = operator.read(path).await.unwrap().to_bytes();
+    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+        .unwrap()
+        .build()
+        .unwrap();
+    let read = reader.next().unwrap().unwrap();
+    assert_eq!(to_write, read);
+}
diff --git a/integrations/parquet/src/async_writer.rs 
b/integrations/parquet/src/async_writer.rs
new file mode 100644
index 0000000000..027c9214c0
--- /dev/null
+++ b/integrations/parquet/src/async_writer.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use parquet::arrow::async_writer::AsyncFileWriter;
+use parquet::errors::{ParquetError, Result};
+
+use futures::future::BoxFuture;
+use opendal::Writer;
+
+/// AsyncWriter implements AsyncFileWriter trait by using opendal.
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+///
+/// use opendal::{services::S3Config, Operator};
+/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, 
AsyncArrowWriter};
+/// use parquet_opendal::AsyncWriter;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut cfg = S3Config::default();
+///     cfg.access_key_id = Some("my_access_key".to_string());
+///     cfg.secret_access_key = Some("my_secret_key".to_string());
+///     cfg.endpoint = Some("my_endpoint".to_string());
+///     cfg.region = Some("my_region".to_string());
+///     cfg.bucket = "my_bucket".to_string();
+///
+///     // Create a new operator
+///     let operator = Operator::from_config(cfg).unwrap().finish();
+///     let path = "/path/to/file.parquet";
+///
+///     // Create an async writer
+///     let writer = AsyncWriter::new(
+///         operator
+///             .writer_with(path)
+///             .chunk(32 * 1024 * 1024)
+///             .concurrent(8)
+///             .await
+///             .unwrap(),
+///     );
+///
+///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as 
ArrayRef;
+///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+///     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), 
None).unwrap();
+///     writer.write(&to_write).await.unwrap();
+///     writer.close().await.unwrap();
+///
+///     let buffer = operator.read(path).await.unwrap().to_bytes();
+///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+///         .unwrap()
+///         .build()
+///         .unwrap();
+///     let read = reader.next().unwrap().unwrap();
+///     assert_eq!(to_write, read);
+/// }
+/// ```
+pub struct AsyncWriter {
+    inner: Writer,
+}
+
+impl AsyncWriter {
+    /// Create a new [`AsyncWriter`] from the given [`Writer`].
+    pub fn new(writer: Writer) -> Self {
+        Self { inner: writer }
+    }
+}
+
+impl AsyncFileWriter for AsyncWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .write(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .close()
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+    use opendal::{services, Operator};
+    use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, 
AsyncArrowWriter};
+
+    #[tokio::test]
+    async fn test_basic() {
+        let op = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "data/test.txt";
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+        let bytes = Bytes::from_static(b"hello, world!");
+        writer.write(bytes).await.unwrap();
+        let bytes = Bytes::from_static(b"hello, OpenDAL!");
+        writer.write(bytes).await.unwrap();
+        writer.complete().await.unwrap();
+
+        let bytes = op.read(path).await.unwrap().to_vec();
+        assert_eq!(bytes, b"hello, world!hello, OpenDAL!");
+    }
+
+    #[tokio::test]
+    async fn test_abort() {
+        let op = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "data/test.txt";
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+        let bytes = Bytes::from_static(b"hello, world!");
+        writer.write(bytes).await.unwrap();
+        let bytes = Bytes::from_static(b"hello, OpenDAL!");
+        writer.write(bytes).await.unwrap();
+        drop(writer);
+
+        let exist = op.is_exist(path).await.unwrap();
+        assert!(!exist);
+    }
+
+    #[tokio::test]
+    async fn test_async_writer() {
+        let operator = 
Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "/path/to/file.parquet";
+
+        let writer = AsyncWriter::new(
+            operator
+                .writer_with(path)
+                .chunk(32 * 1024 * 1024)
+                .concurrent(8)
+                .await
+                .unwrap(),
+        );
+
+        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as 
ArrayRef;
+        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+        let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), 
None).unwrap();
+        writer.write(&to_write).await.unwrap();
+        writer.close().await.unwrap();
+
+        let buffer = operator.read(path).await.unwrap().to_bytes();
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+            .unwrap()
+            .build()
+            .unwrap();
+        let read = reader.next().unwrap().unwrap();
+        assert_eq!(to_write, read);
+    }
+}
diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs
new file mode 100644
index 0000000000..ded082d823
--- /dev/null
+++ b/integrations/parquet/src/lib.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! parquet_opendal provides parquet IO utils.
+//!
+//! AsyncWriter implements AsyncFileWriter trait by using opendal.
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//!
+//! use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+//!
+//! use opendal::{services::S3Config, Operator};
+//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, 
AsyncArrowWriter};
+//! use parquet_opendal::AsyncWriter;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//!     let mut cfg = S3Config::default();
+//!     cfg.access_key_id = Some("my_access_key".to_string());
+//!     cfg.secret_access_key = Some("my_secret_key".to_string());
+//!     cfg.endpoint = Some("my_endpoint".to_string());
+//!     cfg.region = Some("my_region".to_string());
+//!     cfg.bucket = "my_bucket".to_string();
+//!
+//!     // Create a new operator
+//!     let operator = Operator::from_config(cfg).unwrap().finish();
+//!     let path = "/path/to/file.parquet";
+//!
+//!     // Create an async writer
+//!     let writer = AsyncWriter::new(
+//!         operator
+//!             .writer_with(path)
+//!             .chunk(32 * 1024 * 1024)
+//!             .concurrent(8)
+//!             .await
+//!             .unwrap(),
+//!     );
+//!
+//!     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as 
ArrayRef;
+//!     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+//!     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), 
None).unwrap();
+//!     writer.write(&to_write).await.unwrap();
+//!     writer.close().await.unwrap();
+//!
+//!     let buffer = operator.read(path).await.unwrap().to_bytes();
+//!     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+//!         .unwrap()
+//!         .build()
+//!         .unwrap();
+//!     let read = reader.next().unwrap().unwrap();
+//!     assert_eq!(to_write, read);
+//! }
+//! ```
+
+mod async_writer;
+
+pub use async_writer::AsyncWriter;

Reply via email to