This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b341910 feat: runtime module (#233)
b341910 is described below
commit b341910637246f2e5027b6d52150091f3a460eb1
Author: Chengxu Bian <[email protected]>
AuthorDate: Thu Jul 4 22:29:41 2024 -0400
feat: runtime module (#233)
* temp runtime
* POC
* fix chrono
* fix dep
* refine module
* refactor to use a deadly simple way
* allow dead_code
* add license
* fix clippy and tests
* clean code
* undo
* add async-std ci test
* rm tokio dev-dep
* make tokio dev dep
* fix sort
* rm tokio dev
---
.github/workflows/ci.yml | 5 +-
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 8 ++-
crates/iceberg/src/lib.rs | 2 +
crates/iceberg/src/runtime/mod.rs | 103 ++++++++++++++++++++++++++++++++++++++
5 files changed, 116 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f0f8f0f..5be609c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -71,6 +71,9 @@ jobs:
- name: Test
run: cargo test --no-fail-fast --all-targets --all-features --workspace
-
+
+ - name: Async-std Test
+ run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace
+
- name: Doc Test
run: cargo test --no-fail-fast --doc --all-features --workspace
diff --git a/Cargo.toml b/Cargo.toml
index 106aa8a..ce4d300 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,6 +46,7 @@ arrow-select = { version = "52" }
arrow-string = { version = "52" }
async-stream = "0.3.5"
async-trait = "0.1"
+async-std = "1.12.0"
aws-config = "1.1.8"
aws-sdk-glue = "1.21.0"
bimap = "0.6"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6cad4ad..c43f54f 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,12 +29,15 @@ license = { workspace = true }
keywords = ["iceberg"]
[features]
-default = ["storage-fs", "storage-s3"]
+default = ["storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-fs", "storage-s3"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
+async-std = ["dep:async-std"]
+tokio = ["dep:tokio"]
+
[dependencies]
anyhow = { workspace = true }
apache-avro = { workspace = true }
@@ -45,6 +48,7 @@ arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
+async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
@@ -71,6 +75,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
+tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
@@ -81,4 +86,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 475a058..3985884 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -50,5 +50,7 @@ pub mod expr;
pub mod transaction;
pub mod transform;
+mod runtime;
+
pub mod arrow;
pub mod writer;
diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs
new file mode 100644
index 0000000..453b156
--- /dev/null
+++ b/crates/iceberg/src/runtime/mod.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Async runtime abstraction for iceberg: spawn and join tasks on tokio or async-std, selected by cargo feature.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub enum JoinHandle<T> { // Handle to a spawned task; the cfg gates below are mutually exclusive, so at most one variant exists.
+ #[cfg(feature = "tokio")] // tokio wins whenever its feature is enabled
+ Tokio(tokio::task::JoinHandle<T>),
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))] // async-std is compiled in only when tokio is off
+ AsyncStd(async_std::task::JoinHandle<T>),
+}
+
+impl<T: Send + 'static> Future for JoinHandle<T> { // Awaiting the handle yields the task's output `T` directly.
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // delegates to the wrapped runtime handle
+ match self.get_mut() {
+ #[cfg(feature = "tokio")]
+ JoinHandle::Tokio(handle) => Pin::new(handle)
+ .poll(cx)
+ .map(|h| h.expect("tokio spawned task failed")), // panics here if the tokio join result is an error
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+ JoinHandle::AsyncStd(handle) => Pin::new(handle).poll(cx), // polled result is returned unchanged
+ }
+ }
+}
+
+#[allow(dead_code)] // NOTE(review): presumably no caller exists in the crate yet — confirm before removing
+pub fn spawn<F>(f: F) -> JoinHandle<F::Output> // Spawns future `f` as a task on the feature-selected runtime and returns its handle.
+where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+{
+ #[cfg(feature = "tokio")]
+ return JoinHandle::Tokio(tokio::task::spawn(f)); // explicit `return`: the two cfg gates are mutually exclusive, so at most one statement is compiled
+
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+ return JoinHandle::AsyncStd(async_std::task::spawn(f)); // used only when tokio is disabled
+}
+
+#[allow(dead_code)] // NOTE(review): presumably no caller exists in the crate yet — confirm before removing
+pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T> // Hands closure `f` to the selected runtime's `spawn_blocking` and returns its handle.
+where
+ F: FnOnce() -> T + Send + 'static,
+ T: Send + 'static,
+{
+ #[cfg(feature = "tokio")]
+ return JoinHandle::Tokio(tokio::task::spawn_blocking(f)); // explicit `return`: the two cfg gates are mutually exclusive, so at most one statement is compiled
+
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+ return JoinHandle::AsyncStd(async_std::task::spawn_blocking(f)); // used only when tokio is disabled
+}
+
+#[cfg(test)]
+mod tests { // Each test is gated by the same cfg that selects the runtime it exercises.
+ use super::*;
+
+ #[cfg(feature = "tokio")]
+ #[tokio::test]
+ async fn test_tokio_spawn() { // awaiting the handle returns the spawned future's output
+ let handle = spawn(async { 1 + 1 });
+ assert_eq!(handle.await, 2);
+ }
+
+ #[cfg(feature = "tokio")]
+ #[tokio::test]
+ async fn test_tokio_spawn_blocking() { // awaiting the handle returns the closure's result
+ let handle = spawn_blocking(|| 1 + 1);
+ assert_eq!(handle.await, 2);
+ }
+
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))] // compiled only when tokio is disabled
+ #[async_std::test]
+ async fn test_async_std_spawn() { // same check as the tokio variant, on async-std
+ let handle = spawn(async { 1 + 1 });
+ assert_eq!(handle.await, 2);
+ }
+
+ #[cfg(all(feature = "async-std", not(feature = "tokio")))] // compiled only when tokio is disabled
+ #[async_std::test]
+ async fn test_async_std_spawn_blocking() { // same check as the tokio variant, on async-std
+ let handle = spawn_blocking(|| 1 + 1);
+ assert_eq!(handle.await, 2);
+ }
+}