This is an automated email from the ASF dual-hosted git repository.
kangkang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b37191ddf2 feat: introduce opendal `AsyncWriter` for parquet integrations (#4958)
b37191ddf2 is described below
commit b37191ddf2343e74d34dd71b259f0d81d40739b1
Author: Weny Xu <[email protected]>
AuthorDate: Tue Aug 6 00:26:57 2024 +0800
feat: introduce opendal `AsyncWriter` for parquet integrations (#4958)
* chore: init parquet crate
* feat: implement the `OpendalAsyncWriter`
* chore: apply suggestions from CR
* chore: remove arrow dep from default
* chore(ci): add ci for opendal_parquet
* test: add test for async writer
* chore: remove arrow dep
* chore(ci): add doc test
* Update .github/workflows/ci_integration_parquet.yml
* chore(ci): run cargo test
---------
Co-authored-by: Xuanwo <[email protected]>
---
.github/workflows/ci_integration_parquet.yml | 51 ++++++++
.github/workflows/docs.yml | 24 ++++
integrations/parquet/.gitignore | 1 +
integrations/parquet/Cargo.toml | 52 ++++++++
integrations/parquet/examples/async_writer.rs | 45 +++++++
integrations/parquet/src/async_writer.rs | 172 ++++++++++++++++++++++++++
integrations/parquet/src/lib.rs | 72 +++++++++++
7 files changed, 417 insertions(+)
diff --git a/.github/workflows/ci_integration_parquet.yml b/.github/workflows/ci_integration_parquet.yml
new file mode 100644
index 0000000000..421faa418d
--- /dev/null
+++ b/.github/workflows/ci_integration_parquet.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Integration Parquet CI
+
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "integrations/parquet/**"
+ - "core/**"
+ - ".github/workflows/ci_integration_parquet.yml"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ check_clippy:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+
+ - name: Cargo clippy
+ working-directory: integrations/parquet
+ run: cargo clippy --all-targets --all-features -- -D warnings
+
+ - name: Cargo test
+ working-directory: integrations/parquet
+ run: cargo test
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index fbf561af63..7b9d9e0dba 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -406,6 +406,29 @@ jobs:
name: virtiofs-opendal-docs
path: ./integrations/virtiofs/target/doc
+ build-parquet-opendal-doc:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+
+ - name: Setup Rust Nightly
+ run: |
+ rustup toolchain install ${{ env.RUST_DOC_TOOLCHAIN }}
+
+ - name: Build parquet-opendal doc
+ working-directory: "integrations/parquet"
+ run: cargo +${{ env.RUST_DOC_TOOLCHAIN }} doc --lib --no-deps --all-features
+
+ - name: Upload docs
+ uses: actions/upload-artifact@v3
+ with:
+ name: object-parquet-docs
+ path: ./integrations/parquet/target/doc
+
build-website:
runs-on: ubuntu-latest
needs:
@@ -423,6 +446,7 @@ jobs:
- build-fuse3-opendal-doc
- build-unftp-sbe-opendal-doc
- build-virtiofs-opendal-doc
+ - build-parquet-opendal-doc
steps:
- uses: actions/checkout@v4
diff --git a/integrations/parquet/.gitignore b/integrations/parquet/.gitignore
new file mode 100644
index 0000000000..03314f77b5
--- /dev/null
+++ b/integrations/parquet/.gitignore
@@ -0,0 +1 @@
+Cargo.lock
diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
new file mode 100644
index 0000000000..1efe19b75a
--- /dev/null
+++ b/integrations/parquet/Cargo.toml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+description = "parquet Integration for Apache OpenDAL"
+name = "parquet_opendal"
+
+authors = ["Apache OpenDAL <[email protected]>"]
+edition = "2021"
+homepage = "https://opendal.apache.org/"
+license = "Apache-2.0"
+repository = "https://github.com/apache/opendal"
+rust-version = "1.75"
+version = "0.0.1"
+
+[dependencies]
+async-trait = "0.1"
+bytes = "1"
+futures = "0.3"
+opendal = { version = "0.48.0", path = "../../core" }
+parquet = { version = "52.0", default-features = false, features = [
+ "async",
+ "arrow",
+] }
+
+[dev-dependencies]
+opendal = { version = "0.48.0", path = "../../core", features = [
+ "services-memory",
+ "services-s3",
+] }
+rand = "0.8.5"
+tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
+arrow = { version = "52.0" }
+
+
+[[example]]
+name = "async_writer"
+path = "examples/async_writer.rs"
diff --git a/integrations/parquet/examples/async_writer.rs b/integrations/parquet/examples/async_writer.rs
new file mode 100644
index 0000000000..9f16f69eac
--- /dev/null
+++ b/integrations/parquet/examples/async_writer.rs
@@ -0,0 +1,45 @@
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+
+use opendal::{services::S3Config, Operator};
+use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+use parquet_opendal::AsyncWriter;
+
+#[tokio::main]
+async fn main() {
+ let mut cfg = S3Config::default();
+ cfg.access_key_id = Some("my_access_key".to_string());
+ cfg.secret_access_key = Some("my_secret_key".to_string());
+ cfg.endpoint = Some("my_endpoint".to_string());
+ cfg.region = Some("my_region".to_string());
+ cfg.bucket = "my_bucket".to_string();
+
+ // Create a new operator
+ let operator = Operator::from_config(cfg).unwrap().finish();
+ let path = "/path/to/file.parquet";
+
+ // Create an async writer
+ let writer = AsyncWriter::new(
+ operator
+ .writer_with(path)
+ .chunk(32 * 1024 * 1024)
+ .concurrent(8)
+ .await
+ .unwrap(),
+ );
+
+ let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+ let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+ let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+ writer.write(&to_write).await.unwrap();
+ writer.close().await.unwrap();
+
+ let buffer = operator.read(path).await.unwrap().to_bytes();
+ let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+ .unwrap()
+ .build()
+ .unwrap();
+ let read = reader.next().unwrap().unwrap();
+ assert_eq!(to_write, read);
+}
diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs
new file mode 100644
index 0000000000..027c9214c0
--- /dev/null
+++ b/integrations/parquet/src/async_writer.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use parquet::arrow::async_writer::AsyncFileWriter;
+use parquet::errors::{ParquetError, Result};
+
+use futures::future::BoxFuture;
+use opendal::Writer;
+
+/// AsyncWriter implements AsyncFileWriter trait by using opendal.
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+///
+/// use opendal::{services::S3Config, Operator};
+/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+/// use parquet_opendal::AsyncWriter;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut cfg = S3Config::default();
+/// cfg.access_key_id = Some("my_access_key".to_string());
+/// cfg.secret_access_key = Some("my_secret_key".to_string());
+/// cfg.endpoint = Some("my_endpoint".to_string());
+/// cfg.region = Some("my_region".to_string());
+/// cfg.bucket = "my_bucket".to_string();
+///
+/// // Create a new operator
+/// let operator = Operator::from_config(cfg).unwrap().finish();
+/// let path = "/path/to/file.parquet";
+///
+/// // Create an async writer
+/// let writer = AsyncWriter::new(
+/// operator
+/// .writer_with(path)
+/// .chunk(32 * 1024 * 1024)
+/// .concurrent(8)
+/// .await
+/// .unwrap(),
+/// );
+///
+/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+/// let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+/// writer.write(&to_write).await.unwrap();
+/// writer.close().await.unwrap();
+///
+/// let buffer = operator.read(path).await.unwrap().to_bytes();
+/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+/// .unwrap()
+/// .build()
+/// .unwrap();
+/// let read = reader.next().unwrap().unwrap();
+/// assert_eq!(to_write, read);
+/// }
+/// ```
+pub struct AsyncWriter {
+ inner: Writer,
+}
+
+impl AsyncWriter {
+ /// Create a [`OpendalAsyncWriter`] by given [`Writer`].
+ pub fn new(writer: Writer) -> Self {
+ Self { inner: writer }
+ }
+}
+
+impl AsyncFileWriter for AsyncWriter {
+ fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+ Box::pin(async move {
+ self.inner
+ .write(bs)
+ .await
+ .map_err(|err| ParquetError::External(Box::new(err)))
+ })
+ }
+
+ fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+ Box::pin(async move {
+ self.inner
+ .close()
+ .await
+ .map_err(|err| ParquetError::External(Box::new(err)))
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use super::*;
+ use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+ use opendal::{services, Operator};
+ use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+
+ #[tokio::test]
+ async fn test_basic() {
+ let op = Operator::new(services::Memory::default()).unwrap().finish();
+ let path = "data/test.txt";
+ let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+ let bytes = Bytes::from_static(b"hello, world!");
+ writer.write(bytes).await.unwrap();
+ let bytes = Bytes::from_static(b"hello, OpenDAL!");
+ writer.write(bytes).await.unwrap();
+ writer.complete().await.unwrap();
+
+ let bytes = op.read(path).await.unwrap().to_vec();
+ assert_eq!(bytes, b"hello, world!hello, OpenDAL!");
+ }
+
+ #[tokio::test]
+ async fn test_abort() {
+ let op = Operator::new(services::Memory::default()).unwrap().finish();
+ let path = "data/test.txt";
+ let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+ let bytes = Bytes::from_static(b"hello, world!");
+ writer.write(bytes).await.unwrap();
+ let bytes = Bytes::from_static(b"hello, OpenDAL!");
+ writer.write(bytes).await.unwrap();
+ drop(writer);
+
+ let exist = op.is_exist(path).await.unwrap();
+ assert!(!exist);
+ }
+
+ #[tokio::test]
+ async fn test_async_writer() {
+ let operator = Operator::new(services::Memory::default()).unwrap().finish();
+ let path = "/path/to/file.parquet";
+
+ let writer = AsyncWriter::new(
+ operator
+ .writer_with(path)
+ .chunk(32 * 1024 * 1024)
+ .concurrent(8)
+ .await
+ .unwrap(),
+ );
+
+ let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+ let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+ let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+ writer.write(&to_write).await.unwrap();
+ writer.close().await.unwrap();
+
+ let buffer = operator.read(path).await.unwrap().to_bytes();
+ let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+ .unwrap()
+ .build()
+ .unwrap();
+ let read = reader.next().unwrap().unwrap();
+ assert_eq!(to_write, read);
+ }
+}
diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs
new file mode 100644
index 0000000000..ded082d823
--- /dev/null
+++ b/integrations/parquet/src/lib.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! parquet_opendal provides parquet IO utils.
+//!
+//! AsyncWriter implements AsyncFileWriter trait by using opendal.
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//!
+//! use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+//!
+//! use opendal::{services::S3Config, Operator};
+//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+//! use parquet_opendal::AsyncWriter;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let mut cfg = S3Config::default();
+//! cfg.access_key_id = Some("my_access_key".to_string());
+//! cfg.secret_access_key = Some("my_secret_key".to_string());
+//! cfg.endpoint = Some("my_endpoint".to_string());
+//! cfg.region = Some("my_region".to_string());
+//! cfg.bucket = "my_bucket".to_string();
+//!
+//! // Create a new operator
+//! let operator = Operator::from_config(cfg).unwrap().finish();
+//! let path = "/path/to/file.parquet";
+//!
+//! // Create an async writer
+//! let writer = AsyncWriter::new(
+//! operator
+//! .writer_with(path)
+//! .chunk(32 * 1024 * 1024)
+//! .concurrent(8)
+//! .await
+//! .unwrap(),
+//! );
+//!
+//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+//! let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+//! writer.write(&to_write).await.unwrap();
+//! writer.close().await.unwrap();
+//!
+//! let buffer = operator.read(path).await.unwrap().to_bytes();
+//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+//! .unwrap()
+//! .build()
+//! .unwrap();
+//! let read = reader.next().unwrap().unwrap();
+//! assert_eq!(to_write, read);
+//! }
+//! ```
+
+mod async_writer;
+
+pub use async_writer::AsyncWriter;