This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 0dcbc79a4d1c726a43789e0e92e29cfb7ddcea2e
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 14:33:48 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/append_object_write.rs    | 68 +++++++++++++++++-------
 core/src/raw/oio/write/multipart_upload_write.rs |  6 +--
 2 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index ba5642dff..2c0bd5555 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use std::task::{ready, Context, Poll};
 
 use crate::raw::*;
 use crate::*;
@@ -46,16 +48,22 @@ pub trait AppendObjectWrite: Send + Sync + Unpin {
 ///
 /// - Allow users to switch to un-buffered mode if users write 16MiB every 
time.
 pub struct AppendObjectWriter<W: AppendObjectWrite> {
-    inner: W,
+    state: State<W>,
 
     offset: Option<u64>,
 }
 
+enum State<W> {
+    Idle(Option<W>),
+    Offset(BoxFuture<'static, (W, Result<u64>)>),
+    Append(BoxFuture<'static, (W, Result<usize>)>),
+}
+
 impl<W: AppendObjectWrite> AppendObjectWriter<W> {
     /// Create a new AppendObjectWriter.
     pub fn new(inner: W) -> Self {
         Self {
-            inner,
+            state: State::Idle(Some(inner)),
             offset: None,
         }
     }
@@ -78,27 +86,51 @@ where
     W: AppendObjectWrite,
 {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        let offset = self.offset().await?;
-
-        let size = bs.remaining();
-
-        self.inner
-            .append(
-                offset,
-                size as u64,
-                AsyncBody::Bytes(bs.copy_to_bytes(size)),
-            )
-            .await
-            .map(|_| self.offset = Some(offset + size as u64))?;
-
-        Ok(size)
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    match self.offset {
+                        Some(offset) => {
+                            let size = bs.remaining();
+                            let bs = bs.copy_to_bytes(size);
+
+                            self.state = State::Append(Box::pin(async move {
+                                w.append(offset, size as u64, 
AsyncBody::Bytes(bs)).await?;
+
+                                (w, Ok(size))
+                            }));
+                        }
+                        None => {
+                            self.state = State::Offset(Box::pin(async move {
+                                let offset = w.offset().await?;
+
+                                (w, Ok(offset))
+                            }));
+                        }
+                    }
+                }
+                State::Offset(fut) => {
+                    let (w, offset) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    self.offset = Some(offset?);
+                }
+                State::Append(fut) => {
+                    let (w, res) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+
+                    let size = res?;
+                    return Poll::Ready(Ok(size));
+                }
+            }
+        }
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 55a28e84a..482954dc9 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -18,7 +18,7 @@
 use async_trait::async_trait;
 use futures::future::BoxFuture;
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::task::{ready, Context, Poll};
 
 use crate::raw::*;
 use crate::*;
@@ -147,12 +147,12 @@ where
                     }
                 }
                 State::Init(fut) => {
-                    let (w, upload_id) = 
futures::ready!(fut.as_mut().poll(cx));
+                    let (w, upload_id) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
                     self.upload_id = Some(Arc::new(upload_id?));
                 }
                 State::Write(fut) => {
-                    let (w, res) = futures::ready!(fut.as_mut().poll(cx));
+                    let (w, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
 
                     let (written, part) = res?;

Reply via email to