Xuanwo commented on code in PR #3566:
URL:
https://github.com/apache/incubator-opendal/pull/3566#discussion_r1391989653
##########
core/src/services/alluxio/backend.rs:
##########
@@ -229,8 +228,7 @@ impl Accessor for AlluxioBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let w = AlluxioWriter::new(self.core.clone(), args, path.to_string());
Review Comment:
Hi, please activate the `write_can_multi` capability to ensure the corresponding
tests are enabled.
##########
core/src/services/alluxio/writer.rs:
##########
@@ -16,42 +16,177 @@
// under the License.
use std::sync::Arc;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
use async_trait::async_trait;
+use futures::future::BoxFuture;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::Result;
use super::core::AlluxioCore;
-pub type AlluxioWriters = oio::OneShotWriter<AlluxioWriter>;
+pub type AlluxioWriters = AlluxioWriter;
pub struct AlluxioWriter {
- core: Arc<AlluxioCore>,
+ state: State,
_op: OpWrite,
path: String,
+ stream_id: Option<u64>,
+}
+
+enum State {
+ Idle(Option<Arc<AlluxioCore>>),
+ Init(BoxFuture<'static, (Arc<AlluxioCore>, Result<u64>)>),
+ Write(BoxFuture<'static, (Arc<AlluxioCore>, Result<usize>)>),
+ Close(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
+ Abort(BoxFuture<'static, (Arc<AlluxioCore>, Result<()>)>),
}
impl AlluxioWriter {
pub fn new(core: Arc<AlluxioCore>, _op: OpWrite, path: String) -> Self {
- AlluxioWriter { core, _op, path }
+ AlluxioWriter {
+ state: State::Idle(Some(core)),
+ _op,
+ path,
+ stream_id: None,
+ }
}
}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl Sync for State {}
+
#[async_trait]
-impl oio::OneShotWrite for AlluxioWriter {
- async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
- let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+impl oio::Write for AlluxioWriter {
+ fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) ->
Poll<Result<usize>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => match self.stream_id.as_ref() {
+ Some(stream_id) => {
+ let size = bs.remaining();
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)).clone();
- let stream_id = self.core.create_file(&self.path).await?;
+ let stream_id = *stream_id;
- self.core
- .write(stream_id, AsyncBody::ChunkedBytes(bs))
- .await?;
+ let w = w.take().expect("writer must be valid");
- self.core.close(stream_id).await?;
+ self.state = State::Write(Box::pin(async move {
+ let part = w.write(stream_id,
AsyncBody::ChunkedBytes(cb)).await;
+
+ (w, part)
+ }));
+ }
+ None => {
+ let path = self.path.clone();
+ let w = w.take().expect("writer must be valid");
+ self.state = State::Init(Box::pin(async move {
+ let upload_id = w.create_file(&path).await;
+ (w, upload_id)
+ }));
+ }
+ },
+ State::Init(fut) => {
+ let (w, stream_id) = ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+ self.stream_id = Some(stream_id?);
+ }
+ State::Write(fut) => {
+ let (w, part) = ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+ return Poll::Ready(Ok(part?));
+ }
+ State::Close(_) => {
+ unreachable!(
+ "MultipartUploadWriter must not go into State::Close
during poll_write"
+ )
+ }
+ State::Abort(_) => {
+ unreachable!(
+ "MultipartUploadWriter must not go into State::Abort
during poll_write"
+ )
+ }
+ }
+ }
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => {
+ let w = w.take().expect("writer must be valid");
+ match self.stream_id {
+ Some(stream_id) => {
+ self.state = State::Close(Box::pin(async move {
+ let res = w.close(stream_id).await;
+ (w, res)
+ }));
+ }
+ None => {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ State::Close(fut) => {
+ let (w, res) = futures::ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle(Some(w));
+
+ res?;
+
+ return Poll::Ready(Ok(()));
+ }
+ State::Init(_) => {
+ unreachable!("AlluxioWriter must not go into State::Init
during poll_close")
+ }
+ State::Write(_) => unreachable! {
+ "AlluxioWriter must not go into State::Write during
poll_close"
+ },
+ State::Abort(_) => {
+ unreachable!("AlluxioWriter must not go into State::Abort
during poll_close")
+ }
+ }
+ }
+ }
- Ok(())
+ fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => {
+ let w = w.take().expect("writer must be valid");
+ match self.stream_id {
+ Some(_) => {
+ let path = self.path.clone();
+ self.state = State::Abort(Box::pin(async move {
Review Comment:
Please don't implement abort by calling `delete`. Leave this as unsupported
if the service doesn't provide native abort support. We can remove the `Abort` state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]