This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 1f60e84104134c034adb3f8b080d666ee9c7b597
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 17:33:01 2023 +0800

    Improve writer
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/writer.rs | 191 +++++------------------------------------------
 1 file changed, 18 insertions(+), 173 deletions(-)

diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 9ab2b69dd..c90ebcc54 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -56,14 +56,9 @@ use crate::*;
 /// and flush them into storage at needs. And finally, the file will be available
 /// after `close` has been called.
 pub struct Writer {
-    state: State,
+    inner: oio::Writer,
 }
 
-/// # Safety
-///
-/// Writer will only be accessed by `&mut Self`
-unsafe impl Sync for Writer {}
-
 impl Writer {
     /// Create a new writer.
     ///
@@ -75,25 +70,14 @@ impl Writer {
     pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
         let (_, w) = acc.write(path, op).await?;
 
-        Ok(Writer {
-            state: State::Idle(Some(w)),
-        })
+        Ok(Writer { inner: w })
     }
 
     /// Write into inner writer.
     pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        let w = if let State::Idle(Some(w)) = &mut self.state {
-            w
-        } else {
-            unreachable!(
-                "writer state invalid while write, expect Idle, actual {}",
-                self.state
-            );
-        };
-
         let mut bs = bs.into();
         while bs.remaining() > 0 {
-            let n = w.write(&bs).await?;
+            let n = self.inner.write(&bs).await?;
             bs.advance(n);
         }
 
@@ -137,20 +121,11 @@ impl Writer {
         S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
         T: Into<Bytes>,
     {
-        let w = if let State::Idle(Some(w)) = &mut self.state {
-            w
-        } else {
-            unreachable!(
-                "writer state invalid while sink, expect Idle, actual {}",
-                self.state
-            );
-        };
-
         let mut written = 0;
         while let Some(bs) = sink_from.try_next().await? {
             let mut bs = bs.into();
             while bs.remaining() > 0 {
-                let n = w.write(&bs).await?;
+                let n = self.inner.write(&bs).await?;
                 bs.advance(n);
                 written += n as u64;
             }
@@ -206,14 +181,7 @@ impl Writer {
     /// Abort should only be called when the writer is not closed or
     /// aborted, otherwise an unexpected error could be returned.
     pub async fn abort(&mut self) -> Result<()> {
-        if let State::Idle(Some(w)) = &mut self.state {
-            w.abort().await
-        } else {
-            unreachable!(
-                "writer state invalid while abort, expect Idle, actual {}",
-                self.state
-            );
-        }
+        self.inner.abort().await
     }
 
     /// Close the writer and make sure all data have been committed.
@@ -223,30 +191,7 @@ impl Writer {
     /// Close should only be called when the writer is not closed or
     /// aborted, otherwise an unexpected error could be returned.
     pub async fn close(&mut self) -> Result<()> {
-        if let State::Idle(Some(w)) = &mut self.state {
-            w.close().await
-        } else {
-            unreachable!(
-                "writer state invalid while close, expect Idle, actual {}",
-                self.state
-            );
-        }
-    }
-}
-
-enum State {
-    Idle(Option<oio::Writer>),
-    Write(BoxFuture<'static, Result<(usize, oio::Writer)>>),
-    Close(BoxFuture<'static, Result<oio::Writer>>),
-}
-
-impl Display for State {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            State::Idle(_) => write!(f, "Idle"),
-            State::Write(_) => write!(f, "Write"),
-            State::Close(_) => write!(f, "Close"),
-        }
+        self.inner.close().await
     }
 }
 
@@ -256,35 +201,9 @@ impl AsyncWrite for Writer {
         cx: &mut Context<'_>,
         buf: &[u8],
     ) -> Poll<io::Result<usize>> {
-        loop {
-            match &mut self.state {
-                State::Idle(w) => {
-                    let mut w = w
-                        .take()
-                        .expect("invalid state of writer: Idle state with empty write");
-                    // FIXME: This will the buf everytime, we should avoid this.
-                    let bs = Bytes::copy_from_slice(buf);
-                    let fut = async move {
-                        let n = w.write(&bs).await?;
-                        Ok((n, w))
-                    };
-                    self.state = State::Write(Box::pin(fut));
-                }
-                State::Write(fut) => match ready!(fut.poll_unpin(cx)) {
-                    Ok((size, w)) => {
-                        self.state = State::Idle(Some(w));
-                        return Poll::Ready(Ok(size));
-                    }
-                    Err(err) => {
-                        self.state = State::Idle(None);
-                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
-                    }
-                },
-                State::Close(_) => {
-                    unreachable!("invalid state of writer: poll_write with State::Close")
-                }
-            };
-        }
+        self.inner
+            .poll_write(cx, &buf)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 
     /// Writer makes sure that every write is flushed.
@@ -293,33 +212,9 @@ impl AsyncWrite for Writer {
     }
 
     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        loop {
-            match &mut self.state {
-                State::Idle(w) => {
-                    let mut w = w
-                        .take()
-                        .expect("invalid state of writer: Idle state with empty write");
-                    let fut = async move {
-                        w.close().await?;
-                        Ok(w)
-                    };
-                    self.state = State::Close(Box::pin(fut));
-                }
-                State::Write(_) => {
-                    unreachable!("invalid state of writer: poll_close with State::Write")
-                }
-                State::Close(fut) => match ready!(fut.poll_unpin(cx)) {
-                    Ok(w) => {
-                        self.state = State::Idle(Some(w));
-                        return Poll::Ready(Ok(()));
-                    }
-                    Err(err) => {
-                        self.state = State::Idle(None);
-                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
-                    }
-                },
-            }
-        }
+        self.inner
+            .poll_close(cx)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 }
 
@@ -329,35 +224,9 @@ impl tokio::io::AsyncWrite for Writer {
         cx: &mut Context<'_>,
         buf: &[u8],
     ) -> Poll<io::Result<usize>> {
-        loop {
-            match &mut self.state {
-                State::Idle(w) => {
-                    let mut w = w
-                        .take()
-                        .expect("invalid state of writer: Idle state with empty write");
-                    // FIXME: This will the buf everytime, we should avoid this.
-                    let bs = Bytes::copy_from_slice(buf);
-                    let fut = async move {
-                        let n = w.write(&bs).await?;
-                        Ok((n, w))
-                    };
-                    self.state = State::Write(Box::pin(fut));
-                }
-                State::Write(fut) => match ready!(fut.poll_unpin(cx)) {
-                    Ok((size, w)) => {
-                        self.state = State::Idle(Some(w));
-                        return Poll::Ready(Ok(size));
-                    }
-                    Err(err) => {
-                        self.state = State::Idle(None);
-                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
-                    }
-                },
-                State::Close(_) => {
-                    unreachable!("invalid state of writer: poll_write with State::Close")
-                }
-            };
-        }
+        self.inner
+            .poll_write(cx, &buf)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 
     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
@@ -365,33 +234,9 @@ impl tokio::io::AsyncWrite for Writer {
     }
 
     fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        loop {
-            match &mut self.state {
-                State::Idle(w) => {
-                    let mut w = w
-                        .take()
-                        .expect("invalid state of writer: Idle state with empty write");
-                    let fut = async move {
-                        w.close().await?;
-                        Ok(w)
-                    };
-                    self.state = State::Close(Box::pin(fut));
-                }
-                State::Write(_) => {
-                    unreachable!("invalid state of writer: poll_close with State::Write")
-                }
-                State::Close(fut) => match ready!(fut.poll_unpin(cx)) {
-                    Ok(w) => {
-                        self.state = State::Idle(Some(w));
-                        return Poll::Ready(Ok(()));
-                    }
-                    Err(err) => {
-                        self.state = State::Idle(None);
-                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
-                    }
-                },
-            }
-        }
+        self.inner
+            .poll_close(cx)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 }
 

Reply via email to