This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 7bc8f73da5438d6831b2c7df41c4ebf77ea9073a Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 16:33:42 2023 +0800 Save Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/concurrent_limit.rs | 6 +++--- core/src/raw/oio/write/api.rs | 4 ++-- core/src/services/dropbox/writer.rs | 1 + core/src/services/gdrive/writer.rs | 1 + core/src/services/ipmfs/writer.rs | 1 + core/src/services/onedrive/writer.rs | 1 + core/src/services/supabase/writer.rs | 1 + core/src/services/vercel_artifacts/writer.rs | 1 + core/src/services/wasabi/writer.rs | 1 + core/src/services/webhdfs/writer.rs | 1 + core/src/types/writer.rs | 6 +++--- 11 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index ef620e037..b09408d45 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -286,15 +286,15 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - self.inner.write(bs).await + self.inner.poll_write(cx, bs) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.abort().await + self.inner.poll_abort(cx) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.inner.close().await + self.inner.poll_close(cx) } } diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 76b534888..41bd93491 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -181,7 +181,7 @@ impl<W> Future for AbortFuture<'_, W> where W: Write + Unpin + ?Sized, { - type Output = Result<usize>; + type Output = Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { let this = self.project(); @@ -199,7 +199,7 @@ impl<W> Future for CloseFuture<'_, W> where W: Write + Unpin + ?Sized, { - type Output = Result<usize>; + type Output = Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { let this = self.project(); diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 5e3e19a92..3a5c6cdd7 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -38,6 +38,7 @@ impl DropboxWriter { } } +#[async_trait] impl oio::OneShotWrite for DropboxWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index f2f0b4e1c..2b0ac8c55 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -87,6 +87,7 @@ impl GdriveWriter { } } +#[async_trait] impl oio::OneShotWrite for GdriveWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index e3395dfaf..0d1673693 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -37,6 +37,7 @@ impl IpmfsWriter { } } +#[async_trait] impl oio::OneShotWrite for IpmfsWriter { async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { let size = bs.remaining(); diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 76fc734d1..727d8f7d3 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -44,6 +44,7 @@ impl OneDriveWriter { } } +#[async_trait] impl oio::OneShotWrite for OneDriveWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 3b692fd08..52292a737 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -44,6 +44,7 @@ impl SupabaseWriter { } } +#[async_trait] impl oio::OneShotWrite for SupabaseWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 596c933cf..61e5681e0 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -37,6 +37,7 @@ impl VercelArtifactsWriter { } } +#[async_trait] impl oio::OneShotWrite for VercelArtifactsWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 685848084..254810aa9 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -40,6 +40,7 @@ impl WasabiWriter { } } +#[async_trait] impl oio::OneShotWrite for WasabiWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 22901b496..b323c0173 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -37,6 +37,7 @@ impl WebhdfsWriter { } } +#[async_trait] impl oio::OneShotWrite for WebhdfsWriter { async fn write_once(&self, bs: Bytes) -> Result<()> { let size = bs.len(); diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 77b88899e..9ab2b69dd 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -28,8 +28,8 @@ use futures::AsyncWrite; use futures::FutureExt; use futures::TryStreamExt; -use crate::raw::oio::Write; use crate::raw::oio::WriteBuf; +use crate::raw::oio::{Write, WriteExt}; use crate::raw::*; use crate::*; @@ -205,7 +205,7 @@ impl Writer { /// /// Abort should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + pub async fn abort(&mut self) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { w.abort().await } else { @@ -222,7 +222,7 @@ impl Writer { /// /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + pub async fn close(&mut self) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { w.close().await } else {
