This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit cdc4c877163fe0fc20bc80ffea46f7368b5075a3 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 16:06:39 2023 +0800 Fix build Signed-off-by: Xuanwo <[email protected]> --- core/src/services/ghac/writer.rs | 123 ++++++++++++++++++++++++++++----------- core/src/services/hdfs/writer.rs | 1 + 2 files changed, 90 insertions(+), 34 deletions(-) diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 886abffb5..0be63ce22 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -16,6 +16,8 @@ // under the License. use async_trait::async_trait; +use futures::future::BoxFuture; +use std::task::{ready, Context, Poll}; use super::backend::GhacBackend; use super::error::parse_error; @@ -23,7 +25,7 @@ use crate::raw::*; use crate::*; pub struct GhacWriter { - backend: GhacBackend, + state: State, cache_id: i64, size: u64, @@ -32,55 +34,108 @@ pub struct GhacWriter { impl GhacWriter { pub fn new(backend: GhacBackend, cache_id: i64) -> Self { GhacWriter { - backend, + state: State::Idle(Some(backend)), cache_id, size: 0, } } } +enum State { + Idle(Option<GhacBackend>), + Upload(BoxFuture<'static, (GhacBackend, Result<usize>)>), + Commit(BoxFuture<'static, (GhacBackend, Result<()>)>), +} + #[async_trait] impl oio::Write for GhacWriter { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); - - let req = self - .backend - .ghac_upload( - self.cache_id, - size as u64, - AsyncBody::Bytes(bs.copy_to_bytes(size)), - ) - .await?; - - let resp = self.backend.client.send(req).await?; - - if resp.status().is_success() { - resp.into_body().consume().await?; - self.size += size as u64; - Ok(size) - } else { - Err(parse_error(resp) - .await - .map(|err| err.with_operation("Backend::ghac_upload"))?) + loop { + match &mut self.state { + State::Idle(backend) => { + let backend = backend.take().expect("GhacWriter must be initialized"); + + let cache_id = self.cache_id; + let size = bs.remaining(); + let bs = bs.copy_to_bytes(size); + + let fut = async { + let req = backend + .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs)) + .await?; + + let resp = backend.client.send(req).await?; + + let res = if resp.status().is_success() { + resp.into_body().consume().await?; + Ok(size) + } else { + Err(parse_error(resp) + .await + .map(|err| err.with_operation("Backend::ghac_upload"))?) + }; + + (backend, res) + }; + self.state = State::Upload(Box::pin(fut)); + } + State::Upload(fut) => { + let (backend, res) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(backend)); + + let size = res?; + self.size += size as u64; + return Poll::Ready(Ok(size)); + } + State::Commit(_) => { + unreachable!("GhacWriter must not go into State:Commit during poll_write") + } + } } } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) + self.state = State::Idle(None); + + Poll::Ready(Ok(())) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - let req = self.backend.ghac_commit(self.cache_id, self.size).await?; - let resp = self.backend.client.send(req).await?; - - if resp.status().is_success() { - resp.into_body().consume().await?; - Ok(()) - } else { - Err(parse_error(resp) - .await - .map(|err| err.with_operation("Backend::ghac_commit"))?) + loop { + match &mut self.state { + State::Idle(backend) => { + let backend = backend.take().expect("GhacWriter must be initialized"); + + let cache_id = self.cache_id; + let size = self.size; + + let fut = async { + let req = backend.ghac_commit(cache_id, size).await?; + let resp = backend.client.send(req).await?; + + let res = if resp.status().is_success() { + resp.into_body().consume().await?; + Ok(()) + } else { + Err(parse_error(resp) + .await + .map(|err| err.with_operation("Backend::ghac_commit"))?) + }; + + (backend, res) + }; + self.state = State::Upload(Box::pin(fut)); + } + State::Upload(_) => { + unreachable!("GhacWriter must not go into State:Upload during poll_close") + } + State::Commit(fut) => { + let (backend, res) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some(backend)); + + return Poll::Ready(res); + } + } } } } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 4c69d63dc..1b4f549ce 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -16,6 +16,7 @@ // under the License. use std::io::Write; +use std::task::Context; use async_trait::async_trait; use futures::AsyncWriteExt;
