This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 1f60e84104134c034adb3f8b080d666ee9c7b597 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 17:33:01 2023 +0800 Improve writer Signed-off-by: Xuanwo <[email protected]> --- core/src/types/writer.rs | 191 +++++------------------------------------------ 1 file changed, 18 insertions(+), 173 deletions(-) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 9ab2b69dd..c90ebcc54 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -56,14 +56,9 @@ use crate::*; /// and flush them into storage at needs. And finally, the file will be available /// after `close` has been called. pub struct Writer { - state: State, + inner: oio::Writer, } -/// # Safety -/// -/// Writer will only be accessed by `&mut Self` -unsafe impl Sync for Writer {} - impl Writer { /// Create a new writer. /// @@ -75,25 +70,14 @@ impl Writer { pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> { let (_, w) = acc.write(path, op).await?; - Ok(Writer { - state: State::Idle(Some(w)), - }) + Ok(Writer { inner: w }) } /// Write into inner writer. pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { - let w = if let State::Idle(Some(w)) = &mut self.state { - w - } else { - unreachable!( - "writer state invalid while write, expect Idle, actual {}", - self.state - ); - }; - let mut bs = bs.into(); while bs.remaining() > 0 { - let n = w.write(&bs).await?; + let n = self.inner.write(&bs).await?; bs.advance(n); } @@ -137,20 +121,11 @@ impl Writer { S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static, T: Into<Bytes>, { - let w = if let State::Idle(Some(w)) = &mut self.state { - w - } else { - unreachable!( - "writer state invalid while sink, expect Idle, actual {}", - self.state - ); - }; - let mut written = 0; while let Some(bs) = sink_from.try_next().await? { let mut bs = bs.into(); while bs.remaining() > 0 { - let n = w.write(&bs).await?; + let n = self.inner.write(&bs).await?; bs.advance(n); written += n as u64; } @@ -206,14 +181,7 @@ impl Writer { /// Abort should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. pub async fn abort(&mut self) -> Result<()> { - if let State::Idle(Some(w)) = &mut self.state { - w.abort().await - } else { - unreachable!( - "writer state invalid while abort, expect Idle, actual {}", - self.state - ); - } + self.inner.abort().await } /// Close the writer and make sure all data have been committed. @@ -223,30 +191,7 @@ impl Writer { /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. pub async fn close(&mut self) -> Result<()> { - if let State::Idle(Some(w)) = &mut self.state { - w.close().await - } else { - unreachable!( - "writer state invalid while close, expect Idle, actual {}", - self.state - ); - } - } -} - -enum State { - Idle(Option<oio::Writer>), - Write(BoxFuture<'static, Result<(usize, oio::Writer)>>), - Close(BoxFuture<'static, Result<oio::Writer>>), -} - -impl Display for State { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - State::Idle(_) => write!(f, "Idle"), - State::Write(_) => write!(f, "Write"), - State::Close(_) => write!(f, "Close"), - } + self.inner.close().await } } @@ -256,35 +201,9 @@ impl AsyncWrite for Writer { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - loop { - match &mut self.state { - State::Idle(w) => { - let mut w = w - .take() - .expect("invalid state of writer: Idle state with empty write"); - // FIXME: This will the buf everytime, we should avoid this. - let bs = Bytes::copy_from_slice(buf); - let fut = async move { - let n = w.write(&bs).await?; - Ok((n, w)) - }; - self.state = State::Write(Box::pin(fut)); - } - State::Write(fut) => match ready!(fut.poll_unpin(cx)) { - Ok((size, w)) => { - self.state = State::Idle(Some(w)); - return Poll::Ready(Ok(size)); - } - Err(err) => { - self.state = State::Idle(None); - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))); - } - }, - State::Close(_) => { - unreachable!("invalid state of writer: poll_write with State::Close") - } - }; - } + self.inner + .poll_write(cx, &buf) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } /// Writer makes sure that every write is flushed. @@ -293,33 +212,9 @@ impl AsyncWrite for Writer { } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - loop { - match &mut self.state { - State::Idle(w) => { - let mut w = w - .take() - .expect("invalid state of writer: Idle state with empty write"); - let fut = async move { - w.close().await?; - Ok(w) - }; - self.state = State::Close(Box::pin(fut)); - } - State::Write(_) => { - unreachable!("invalid state of writer: poll_close with State::Write") - } - State::Close(fut) => match ready!(fut.poll_unpin(cx)) { - Ok(w) => { - self.state = State::Idle(Some(w)); - return Poll::Ready(Ok(())); - } - Err(err) => { - self.state = State::Idle(None); - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))); - } - }, - } - } + self.inner + .poll_close(cx) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } } @@ -329,35 +224,9 @@ impl tokio::io::AsyncWrite for Writer { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - loop { - match &mut self.state { - State::Idle(w) => { - let mut w = w - .take() - .expect("invalid state of writer: Idle state with empty write"); - // FIXME: This will the buf everytime, we should avoid this. - let bs = Bytes::copy_from_slice(buf); - let fut = async move { - let n = w.write(&bs).await?; - Ok((n, w)) - }; - self.state = State::Write(Box::pin(fut)); - } - State::Write(fut) => match ready!(fut.poll_unpin(cx)) { - Ok((size, w)) => { - self.state = State::Idle(Some(w)); - return Poll::Ready(Ok(size)); - } - Err(err) => { - self.state = State::Idle(None); - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))); - } - }, - State::Close(_) => { - unreachable!("invalid state of writer: poll_write with State::Close") - } - }; - } + self.inner + .poll_write(cx, &buf) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { @@ -365,33 +234,9 @@ impl tokio::io::AsyncWrite for Writer { } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - loop { - match &mut self.state { - State::Idle(w) => { - let mut w = w - .take() - .expect("invalid state of writer: Idle state with empty write"); - let fut = async move { - w.close().await?; - Ok(w) - }; - self.state = State::Close(Box::pin(fut)); - } - State::Write(_) => { - unreachable!("invalid state of writer: poll_close with State::Write") - } - State::Close(fut) => match ready!(fut.poll_unpin(cx)) { - Ok(w) => { - self.state = State::Idle(Some(w)); - return Poll::Ready(Ok(())); - } - Err(err) => { - self.state = State::Idle(None); - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))); - } - }, - } - } + self.inner + .poll_close(cx) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } }
