This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5d78f4f653845ee9d2a4aa85ae820b2d5ace9795
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 14:48:12 2023 +0800

    save work

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/one_shot_write.rs | 39 ++++++++++++++++++++++++--------
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 85ef499d5..ee15c438f 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use std::task::{ready, Context, Poll};
 
 use crate::raw::*;
 use crate::*;
@@ -36,29 +38,48 @@ pub trait OneShotWrite: Send + Sync + Unpin {
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
 pub struct OneShotWriter<W: OneShotWrite> {
-    inner: W,
+    state: State<W>,
+}
+
+enum State<W> {
+    Idle(Option<W>),
+    Write(BoxFuture<'static, (W, Result<usize>)>),
 }
 
 impl<W: OneShotWrite> OneShotWriter<W> {
     /// Create a new one shot writer.
     pub fn new(inner: W) -> Self {
-        Self { inner }
+        Self {
+            state: State::Idle(Some(inner)),
+        }
     }
 }
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        self.inner.write_once(bs).await?;
-        Ok(size)
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    let fut = w.write_once(bs);
+
+                    self.state = State::Write(Box::pin(fut));
+                }
+                State::Write(fut) => {
+                    let (w, size) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    return Poll::Ready(size);
+                }
+            }
+        }
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Ok(()))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Ok(()))
     }
 }
