This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 0dcbc79a4d1c726a43789e0e92e29cfb7ddcea2e Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 14:33:48 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/append_object_write.rs | 68 +++++++++++++++++------- core/src/raw/oio/write/multipart_upload_write.rs | 6 +-- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index ba5642dff..2c0bd5555 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -16,6 +16,8 @@ // under the License. use async_trait::async_trait; +use futures::future::BoxFuture; +use std::task::{ready, Context, Poll}; use crate::raw::*; use crate::*; @@ -46,16 +48,22 @@ pub trait AppendObjectWrite: Send + Sync + Unpin { /// /// - Allow users to switch to un-buffered mode if users write 16MiB every time. pub struct AppendObjectWriter<W: AppendObjectWrite> { - inner: W, + state: State<W>, offset: Option<u64>, } +enum State<W> { + Idle(Option<W>), + Offset(BoxFuture<'static, (W, Result<u64>)>), + Append(BoxFuture<'static, (W, Result<usize>)>), +} + impl<W: AppendObjectWrite> AppendObjectWriter<W> { /// Create a new AppendObjectWriter. pub fn new(inner: W) -> Self { Self { - inner, + state: State::Idle(Some(inner)), offset: None, } } @@ -78,27 +86,51 @@ where W: AppendObjectWrite, { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let offset = self.offset().await?; - - let size = bs.remaining(); - - self.inner - .append( - offset, - size as u64, - AsyncBody::Bytes(bs.copy_to_bytes(size)), - ) - .await - .map(|_| self.offset = Some(offset + size as u64))?; - - Ok(size) + loop { + match &mut self.state { + State::Idle(w) => { + let w = w.take().expect("writer must be valid"); + match self.offset { + Some(offset) => { + let size = bs.remaining(); + let bs = bs.copy_to_bytes(size); + + self.state = State::Append(Box::pin(async move { + w.append(offset, size as u64, AsyncBody::Bytes(bs)).await?; + + (w, Ok(size)) + })); + } + None => { + self.state = State::Offset(Box::pin(async move { + let offset = w.offset().await?; + + (w, Ok(offset)) + })); + } + } + } + State::Offset(fut) => { + let (w, offset) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(w)); + self.offset = Some(offset?); + } + State::Append(fut) => { + let (w, res) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(w)); + + let size = res?; + return Poll::Ready(Ok(size)); + } + } + } } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) + Poll::Ready(Ok(())) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) + Poll::Ready(Ok(())) } } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 55a28e84a..482954dc9 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -18,7 +18,7 @@ use async_trait::async_trait; use futures::future::BoxFuture; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use crate::raw::*; use crate::*; @@ -147,12 +147,12 @@ where } } State::Init(fut) => { - let (w, upload_id) = futures::ready!(fut.as_mut().poll(cx)); + let (w, upload_id) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); self.upload_id = Some(Arc::new(upload_id?)); } State::Write(fut) => { - let (w, res) = futures::ready!(fut.as_mut().poll(cx)); + let (w, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); let (written, part) = res?;
