This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch wasm in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5f52c230e3a8488f8a42151948539bc8572053dc Author: Xuanwo <[email protected]> AuthorDate: Thu Dec 21 20:24:19 2023 +0800 feat: Make OpenDAL available under wasm32 arch Signed-off-by: Xuanwo <[email protected]> --- .github/workflows/ci.yml | 14 ++++ Cargo.lock | 13 ++-- core/Cargo.toml | 7 +- core/benches/oio/utils.rs | 2 - core/src/layers/async_backtrace.rs | 3 +- core/src/layers/await_tree.rs | 3 +- core/src/layers/blocking.rs | 3 +- core/src/layers/chaos.rs | 3 +- core/src/layers/complete.rs | 8 +-- core/src/layers/concurrent_limit.rs | 5 +- core/src/layers/error_context.rs | 3 +- core/src/layers/immutable_index.rs | 4 +- core/src/layers/logging.rs | 7 +- core/src/layers/madsim.rs | 5 +- core/src/layers/metrics.rs | 4 +- core/src/layers/minitrace.rs | 5 +- core/src/layers/oteltrace.rs | 7 +- core/src/layers/prometheus.rs | 4 +- core/src/layers/prometheus_client.rs | 4 +- core/src/layers/retry.rs | 11 ++-- core/src/layers/throttle.rs | 4 +- core/src/layers/timeout.rs | 5 +- core/src/layers/tracing.rs | 5 +- core/src/layers/type_eraser.rs | 3 +- core/src/raw/accessor.rs | 9 ++- core/src/raw/adapters/kv/backend.rs | 5 +- core/src/raw/adapters/typed_kv/backend.rs | 5 +- core/src/raw/futures_util.rs | 8 +++ core/src/raw/http_util/client.rs | 50 ++++++++++++-- core/src/raw/http_util/multipart.rs | 2 - core/src/raw/layer.rs | 12 ++-- core/src/raw/mod.rs | 1 + core/src/raw/oio/list/flat_list.rs | 7 +- core/src/raw/oio/list/page_list.rs | 10 ++- core/src/raw/oio/read/file_read.rs | 9 ++- core/src/raw/oio/read/lazy_read.rs | 9 ++- core/src/raw/oio/read/range_read.rs | 13 ++-- core/src/raw/oio/stream/into_stream.rs | 84 ++++++++++++++++++------ core/src/raw/oio/write/compose_write.rs | 4 -- core/src/raw/oio/write/exact_buf_write.rs | 3 - core/src/raw/oio/write/multipart_upload_write.rs | 17 +++-- core/src/raw/oio/write/one_shot_write.rs | 1 - core/src/services/dbfs/reader.rs | 2 - core/src/services/fs/lister.rs | 2 - core/src/services/fs/writer.rs | 2 - core/src/services/ftp/lister.rs | 2 - core/src/services/s3/backend.rs | 8 ++- core/src/services/s3/lister.rs | 3 +- core/src/services/s3/writer.rs | 3 +- core/src/types/list.rs | 3 +- core/src/types/operator/operator_futures.rs | 7 +- core/tests/behavior/main.rs | 2 +- 52 files changed, 271 insertions(+), 144 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3d6d2c24c..5a83465fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -293,6 +293,20 @@ jobs: ) cargo build --features "${FEATURES[*]}" + # We only support s3 services for now, but we will extend wasm support for other services. + build_under_wasm: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + - name: Build + run: | + rustup add target wasm32-unknown-unknown + cargo cca --target wasm32-unknown-unknown --no-default-features --features=services-s3 + unit: runs-on: ubuntu-latest steps: diff --git a/Cargo.lock b/Cargo.lock index 16f4e2409..b68f19e69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2930,8 +2930,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -3525,11 +3527,12 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.1.0" +version = "9.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "155c4d7e39ad04c172c5e3a99c434ea3b4a7ba7960b38ecd562b270b097cce09" +checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" dependencies = [ "base64 0.21.5", + "js-sys", "pem 3.0.2", "ring 0.17.5", "serde", @@ -4657,6 +4660,7 @@ dependencies = [ "flagset", "foundationdb", "futures", + "getrandom 0.2.11", "governor", "hdrs", "hrana-client-proto", @@ -6102,15 +6106,16 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.14.4" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c032d9e40cab6e8d3083b522a5cb5222fe6640c8b15478f1d19ac126810e8f4a" +checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd" dependencies = [ "anyhow", "async-trait", "base64 0.21.5", "chrono", "form_urlencoded", + "getrandom 0.2.11", "hex", "hmac", "home", diff --git a/core/Cargo.toml b/core/Cargo.toml index dac151d42..76084a8c3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -284,7 +284,7 @@ redis = { version = "0.23.1", features = [ "tokio-comp", "connection-manager", ], optional = true } -reqsign = { version = "0.14.4", default-features = false, optional = true } +reqsign = { version = "0.14.6", default-features = false, optional = true } reqwest = { version = "0.11.18", features = [ "stream", ], default-features = false } @@ -300,12 +300,15 @@ suppaftp = { version = "5.2", default-features = false, features = [ "async-rustls", ], optional = true } tikv-client = { version = "0.3.0", optional = true, default-features = false } -tokio = "1.27" +tokio = { version= "1.27",features = ["sync"] } tokio-postgres = { version = "0.7.8", optional = true } tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } async-tls = { version = "0.12.0", optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } + [dev-dependencies] criterion = { version = "0.5", features = ["async", "async_tokio"] } dotenvy = "0.15" diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 338587062..a1cb259f1 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -18,7 +18,6 @@ use std::task::Context; use std::task::Poll; -use async_trait::async_trait; use bytes::Bytes; use opendal::raw::oio; use rand::prelude::ThreadRng; @@ -27,7 +26,6 @@ use rand::RngCore; /// BlackHoleWriter will discard all data written to it so we can measure the buffer's cost. pub struct BlackHoleWriter; -#[async_trait] impl oio::Write for BlackHoleWriter { fn poll_write( &mut self, diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index cd363432e..46bc3afa4 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -58,7 +58,8 @@ pub struct AsyncBacktraceAccessor<A: Accessor> { inner: A, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for AsyncBacktraceAccessor<A> { type Inner = A; type Reader = A::Reader; diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 8d7755d2c..bd82bf236 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -66,7 +66,8 @@ pub struct AwaitTreeAccessor<A: Accessor> { inner: A, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> { type Inner = A; type Reader = A::Reader; diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index f1dbac2e0..bf2c0e540 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -170,7 +170,8 @@ pub struct BlockingAccessor<A: Accessor> { handle: Handle, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> { type Inner = A; type Reader = A::Reader; diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs index 08ec4dee2..0907503fd 100644 --- a/core/src/layers/chaos.rs +++ b/core/src/layers/chaos.rs @@ -99,7 +99,8 @@ pub struct ChaosAccessor<A> { error_ratio: f64, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> { type Inner = A; type Reader = ChaosReader<A::Reader>; diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index d41a4d6e0..d130149af 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -433,7 +433,8 @@ impl<A: Accessor> CompleteAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> { type Inner = A; type Reader = CompleteReader<A, A::Reader>; @@ -734,7 +735,6 @@ pub enum CompleteLister<A: Accessor, P> { NeedPrefix(PrefixLister<P>), } -#[async_trait] impl<A, P> oio::List for CompleteLister<A, P> where A: Accessor<Lister = P>, @@ -789,7 +789,6 @@ impl<W> Drop for CompleteWriter<W> { } } -#[async_trait] impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, @@ -865,7 +864,8 @@ mod tests { capability: Capability, } - #[async_trait] + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Accessor for MockService { type Reader = oio::Reader; type BlockingReader = oio::BlockingReader; diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index cd82aa650..e37b74ce0 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -79,7 +79,8 @@ pub struct ConcurrentLimitAccessor<A: Accessor> { semaphore: Arc<Semaphore>, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> { type Inner = A; type Reader = ConcurrentLimitWrapper<A::Reader>; @@ -283,7 +284,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.poll_write(cx, bs) @@ -308,7 +308,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { } } -#[async_trait] impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { self.inner.poll_next(cx) diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 3baed1be5..9a3a0130e 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -63,7 +63,8 @@ impl<A: Accessor> Debug for ErrorContextAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> { type Inner = A; type Reader = ErrorContextWrapper<A::Reader>; diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index 993c7eba7..eea8a343c 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -135,7 +135,8 @@ impl<A: Accessor> ImmutableIndexAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> { type Inner = A; type Reader = A::Reader; @@ -231,7 +232,6 @@ impl ImmutableDir { } } -#[async_trait] impl oio::List for ImmutableDir { fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { Poll::Ready(Ok(self.inner_next())) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 6d48dcbd0..5dab01287 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -211,7 +211,8 @@ pub struct LoggingAccessor<A: Accessor> { static LOGGING_TARGET: &str = "opendal::services"; -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> { type Inner = A; type Reader = LoggingReader<A::Reader>; @@ -1265,7 +1266,6 @@ impl<W> LoggingWriter<W> { } } -#[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match ready!(self.inner.poll_write(cx, bs)) { @@ -1474,7 +1474,8 @@ impl<P> Drop for LoggingLister<P> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<P: oio::List> oio::List for LoggingLister<P> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { let res = ready!(self.inner.poll_next(cx)); diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 1b5b602aa..3d68bff90 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -140,7 +140,8 @@ pub struct MadsimAccessor { addr: SocketAddr, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl LayeredAccessor for MadsimAccessor { type Inner = (); type Reader = MadsimReader; @@ -295,7 +296,6 @@ pub struct MadsimWriter { addr: SocketAddr, } -#[async_trait] impl oio::Write for MadsimWriter { fn poll_write( &mut self, @@ -331,7 +331,6 @@ impl oio::Write for MadsimWriter { pub struct MadsimLister {} -#[async_trait] impl oio::List for MadsimLister { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Option<oio::Entry>>> { Poll::Ready(Err(Error::new( diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index b9c67b54e..4e18512cc 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -405,7 +405,8 @@ impl<A: Accessor> Debug for MetricsAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> { type Inner = A; type Reader = MetricWrapper<A::Reader>; @@ -845,7 +846,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 0af413940..b49ad74cf 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -129,7 +129,8 @@ pub struct MinitraceAccessor<A> { inner: A, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> { type Inner = A; type Reader = MinitraceWrapper<A::Reader>; @@ -335,7 +336,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let _g = self.span.set_local_parent(); @@ -370,7 +370,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { } } -#[async_trait] impl<R: oio::List> oio::List for MinitraceWrapper<R> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { let _g = self.span.set_local_parent(); diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index bddb0b30f..3f8754c48 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -66,7 +66,8 @@ pub struct OtelTraceAccessor<A> { inner: A, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for OtelTraceAccessor<A> { type Inner = A; type Reader = OtelTraceWrapper<A::Reader>; @@ -304,7 +305,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner.poll_write(cx, bs) @@ -329,7 +329,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<R: oio::List> oio::List for OtelTraceWrapper<R> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { self.inner.poll_next(cx) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 41b7bfc68..d5426c883 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -268,7 +268,8 @@ impl<A: Accessor> Debug for PrometheusAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> { type Inner = A; type Reader = PrometheusMetricWrapper<A::Reader>; @@ -790,7 +791,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let labels = self.stats.generate_metric_label( diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index a43d24f0a..eb44d8b4a 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -208,7 +208,8 @@ impl<A: Accessor> Debug for PrometheusAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> { type Inner = A; type Reader = PrometheusMetricWrapper<A::Reader>; @@ -617,7 +618,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { self.inner diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 8b0a44bb8..4219f445a 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -277,7 +277,8 @@ impl<A: Accessor, I: RetryInterceptor> Debug for RetryAccessor<A, I> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor, I: RetryInterceptor> LayeredAccessor for RetryAccessor<A, I> { type Inner = A; type Reader = RetryWrapper<A::Reader, I>; @@ -870,7 +871,6 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp } } -#[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { if let Some(sleep) = self.sleep.as_mut() { @@ -1049,7 +1049,8 @@ impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWra } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { if let Some(sleep) = self.sleep.as_mut() { @@ -1159,7 +1160,8 @@ mod tests { attempt: Arc<Mutex<usize>>, } - #[async_trait] + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Accessor for MockService { type Reader = MockReader; type BlockingReader = (); @@ -1314,7 +1316,6 @@ mod tests { attempt: usize, } - #[async_trait] impl oio::List for MockLister { fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { self.attempt += 1; diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index ea3da1cdb..8d4b93755 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -112,7 +112,8 @@ pub struct ThrottleAccessor<A: Accessor> { rate_limiter: SharedRateLimiter, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for ThrottleAccessor<A> { type Inner = A; type Reader = ThrottleWrapper<A::Reader>; @@ -213,7 +214,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap(); diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 8f04c3363..cae6ee918 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -139,7 +139,8 @@ pub struct TimeoutAccessor<A: Accessor> { speed: u64, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for TimeoutAccessor<A> { type Inner = A; type Reader = TimeoutWrapper<A::Reader>; @@ -322,7 +323,6 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match self.start { @@ -415,7 +415,6 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { } } -#[async_trait] impl<R: oio::List> oio::List for TimeoutWrapper<R> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { match self.start { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 74cb89a55..377b1e2a8 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -130,7 +130,8 @@ pub struct TracingAccessor<A> { inner: A, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for TracingAccessor<A> { type Inner = A; type Reader = TracingWrapper<A::Reader>; @@ -318,7 +319,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for TracingWrapper<R> { } } -#[async_trait] impl<R: oio::Write> oio::Write for TracingWrapper<R> { #[tracing::instrument( parent = &self.span, @@ -363,7 +363,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> { } } -#[async_trait] impl<R: oio::List> oio::List for TracingWrapper<R> { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs index 68cb24859..37a0444ec 100644 --- a/core/src/layers/type_eraser.rs +++ b/core/src/layers/type_eraser.rs @@ -52,7 +52,8 @@ impl<A: Accessor> Debug for TypeEraseAccessor<A> { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { type Inner = A; type Reader = oio::Reader; diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index 5ff1ab802..4d78b153a 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -53,7 +53,8 @@ use crate::*; /// - Operations with capability requirement like `presign` are optional operations. /// - Services can implement them based on services capabilities. /// - The default implementation should return [`ErrorKind::Unsupported`]. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Reader is the associated reader the could return in `read` operation. type Reader: oio::Read; @@ -364,7 +365,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { } /// Dummy implementation of accessor. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Accessor for () { type Reader = (); type BlockingReader = (); @@ -386,7 +388,8 @@ impl Accessor for () { /// All functions in `Accessor` only requires `&self`, so it's safe to implement /// `Accessor` for `Arc<dyn Accessor>`. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<T: Accessor + ?Sized> Accessor for Arc<T> { type Reader = T::Reader; type BlockingReader = T::BlockingReader; diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index a05b1da67..2f1c82da8 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -64,7 +64,8 @@ where } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<S: Adapter> Accessor for Backend<S> { type Reader = oio::Cursor; type BlockingReader = oio::Cursor; @@ -247,7 +248,6 @@ impl KvLister { } } -#[async_trait] impl oio::List for KvLister { fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { Poll::Ready(Ok(self.inner_next())) @@ -289,7 +289,6 @@ enum Buffer { /// We will only take `&mut Self` reference for KvWriter. unsafe impl<S: Adapter> Sync for KvWriter<S> {} -#[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { if self.future.is_some() { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index b02c99ec5..357f9b55f 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -58,7 +58,8 @@ where } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<S: Adapter> Accessor for Backend<S> { type Reader = oio::Cursor; type BlockingReader = oio::Cursor; @@ -251,7 +252,6 @@ impl KvLister { } } -#[async_trait] impl oio::List for KvLister { fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { Poll::Ready(Ok(self.inner_next())) @@ -311,7 +311,6 @@ impl<S> KvWriter<S> { } } -#[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { if self.future.is_some() { diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 46bab650c..318abe4ec 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -22,6 +22,14 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +/// BoxedFuture is the type alias of [`futures::future::BoxFuture`]. +/// +/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target. +#[cfg(not(target_arch = "wasm32"))] +pub type BoxedFuture<T> = futures::future::BoxFuture<'static, T>; +#[cfg(target_arch = "wasm32")] +pub type BoxedFuture<T> = futures::future::LocalBoxFuture<'static, T>; + /// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use /// [`FuturesOrdered`] or not. /// diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 9ac5ac4e2..3bd7e826c 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -55,6 +55,7 @@ impl HttpClient { } /// Build a new http client in async context. + #[cfg(not(target_arch = "wasm32"))] pub fn build(mut builder: reqwest::ClientBuilder) -> Result<Self> { // Make sure we don't enable auto gzip decompress. builder = builder.no_gzip(); @@ -75,6 +76,16 @@ impl HttpClient { }) } + /// Build a new http client in async context. + #[cfg(target_arch = "wasm32")] + pub fn build(mut builder: reqwest::ClientBuilder) -> Result<Self> { + Ok(Self { + client: builder.build().map_err(|err| { + Error::new(ErrorKind::Unexpected, "async client build failed").set_source(err) + })?, + }) + } + /// Get the async client from http client. pub fn client(&self) -> reqwest::Client { self.client.clone() @@ -95,14 +106,39 @@ impl HttpClient { parts.method, reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), ) - .version(parts.version) .headers(parts.headers); + // Client under wasm doesn't support set version. + #[cfg(not(target_arch = "wasm32"))] + { + req_builder = req_builder.version(parts.version); + } + req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), - AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)), - AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), + AsyncBody::ChunkedBytes(bs) => { + #[cfg(not(target_arch = "wasm32"))] + { + req_builder.body(reqwest::Body::wrap_stream(bs)) + } + #[cfg(target_arch = "wasm32")] + { + let bs = oio::WriteBuf::bytes(&bs, bs.len()); + req_builder.body(reqwest::Body::from(bs)) + } + } + AsyncBody::Stream(s) => { + #[cfg(not(target_arch = "wasm32"))] + { + req_builder.body(reqwest::Body::wrap_stream(s)) + } + #[cfg(target_arch = "wasm32")] + { + let bs = oio::StreamExt::collect(s).await?; + req_builder.body(reqwest::Body::from(bs)) + } + } }; let mut resp = req_builder.send().await.map_err(|err| { @@ -139,11 +175,17 @@ impl HttpClient { }; let mut hr = Response::builder() - .version(resp.version()) .status(resp.status()) // Insert uri into response extension so that we can fetch // it later. .extension(uri.clone()); + + // Response builder under wasm doesn't support set version. + #[cfg(not(target_arch = "wasm32"))] + { + hr = hr.version(resp.version()); + } + // Swap headers directly instead of copy the entire map. mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 7f07fec35..81e6db714 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -22,7 +22,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use async_trait::async_trait; use bytes::Bytes; use bytes::BytesMut; use futures::stream; @@ -329,7 +328,6 @@ pub struct FormDataPartStream { content: Option<Streamer>, } -#[async_trait] impl Stream for FormDataPartStream { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { if let Some(pre_content) = self.pre_content.take() { diff --git a/core/src/raw/layer.rs b/core/src/raw/layer.rs index da21eca64..1aac99005 100644 --- a/core/src/raw/layer.rs +++ b/core/src/raw/layer.rs @@ -54,7 +54,8 @@ use crate::*; /// inner: A, /// } /// -/// #[async_trait] +/// #[cfg_attr(not(target_arch = "wasm32"), async_trait)] +/// #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] /// impl<A: Accessor> LayeredAccessor for TraceAccessor<A> { /// type Inner = A; /// type Reader = A::Reader; @@ -129,7 +130,8 @@ pub trait Layer<A: Accessor> { /// LayeredAccessor is layered accessor that forward all not implemented /// method to inner. #[allow(missing_docs)] -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { type Inner: Accessor; type Reader: oio::Read; @@ -206,7 +208,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)>; } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<L: LayeredAccessor> Accessor for L { type Reader = L::Reader; type BlockingReader = L::BlockingReader; @@ -319,7 +322,8 @@ mod tests { } } - #[async_trait::async_trait] + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl<A: Accessor> Accessor for Test<A> { type Reader = (); type BlockingReader = (); diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 7ce67834c..0d4b1d6d3 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -63,6 +63,7 @@ mod std_io_util; pub use std_io_util::*; mod futures_util; +pub use futures_util::BoxedFuture; pub use futures_util::ConcurrentFutures; // Expose as a pub mod to avoid confusing. diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs index c574d064e..acc2749f2 100644 --- a/core/src/raw/oio/list/flat_list.rs +++ b/core/src/raw/oio/list/flat_list.rs @@ -19,14 +19,13 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use futures::future::BoxFuture; use futures::FutureExt; use crate::raw::*; use crate::*; /// ListFuture is the future returned while calling async list. -type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>; +type ListFuture<A, L> = BoxedFuture<(A, oio::Entry, Result<(RpList, L)>)>; /// FlatLister will walk dir in bottom up way: /// @@ -73,6 +72,10 @@ pub struct FlatLister<A: Accessor, L> { list_future: Option<ListFuture<A, L>>, } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this FlatLister. +unsafe impl<A: Accessor, L> Send for FlatLister<A, L> {} /// # Safety /// /// We will only take `&mut Self` reference for FsLister. diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index eb98a98bd..d054baba2 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -21,7 +21,6 @@ use std::task::Context; use std::task::Poll; use async_trait::async_trait; -use futures::future::BoxFuture; use crate::raw::*; use crate::*; @@ -36,7 +35,8 @@ use crate::*; /// - Services impl `PageList` /// - `PageLister` impl `List` /// - Expose `PageLister` as `Accessor::Lister` -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait PageList: Send + Sync + Unpin + 'static { /// next_page is used to fetch next page of entries from underlying storage. async fn next_page(&self, ctx: &mut PageContext) -> Result<()>; @@ -72,9 +72,13 @@ pub struct PageLister<L: PageList> { enum State<L> { Idle(Option<(L, PageContext)>), - Fetch(BoxFuture<'static, ((L, PageContext), Result<()>)>), + Fetch(BoxedFuture<((L, PageContext), Result<()>)>), } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl<L: PageList> Send for State<L> {} /// # Safety /// /// We will only take `&mut Self` reference for State. diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs index fd8a53775..8db1c7cf7 100644 --- a/core/src/raw/oio/read/file_read.rs +++ b/core/src/raw/oio/read/file_read.rs @@ -24,7 +24,6 @@ use std::task::Context; use std::task::Poll; use bytes::Bytes; -use futures::future::BoxFuture; use futures::Future; use crate::raw::*; @@ -53,10 +52,14 @@ pub struct FileReader<A: Accessor, R> { enum State<R> { Idle, - Send(BoxFuture<'static, Result<(RpRead, R)>>), + Send(BoxedFuture<Result<(RpRead, R)>>), Read(R), } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl<R> Send for State<R> {} /// Safety: State will only be accessed under &mut. unsafe impl<R> Sync for State<R> {} @@ -90,7 +93,7 @@ where A: Accessor<Reader = R>, R: oio::Read, { - fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> { + fn read_future(&self) -> BoxedFuture<Result<(RpRead, R)>> { let acc = self.acc.clone(); let path = self.path.clone(); diff --git a/core/src/raw/oio/read/lazy_read.rs b/core/src/raw/oio/read/lazy_read.rs index f6237f656..84ba5b55a 100644 --- a/core/src/raw/oio/read/lazy_read.rs +++ b/core/src/raw/oio/read/lazy_read.rs @@ -23,7 +23,6 @@ use std::task::Context; use std::task::Poll; use bytes::Bytes; -use futures::future::BoxFuture; use futures::Future; use crate::raw::*; @@ -41,10 +40,14 @@ pub struct LazyReader<A: Accessor, R> { enum State<R> { Idle, - Send(BoxFuture<'static, Result<(RpRead, R)>>), + Send(BoxedFuture<Result<(RpRead, R)>>), Read(R), } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl<R> Send for State<R> {} /// Safety: State will only be accessed under &mut. unsafe impl<R> Sync for State<R> {} @@ -69,7 +72,7 @@ where A: Accessor<Reader = R>, R: oio::Read, { - fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> { + fn read_future(&self) -> BoxedFuture<Result<(RpRead, R)>> { let acc = self.acc.clone(); let path = self.path.clone(); let op = self.op.clone(); diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs index b267402d2..4b46f47ac 100644 --- a/core/src/raw/oio/read/range_read.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -24,7 +24,6 @@ use std::task::Context; use std::task::Poll; use bytes::Bytes; -use futures::future::BoxFuture; use crate::raw::*; use crate::*; @@ -51,11 +50,15 @@ pub struct RangeReader<A: Accessor, R> { enum State<R> { Idle, - SendStat(BoxFuture<'static, Result<RpStat>>), - SendRead(BoxFuture<'static, Result<(RpRead, R)>>), + SendStat(BoxedFuture<Result<RpStat>>), + SendRead(BoxedFuture<Result<(RpRead, R)>>), Read(R), } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl<R> Send for State<R> {} /// Safety: State will only be accessed under &mut. unsafe impl<R> Sync for State<R> {} @@ -192,7 +195,7 @@ where A: Accessor<Reader = R>, R: oio::Read, { - fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> { + fn read_future(&self) -> BoxedFuture<Result<(RpRead, R)>> { let acc = self.acc.clone(); let path = self.path.clone(); @@ -208,7 +211,7 @@ where Box::pin(async move { acc.read(&path, op).await }) } - fn stat_future(&self) -> BoxFuture<'static, Result<RpStat>> { + fn stat_future(&self) -> BoxedFuture<Result<RpStat>> { let acc = self.acc.clone(); let path = self.path.clone(); diff --git a/core/src/raw/oio/stream/into_stream.rs b/core/src/raw/oio/stream/into_stream.rs index 2b2197437..d5a57970c 100644 --- a/core/src/raw/oio/stream/into_stream.rs +++ b/core/src/raw/oio/stream/into_stream.rs @@ -15,32 +15,74 @@ // specific language governing permissions and limitations // under the License. -use std::task::Context; -use std::task::Poll; +#[cfg(not(target_arch = "wasm32"))] +pub use non_wasm32_impl::*; -use bytes::Bytes; -use futures::TryStreamExt; +#[cfg(not(target_arch = "wasm32"))] +mod non_wasm32_impl { + use crate::raw::oio; + use bytes::Bytes; + use futures::TryStreamExt; + use std::task::{Context, Poll}; -use crate::raw::*; -use crate::*; + /// Convert given futures stream into [`oio::Stream`]. + pub fn into_stream<S>(stream: S) -> IntoStream<S> + where + S: futures::Stream<Item = crate::Result<Bytes>> + Send + Sync + Unpin, + { + IntoStream { inner: stream } + } -/// Convert given futures stream into [`oio::Stream`]. -pub fn into_stream<S>(stream: S) -> IntoStream<S> -where - S: futures::Stream<Item = Result<Bytes>> + Send + Sync + Unpin, -{ - IntoStream { inner: stream } -} + pub struct IntoStream<S> { + inner: S, + } -pub struct IntoStream<S> { - inner: S, + impl<S> oio::Stream for IntoStream<S> + where + S: futures::Stream<Item = crate::Result<Bytes>> + Send + Sync + Unpin, + { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { + self.inner.try_poll_next_unpin(cx) + } + } } -impl<S> oio::Stream for IntoStream<S> -where - S: futures::Stream<Item = Result<Bytes>> + Send + Sync + Unpin, -{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { - self.inner.try_poll_next_unpin(cx) +#[cfg(target_arch = "wasm32")] +pub use wasm32_impl::*; +#[cfg(target_arch = "wasm32")] +mod wasm32_impl { + use crate::raw::oio; + use bytes::Bytes; + use futures::TryStreamExt; + use std::task::{Context, Poll}; + + /// Convert given futures stream into [`oio::Stream`]. + pub fn into_stream<S>(stream: S) -> IntoStream<S> + where + S: futures::Stream<Item = crate::Result<Bytes>> + Unpin, + { + IntoStream { inner: stream } + } + + pub struct IntoStream<S> { + inner: S, + } + + /// # Safety + /// + /// wasm32 is a special target that we only have one event-loop for this stream. + unsafe impl<S> Send for IntoStream<S> {} + /// # Safety + /// + /// IntoStream only has mutable references. + unsafe impl<S> Sync for IntoStream<S> {} + + impl<S> oio::Stream for IntoStream<S> + where + S: futures::Stream<Item = crate::Result<Bytes>> + Unpin, + { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { + self.inner.try_poll_next_unpin(cx) + } } } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index e651aa714..6572eecf8 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -41,8 +41,6 @@ use std::task::Context; use std::task::Poll; -use async_trait::async_trait; - use crate::raw::*; use crate::*; @@ -56,7 +54,6 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { Two(TWO), } -#[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match self { @@ -92,7 +89,6 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { Three(THREE), } -#[async_trait] impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 122e342c1..3f4dd7ade 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -19,8 +19,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use async_trait::async_trait; - use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -55,7 +53,6 @@ impl<W: oio::Write> ExactBufWriter<W> { } } -#[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> { if self.buffer.len() >= self.buffer_size { diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index f95cf64f1..a0f92aa98 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -21,7 +21,6 @@ use std::task::Context; use std::task::Poll; use async_trait::async_trait; -use futures::future::BoxFuture; use crate::raw::*; use crate::*; @@ -48,7 +47,8 @@ use crate::*; /// ``` /// /// We will use `write_once` instead of starting a new multipart upload. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static { /// write_once is used to write the data to underlying storage at once. /// @@ -115,12 +115,16 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> { enum State<W> { Idle(Option<W>), - Init(BoxFuture<'static, (W, Result<String>)>), - Write(BoxFuture<'static, (W, Result<MultipartUploadPart>)>), - Close(BoxFuture<'static, (W, Result<()>)>), - Abort(BoxFuture<'static, (W, Result<()>)>), + Init(BoxedFuture<(W, Result<String>)>), + Write(BoxedFuture<(W, Result<MultipartUploadPart>)>), + Close(BoxedFuture<(W, Result<()>)>), + Abort(BoxedFuture<(W, Result<()>)>), } +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this state. +unsafe impl<S: MultipartUploadWrite> Send for State<S> {} /// # Safety /// /// We will only take `&mut Self` reference for State. @@ -139,7 +143,6 @@ impl<W: MultipartUploadWrite> MultipartUploadWriter<W> { } } -#[async_trait] impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index a015b98a1..85357d608 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -65,7 +65,6 @@ impl<W: OneShotWrite> OneShotWriter<W> { } } -#[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { match &mut self.state { diff --git a/core/src/services/dbfs/reader.rs b/core/src/services/dbfs/reader.rs index 1cc56ea89..e9fe13de3 100644 --- a/core/src/services/dbfs/reader.rs +++ b/core/src/services/dbfs/reader.rs @@ -22,7 +22,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; use bytes::BufMut; @@ -94,7 +93,6 @@ enum State { /// We will only take `&mut Self` reference for DbfsReader. unsafe impl Sync for DbfsReader {} -#[async_trait] impl oio::Read for DbfsReader { fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> { while self.has_filled as usize != buf.len() { diff --git a/core/src/services/fs/lister.rs b/core/src/services/fs/lister.rs index c101c08d6..29ec2c9f3 100644 --- a/core/src/services/fs/lister.rs +++ b/core/src/services/fs/lister.rs @@ -22,7 +22,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use async_trait::async_trait; use futures::future::BoxFuture; use futures::FutureExt; @@ -55,7 +54,6 @@ impl<P> FsLister<P> { /// We will only take `&mut Self` reference for FsLister. unsafe impl<P> Sync for FsLister<P> {} -#[async_trait] impl oio::List for FsLister<tokio::fs::ReadDir> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { if let Some(fut) = self.fut.as_mut() { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index bd41f5611..12e5a4fff 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -22,7 +22,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use async_trait::async_trait; use futures::future::BoxFuture; use futures::FutureExt; use tokio::io::AsyncWrite; @@ -56,7 +55,6 @@ impl<F> FsWriter<F> { /// We will only take `&mut Self` reference for FsWriter. unsafe impl<F> Sync for FsWriter<F> {} -#[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { let f = self.f.as_mut().expect("FsWriter must be initialized"); diff --git a/core/src/services/ftp/lister.rs b/core/src/services/ftp/lister.rs index 099fee930..4d77bf1b8 100644 --- a/core/src/services/ftp/lister.rs +++ b/core/src/services/ftp/lister.rs @@ -21,7 +21,6 @@ use std::task::Context; use std::task::Poll; use std::vec::IntoIter; -use async_trait::async_trait; use suppaftp::list::File; use crate::raw::*; @@ -41,7 +40,6 @@ impl FtpLister { } } -#[async_trait] impl oio::List for FtpLister { fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { let de = match self.file_iter.next() { diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 47260c506..5795fd40f 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -860,7 +860,10 @@ impl Builder for S3Builder { // This is our current config. let mut cfg = AwsConfig::default(); if !self.config.disable_config_load { - cfg = cfg.from_profile(); + #[cfg(not(target_arch = "wasm32"))] + { + cfg = cfg.from_profile(); + } cfg = cfg.from_env(); } @@ -976,7 +979,8 @@ pub struct S3Backend { core: Arc<S3Core>, } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Accessor for S3Backend { type Reader = IncomingAsyncBody; type BlockingReader = (); diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index 5584f5372..f0712e650 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -60,7 +60,8 @@ impl S3Lister { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::PageList for S3Lister { async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index fa8505c37..22ecc041a 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -44,7 +44,8 @@ impl S3Writer { } } -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl oio::MultipartUploadWrite for S3Writer { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 5f1c3f60e..2f3860930 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -23,7 +23,6 @@ use std::task::Context; use std::task::Poll; use flagset::FlagSet; -use futures::future::BoxFuture; use futures::Stream; use futures::StreamExt; @@ -98,7 +97,7 @@ enum StatTask { /// Stating is used to store the join handle of spawned task. /// /// TODO: Replace with static future type after rust supported. - Stating(BoxFuture<'static, (String, Result<Metadata>)>), + Stating(BoxedFuture<(String, Result<Metadata>)>), /// Known is used to store the entry that already contains the required metakey. Known(Option<(String, Metadata)>), } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index bcf6b720f..76b42e84e 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -28,7 +28,6 @@ use std::time::Duration; use bytes::Bytes; use flagset::FlagSet; -use futures::future::BoxFuture; use futures::Future; use futures::FutureExt; @@ -48,10 +47,10 @@ pub(crate) enum OperatorFuture<T, F> { /// The input args T, /// The function which will move all the args and return a static future - fn(FusedAccessor, String, T) -> BoxFuture<'static, Result<F>>, + fn(FusedAccessor, String, T) -> BoxedFuture<Result<F>>, ), /// Polling state, waiting for the future to be ready - Poll(BoxFuture<'static, Result<F>>), + Poll(BoxedFuture<Result<F>>), /// Empty state, the future has been polled and completed or /// something is broken during state switch. Empty, @@ -62,7 +61,7 @@ impl<T, F> OperatorFuture<T, F> { inner: FusedAccessor, path: String, args: T, - f: fn(FusedAccessor, String, T) -> BoxFuture<'static, Result<F>>, + f: fn(FusedAccessor, String, T) -> BoxedFuture<Result<F>>, ) -> Self { OperatorFuture::Idle(inner, path, args, f) } diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index 06721cc4d..a47d17c6e 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -54,7 +54,7 @@ use blocking_list::behavior_blocking_list_tests; use blocking_read_only::behavior_blocking_read_only_tests; use blocking_rename::behavior_blocking_rename_tests; use blocking_write::behavior_blocking_write_tests; -// External dependences +// External dependencies use libtest_mimic::Arguments; use libtest_mimic::Trial; use opendal::raw::tests::init_test_service;
