This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 5d78f4f653845ee9d2a4aa85ae820b2d5ace9795
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 14:48:12 2023 +0800

    save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/one_shot_write.rs | 39 ++++++++++++++++++++++++--------
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 85ef499d5..ee15c438f 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use std::task::{ready, Context, Poll};
 
 use crate::raw::*;
 use crate::*;
@@ -36,29 +38,48 @@ pub trait OneShotWrite: Send + Sync + Unpin {
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
 pub struct OneShotWriter<W: OneShotWrite> {
-    inner: W,
+    state: State<W>,
+}
+
+enum State<W> {
+    Idle(Option<W>),
+    Write(BoxFuture<'static, (W, Result<usize>)>),
 }
 
 impl<W: OneShotWrite> OneShotWriter<W> {
     /// Create a new one shot writer.
     pub fn new(inner: W) -> Self {
-        Self { inner }
+        Self {
+            state: State::Idle(Some(inner)),
+        }
     }
 }
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        self.inner.write_once(bs).await?;
-        Ok(size)
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    let fut = w.write_once(bs);
+
+                    self.state = State::Write(Box::pin(fut));
+                }
+                State::Write(fut) => {
+                    let (w, size) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    return Poll::Ready(size);
+                }
+            }
+        }
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Ok(()))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Ok(()))
     }
 }

Reply via email to