This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b908c668c feat(service): alluxio support write (#3566)
b908c668c is described below
commit b908c668caf4e64885b267914f0bf004ae713a60
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 14 14:26:55 2023 +0800
feat(service): alluxio support write (#3566)
Co-authored-by: shuai_yang <[email protected]>
---
core/src/services/alluxio/backend.rs | 5 +-
core/src/services/alluxio/core.rs | 4 +-
core/src/services/alluxio/writer.rs | 124 +++++++++++++++++++++++++++++++----
3 files changed, 116 insertions(+), 17 deletions(-)
diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs
index 58d33c07a..e047bf89c 100644
--- a/core/src/services/alluxio/backend.rs
+++ b/core/src/services/alluxio/backend.rs
@@ -24,7 +24,6 @@ use async_trait::async_trait;
use log::debug;
use serde::Deserialize;
-use crate::raw::oio::OneShotWriter;
use crate::raw::*;
use crate::*;
@@ -201,6 +200,7 @@ impl Accessor for AlluxioBackend {
write: true,
/// https://github.com/Alluxio/alluxio/issues/8212
write_can_append: false,
+ write_can_multi: true,
create_dir: true,
delete: true,
@@ -229,8 +229,7 @@ impl Accessor for AlluxioBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
- let w = OneShotWriter::new(w);
+ let w = AlluxioWriter::new(self.core.clone(), args.clone(),
path.to_string());
Ok((RpWrite::default(), w))
}
diff --git a/core/src/services/alluxio/core.rs b/core/src/services/alluxio/core.rs
index 86bf36923..a68e0bcdc 100644
--- a/core/src/services/alluxio/core.rs
+++ b/core/src/services/alluxio/core.rs
@@ -343,7 +343,9 @@ impl AlluxioCore {
match status {
StatusCode::OK => {
let body = resp.into_body().bytes().await?;
- Ok(body.len())
+ let size: usize =
+
serde_json::from_slice(&body).map_err(new_json_serialize_error)?;
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs
index 4eb52ae90..404304d84 100644
--- a/core/src/services/alluxio/writer.rs
+++ b/core/src/services/alluxio/writer.rs
@@ -16,42 +16,140 @@
// under the License.
use std::sync::Arc;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
use async_trait::async_trait;
+use futures::future::BoxFuture;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
-use crate::Result;
+
+use crate::*;
use super::core::AlluxioCore;
-pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+pub type AlluxioWriters = AlluxioWriter;
pub struct AlluxioWriter {
- core: Arc<AlluxioCore>,
+ state: State,
_op: OpWrite,
path: String,
+ stream_id: Option<u64>,
+}
+
+enum State {
+ Idle(Option<Arc<AlluxioCore>>),
+ Init(BoxFuture<'static, (Arc<AlluxioCore>, Result<u64>)>),
+ Write(BoxFuture<'static, (Arc<AlluxioCore>, Result<usize>)>),
+ Close(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
}
impl AlluxioWriter {
pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
- AlluxioWriter { core, _op, path }
+ AlluxioWriter {
+ state: State::Idle(Some(core)),
+ _op,
+ path,
+ stream_id: None,
+ }
}
}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl Sync for State {}
+
#[async_trait]
-impl oio::OneShotWrite for AlluxioWriter {
- async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
- let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+impl oio::Write for AlluxioWriter {
+ fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) ->
Poll<Result<usize>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => match self.stream_id.as_ref() {
+ Some(stream_id) => {
+ let size = bs.remaining();
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)).clone();
+
+ let stream_id = *stream_id;
- let stream_id = self.core.create_file(&self.path).await?;
+ let w = w.take().expect("writer must be valid");
- self.core
- .write(stream_id, AsyncBody::ChunkedBytes(bs))
- .await?;
+ self.state = State::Write(Box::pin(async move {
+ let part = w.write(stream_id,
AsyncBody::ChunkedBytes(cb)).await;
- self.core.close(stream_id).await?;
+ (w, part)
+ }));
+ }
+ None => {
+ let path = self.path.clone();
+ let w = w.take().expect("writer must be valid");
+ self.state = State::Init(Box::pin(async move {
+ let upload_id = w.create_file(&path).await;
+ (w, upload_id)
+ }));
+ }
+ },
+ State::Init(fut) => {
+ let (w, stream_id) = ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+ self.stream_id = Some(stream_id?);
+ }
+ State::Write(fut) => {
+ let (w, part) = ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+ return Poll::Ready(Ok(part?));
+ }
+ State::Close(_) => {
+ unreachable!(
+ "MultipartUploadWriter must not go into State::Close
during poll_write"
+ )
+ }
+ }
+ }
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => {
+ let w = w.take().expect("writer must be valid");
+ match self.stream_id {
+ Some(stream_id) => {
+ self.state = State::Close(Box::pin(async move {
+ let res = w.close(stream_id).await;
+ (w, res)
+ }));
+ }
+ None => {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ State::Close(fut) => {
+ let (w, res) = futures::ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+
+ res?;
+
+ return Poll::Ready(Ok(()));
+ }
+ State::Init(_) => {
+ unreachable!("AlluxioWriter must not go into State::Init
during poll_close")
+ }
+ State::Write(_) => unreachable! {
+ "AlluxioWriter must not go into State::Write during
poll_close"
+ },
+ }
+ }
+ }
- Ok(())
+ fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "AlluxioWriter doesn't support abort",
+ )))
}
}