This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b908c668c feat(service): alluxio support write (#3566)
b908c668c is described below

commit b908c668caf4e64885b267914f0bf004ae713a60
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 14 14:26:55 2023 +0800

    feat(service): alluxio support write (#3566)
    
    Co-authored-by: shuai_yang <[email protected]>
---
 core/src/services/alluxio/backend.rs |   5 +-
 core/src/services/alluxio/core.rs    |   4 +-
 core/src/services/alluxio/writer.rs  | 124 +++++++++++++++++++++++++++++++----
 3 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs
index 58d33c07a..e047bf89c 100644
--- a/core/src/services/alluxio/backend.rs
+++ b/core/src/services/alluxio/backend.rs
@@ -24,7 +24,6 @@ use async_trait::async_trait;
 use log::debug;
 use serde::Deserialize;
 
-use crate::raw::oio::OneShotWriter;
 use crate::raw::*;
 use crate::*;
 
@@ -201,6 +200,7 @@ impl Accessor for AlluxioBackend {
                 write: true,
                 /// https://github.com/Alluxio/alluxio/issues/8212
                 write_can_append: false,
+                write_can_multi: true,
 
                 create_dir: true,
                 delete: true,
@@ -229,8 +229,7 @@ impl Accessor for AlluxioBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
-        let w = OneShotWriter::new(w);
+        let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string());
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs
index 86bf36923..a68e0bcdc 100644
--- a/core/src/services/alluxio/core.rs
+++ b/core/src/services/alluxio/core.rs
@@ -343,7 +343,9 @@ impl AlluxioCore {
         match status {
             StatusCode::OK => {
                 let body = resp.into_body().bytes().await?;
-                Ok(body.len())
+                let size: usize =
+                    serde_json::from_slice(&body).map_err(new_json_serialize_error)?;
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs
index 4eb52ae90..404304d84 100644
--- a/core/src/services/alluxio/writer.rs
+++ b/core/src/services/alluxio/writer.rs
@@ -16,42 +16,140 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
-use crate::Result;
+
+use crate::*;
 
 use super::core::AlluxioCore;
 
-pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+pub type AlluxioWriters = AlluxioWriter;
 
 pub struct AlluxioWriter {
-    core: Arc<AlluxioCore>,
+    state: State,
 
     _op: OpWrite,
     path: String,
+    stream_id: Option<u64>,
+}
+
+enum State {
+    Idle(Option<Arc<AlluxioCore>>),
+    Init(BoxFuture<'static, (Arc<AlluxioCore>, Result<u64>)>),
+    Write(BoxFuture<'static, (Arc<AlluxioCore>, Result<usize>)>),
+    Close(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
 }
 
 impl AlluxioWriter {
     pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
-        AlluxioWriter { core, _op, path }
+        AlluxioWriter {
+            state: State::Idle(Some(core)),
+            _op,
+            path,
+            stream_id: None,
+        }
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl Sync for State {}
+
 #[async_trait]
-impl oio::OneShotWrite for AlluxioWriter {
-    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
-        let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+impl oio::Write for AlluxioWriter {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => match self.stream_id.as_ref() {
+                    Some(stream_id) => {
+                        let size = bs.remaining();
+                        let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)).clone();
+
+                        let stream_id = *stream_id;
 
-        let stream_id = self.core.create_file(&self.path).await?;
+                        let w = w.take().expect("writer must be valid");
 
-        self.core
-            .write(stream_id, AsyncBody::ChunkedBytes(bs))
-            .await?;
+                        self.state = State::Write(Box::pin(async move {
+                            let part = w.write(stream_id, AsyncBody::ChunkedBytes(cb)).await;
 
-        self.core.close(stream_id).await?;
+                            (w, part)
+                        }));
+                    }
+                    None => {
+                        let path = self.path.clone();
+                        let w = w.take().expect("writer must be valid");
+                        self.state = State::Init(Box::pin(async move {
+                            let upload_id = w.create_file(&path).await;
+                            (w, upload_id)
+                        }));
+                    }
+                },
+                State::Init(fut) => {
+                    let (w, stream_id) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    self.stream_id = Some(stream_id?);
+                }
+                State::Write(fut) => {
+                    let (w, part) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+                    return Poll::Ready(Ok(part?));
+                }
+                State::Close(_) => {
+                    unreachable!(
+                        "MultipartUploadWriter must not go into State::Close during poll_write"
+                    )
+                }
+            }
+        }
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let w = w.take().expect("writer must be valid");
+                    match self.stream_id {
+                        Some(stream_id) => {
+                            self.state = State::Close(Box::pin(async move {
+                                let res = w.close(stream_id).await;
+                                (w, res)
+                            }));
+                        }
+                        None => {
+                            return Poll::Ready(Ok(()));
+                        }
+                    }
+                }
+                State::Close(fut) => {
+                    let (w, res) = futures::ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some(w));
+
+                    res?;
+
+                    return Poll::Ready(Ok(()));
+                }
+                State::Init(_) => {
+                    unreachable!("AlluxioWriter must not go into State::Init during poll_close")
+                }
+                State::Write(_) => unreachable! {
+                    "AlluxioWriter must not go into State::Write during poll_close"
+                },
+            }
+        }
+    }
 
-        Ok(())
+    fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "AlluxioWriter doesn't support abort",
+        )))
     }
 }

Reply via email to