This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b341910  feat: runtime module (#233)
b341910 is described below

commit b341910637246f2e5027b6d52150091f3a460eb1
Author: Chengxu Bian <[email protected]>
AuthorDate: Thu Jul 4 22:29:41 2024 -0400

    feat: runtime module (#233)
    
    * temp runtime
    
    * POC
    
    * fix chrono
    
    * fix dep
    
    * refine module
    
    * refactor to use a deadly simple way
    
    * allow dead_code
    
    * add license
    
    * fix clippy and tests
    
    * clean code
    
    * undo
    
    * add async-std ci test
    
    * rm tokio dev-dep
    
    * make tokio dev dep
    
    * fix sort
    
    * rm tokio dev
---
 .github/workflows/ci.yml          |   5 +-
 Cargo.toml                        |   1 +
 crates/iceberg/Cargo.toml         |   8 ++-
 crates/iceberg/src/lib.rs         |   2 +
 crates/iceberg/src/runtime/mod.rs | 103 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f0f8f0f..5be609c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -71,6 +71,9 @@ jobs:
 
       - name: Test
         run: cargo test --no-fail-fast --all-targets --all-features --workspace
-      
+
+      - name: Async-std Test
+        run: cargo test --no-fail-fast --all-targets --no-default-features --features "async-std" --features "storage-fs" --workspace
+
       - name: Doc Test
         run: cargo test --no-fail-fast --doc --all-features --workspace
diff --git a/Cargo.toml b/Cargo.toml
index 106aa8a..ce4d300 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,6 +46,7 @@ arrow-select = { version = "52" }
 arrow-string = { version = "52" }
 async-stream = "0.3.5"
 async-trait = "0.1"
+async-std = "1.12.0"
 aws-config = "1.1.8"
 aws-sdk-glue = "1.21.0"
 bimap = "0.6"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6cad4ad..c43f54f 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,12 +29,15 @@ license = { workspace = true }
 keywords = ["iceberg"]
 
 [features]
-default = ["storage-fs", "storage-s3"]
+default = ["storage-fs", "storage-s3", "tokio"]
 storage-all = ["storage-fs", "storage-s3"]
 
 storage-fs = ["opendal/services-fs"]
 storage-s3 = ["opendal/services-s3"]
 
+async-std = ["dep:async-std"]
+tokio = ["dep:tokio"]
+
 [dependencies]
 anyhow = { workspace = true }
 apache-avro = { workspace = true }
@@ -45,6 +48,7 @@ arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
+async-std = { workspace = true, optional = true, features = ["attributes"] }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
@@ -71,6 +75,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true, optional = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
@@ -81,4 +86,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 475a058..3985884 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -50,5 +50,7 @@ pub mod expr;
 pub mod transaction;
 pub mod transform;
 
+mod runtime;
+
 pub mod arrow;
 pub mod writer;
diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs
new file mode 100644
index 0000000..453b156
--- /dev/null
+++ b/crates/iceberg/src/runtime/mod.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module contains the async runtime abstraction for iceberg.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub enum JoinHandle<T> {
+    #[cfg(feature = "tokio")]
+    Tokio(tokio::task::JoinHandle<T>),
+    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+    AsyncStd(async_std::task::JoinHandle<T>),
+}
+
+impl<T: Send + 'static> Future for JoinHandle<T> {
+    type Output = T;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.get_mut() {
+            #[cfg(feature = "tokio")]
+            JoinHandle::Tokio(handle) => Pin::new(handle)
+                .poll(cx)
+                .map(|h| h.expect("tokio spawned task failed")),
+            #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+            JoinHandle::AsyncStd(handle) => Pin::new(handle).poll(cx),
+        }
+    }
+}
+
+#[allow(dead_code)]
+pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    #[cfg(feature = "tokio")]
+    return JoinHandle::Tokio(tokio::task::spawn(f));
+
+    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+    return JoinHandle::AsyncStd(async_std::task::spawn(f));
+}
+
+#[allow(dead_code)]
+pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
+where
+    F: FnOnce() -> T + Send + 'static,
+    T: Send + 'static,
+{
+    #[cfg(feature = "tokio")]
+    return JoinHandle::Tokio(tokio::task::spawn_blocking(f));
+
+    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+    return JoinHandle::AsyncStd(async_std::task::spawn_blocking(f));
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[cfg(feature = "tokio")]
+    #[tokio::test]
+    async fn test_tokio_spawn() {
+        let handle = spawn(async { 1 + 1 });
+        assert_eq!(handle.await, 2);
+    }
+
+    #[cfg(feature = "tokio")]
+    #[tokio::test]
+    async fn test_tokio_spawn_blocking() {
+        let handle = spawn_blocking(|| 1 + 1);
+        assert_eq!(handle.await, 2);
+    }
+
+    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+    #[async_std::test]
+    async fn test_async_std_spawn() {
+        let handle = spawn(async { 1 + 1 });
+        assert_eq!(handle.await, 2);
+    }
+
+    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
+    #[async_std::test]
+    async fn test_async_std_spawn_blocking() {
+        let handle = spawn_blocking(|| 1 + 1);
+        assert_eq!(handle.await, 2);
+    }
+}

Reply via email to