This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit fdd7bb002a1f46e65336d44db7bb85d3668963c1 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 17:06:05 2023 +0800 Save Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/blocking.rs | 7 +++++-- core/src/raw/adapters/kv/backend.rs | 5 ++--- core/src/raw/adapters/typed_kv/backend.rs | 5 ++--- core/src/types/operator/operator.rs | 1 + 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 6beb8881b..7dd2d189c 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use bytes; use bytes::Bytes; +use futures::future::poll_fn; use tokio::runtime::Handle; use crate::raw::oio::ReadExt; @@ -197,11 +198,13 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { - self.handle.block_on(self.inner.write(bs)) + self.handle + .block_on(poll_fn(|cx| self.inner.poll_write(cx, bs))) } fn close(&mut self) -> Result<()> { - self.handle.block_on(self.inner.close()) + self.handle + .block_on(poll_fn(|cx| self.inner.poll_close(cx))) } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 48c327b0a..b664a76a9 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -20,6 +20,7 @@ use std::task::{ready, Context, Poll}; use async_trait::async_trait; use futures::future::BoxFuture; +use futures::FutureExt; use super::Adapter; use crate::raw::*; @@ -445,9 +446,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { None => return Poll::Ready(Ok(())), }; - let fut = async move { - kv.set(&path, &buf).await?; - }; + let fut = async move { kv.set(&path, &buf).await }; self.future = Some(Box::pin(fut)); } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index fda56bf28..ce0f9f073 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -21,6 +21,7 @@ use std::task::{ready, Context, Poll}; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; +use futures::FutureExt; use super::Adapter; use super::Value; @@ -455,9 +456,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { let path = self.path.clone(); let value = self.build(); - let fut = async move { - kv.set(&path, value).await?; - }; + let fut = async move { kv.set(&path, value).await }; self.future = Some(Box::pin(fut)); } } diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index b51c2bc60..1aa814658 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -27,6 +27,7 @@ use tokio::io::ReadBuf; use super::BlockingOperator; use crate::operator_futures::*; +use crate::raw::oio::WriteExt; use crate::raw::*; use crate::*;
