hoslo commented on code in PR #3566:
URL: 
https://github.com/apache/incubator-opendal/pull/3566#discussion_r1392042148


##########
core/src/services/alluxio/backend.rs:
##########
@@ -229,8 +228,7 @@ impl Accessor for AlluxioBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());

Review Comment:
   fixed



##########
core/src/services/alluxio/writer.rs:
##########
@@ -16,42 +16,177 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::Result;
 
 use super::core::AlluxioCore;
 
-pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+pub type AlluxioWriters = AlluxioWriter;
 
 pub struct AlluxioWriter {
-    core: Arc<AlluxioCore>,
+    state: State,
 
     _op: OpWrite,
     path: String,
+    stream_id: Option<u64>,
+}
+
+enum State {
+    Idle(Option<Arc<AlluxioCore>>),
+    Init(BoxFuture<'static, (Arc<AlluxioCore>, Result<u64>)>),
+    Write(BoxFuture<'static, (Arc<AlluxioCore>, Result<usize>)>),
+    Close(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
+    Abort(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
 }
 
 impl AlluxioWriter {
     pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
-        AlluxioWriter { core, _op, path }
+        AlluxioWriter {
+            state: State::Idle(Some(core)),
+            _op,
+            path,
+            stream_id: None,
+        }
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl Sync for State {}
+
 #[async_trait]
-impl oio::OneShotWrite for AlluxioWriter {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+impl oio::Write for AlluxioWriter {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> 
Poll<Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => match self.stream_id.as_ref() {
+                    Some(stream_id) => {
+                        let size = bs.remaining();
+                        let cb = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)).clone();
 
-        let stream_id = self.core.create_file(&self.path).await?;
+                        let stream_id = *stream_id;
 
-        self.core
-            .write(stream_id, AsyncBody::ChunkedBytes(bs))
-            .await?;
+                        let w = w.take().expect("writer must be valid");
 
-        self.core.close(stream_id).await?;
+                        self.state = State::Write(Box::pin(async move {
+                            let part = w.write(stream_id, 
AsyncBody::ChunkedBytes(cb)).await;
+
+                            (w, part)
+                        }));
+                    }
+                    None => {
+                        let path = self.path.clone();
+                        let w = w.take().expect("writer must be valid");
+                        self.state = State::Init(Box::pin(async move {
+                            let upload_id = w.create_file(&path).await;
+                            (w, upload_id)
+                        }));
+                    }
+                },
+                State::Init(fut) => {
+                    let (w, stream_id) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    self.stream_id = Some(stream_id?);
+                }
+                State::Write(fut) => {
+                    let (w, part) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    return Poll::Ready(Ok(part?));
+                }
+                State::Close(_) => {
+                    unreachable!(
+                        "MultipartUploadWriter must not go into State::Close 
during poll_write"
+                    )
+                }
+                State::Abort(_) => {
+                    unreachable!(
+                        "MultipartUploadWriter must not go into State::Abort 
during poll_write"
+                    )
+                }
+            }
+        }
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    match self.stream_id {
+                        Some(stream_id) => {
+                            self.state = State::Close(Box::pin(async move {
+                                let res = w.close(stream_id).await;
+                                (w, res)
+                            }));
+                        }
+                        None => {
+                            return Poll::Ready(Ok(()));
+                        }
+                    }
+                }
+                State::Close(fut) => {
+                    let (w, res) = futures::ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+
+                    res?;
+
+                    return Poll::Ready(Ok(()));
+                }
+                State::Init(_) => {
+                    unreachable!("AlluxioWriter must not go into State::Init 
during poll_close")
+                }
+                State::Write(_) => unreachable! {
+                    "AlluxioWriter must not go into State::Write during 
poll_close"
+                },
+                State::Abort(_) => {
+                    unreachable!("AlluxioWriter must not go into State::Abort 
during poll_close")
+                }
+            }
+        }
+    }
 
-        Ok(())
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    match self.stream_id {
+                        Some(_) => {
+                            let path = self.path.clone();
+                            self.state = State::Abort(Box::pin(async move {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to