This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit cdc4c877163fe0fc20bc80ffea46f7368b5075a3
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 16:06:39 2023 +0800

    Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ghac/writer.rs | 123 ++++++++++++++++++++++++++++-----------
 core/src/services/hdfs/writer.rs |   1 +
 2 files changed, 90 insertions(+), 34 deletions(-)

diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 886abffb5..0be63ce22 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use std::task::{ready, Context, Poll};
 
 use super::backend::GhacBackend;
 use super::error::parse_error;
@@ -23,7 +25,7 @@ use crate::raw::*;
 use crate::*;
 
 pub struct GhacWriter {
-    backend: GhacBackend,
+    state: State,
 
     cache_id: i64,
     size: u64,
@@ -32,55 +34,108 @@ pub struct GhacWriter {
 impl GhacWriter {
     pub fn new(backend: GhacBackend, cache_id: i64) -> Self {
         GhacWriter {
-            backend,
+            state: State::Idle(Some(backend)),
             cache_id,
             size: 0,
         }
     }
 }
 
+enum State {
+    Idle(Option<GhacBackend>),
+    Upload(BoxFuture<'static, (GhacBackend, Result<usize>)>),
+    Commit(BoxFuture<'static, (GhacBackend, Result<()>)>),
+}
+
 #[async_trait]
 impl oio::Write for GhacWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-
-        let req = self
-            .backend
-            .ghac_upload(
-                self.cache_id,
-                size as u64,
-                AsyncBody::Bytes(bs.copy_to_bytes(size)),
-            )
-            .await?;
-
-        let resp = self.backend.client.send(req).await?;
-
-        if resp.status().is_success() {
-            resp.into_body().consume().await?;
-            self.size += size as u64;
-            Ok(size)
-        } else {
-            Err(parse_error(resp)
-                .await
-                .map(|err| err.with_operation("Backend::ghac_upload"))?)
+        loop {
+            match &mut self.state {
+                State::Idle(backend) => {
+                    let backend = backend.take().expect("GhacWriter must be initialized");
+
+                    let cache_id = self.cache_id;
+                    let size = bs.remaining();
+                    let bs = bs.copy_to_bytes(size);
+
+                    let fut = async {
+                        let req = backend
+                            .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs))
+                            .await?;
+
+                        let resp = backend.client.send(req).await?;
+
+                        let res = if resp.status().is_success() {
+                            resp.into_body().consume().await?;
+                            Ok(size)
+                        } else {
+                            Err(parse_error(resp)
+                                .await
+                                .map(|err| err.with_operation("Backend::ghac_upload"))?)
+                        };
+
+                        (backend, res)
+                    };
+                    self.state = State::Upload(Box::pin(fut));
+                }
+                State::Upload(fut) => {
+                    let (backend, res) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(backend));
+
+                    let size = res?;
+                    self.size += size as u64;
+                    return Poll::Ready(Ok(size));
+                }
+                State::Commit(_) => {
+                    unreachable!("GhacWriter must not go into State:Commit during poll_write")
+                }
+            }
         }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        self.state = State::Idle(None);
+
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        let req = self.backend.ghac_commit(self.cache_id, self.size).await?;
-        let resp = self.backend.client.send(req).await?;
-
-        if resp.status().is_success() {
-            resp.into_body().consume().await?;
-            Ok(())
-        } else {
-            Err(parse_error(resp)
-                .await
-                .map(|err| err.with_operation("Backend::ghac_commit"))?)
+        loop {
+            match &mut self.state {
+                State::Idle(backend) => {
+                    let backend = backend.take().expect("GhacWriter must be initialized");
+
+                    let cache_id = self.cache_id;
+                    let size = self.size;
+
+                    let fut = async {
+                        let req = backend.ghac_commit(cache_id, size).await?;
+                        let resp = backend.client.send(req).await?;
+
+                        let res = if resp.status().is_success() {
+                            resp.into_body().consume().await?;
+                            Ok(())
+                        } else {
+                            Err(parse_error(resp)
+                                .await
+                                .map(|err| err.with_operation("Backend::ghac_commit"))?)
+                        };
+
+                        (backend, res)
+                    };
+                    self.state = State::Upload(Box::pin(fut));
+                }
+                State::Upload(_) => {
+                    unreachable!("GhacWriter must not go into State:Upload during poll_close")
+                }
+                State::Commit(fut) => {
+                    let (backend, res) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(backend));
+
+                    return Poll::Ready(res);
+                }
+            }
         }
     }
 }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 4c69d63dc..1b4f549ce 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::io::Write;
+use std::task::Context;
 
 use async_trait::async_trait;
 use futures::AsyncWriteExt;

Reply via email to