This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 919d554ce0a627455ae85ab2b4cc5ad256d168f2 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 17:27:08 2023 +0800 Build pass Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 13 ++--- core/benches/oio/write.rs | 2 +- core/src/raw/oio/write/api.rs | 4 +- core/src/raw/oio/write/append_object_write.rs | 11 ++--- core/src/raw/oio/write/exact_buf_write.rs | 4 +- core/src/raw/oio/write/multipart_upload_write.rs | 22 ++++----- core/src/raw/oio/write/one_shot_write.rs | 4 +- core/src/services/fs/writer.rs | 4 +- core/src/services/ftp/writer.rs | 3 +- core/src/services/ghac/writer.rs | 60 +++++++++++++----------- core/src/services/sftp/writer.rs | 11 +++-- 11 files changed, 74 insertions(+), 64 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 67b4fd451..b3ea04027 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -20,6 +20,7 @@ use bytes::Bytes; use opendal::raw::oio; use rand::prelude::ThreadRng; use rand::RngCore; +use std::task::{Context, Poll}; /// BlackHoleWriter will discard all data written to it so we can measure the buffer's cost. pub struct BlackHoleWriter; @@ -30,16 +31,16 @@ impl oio::Write for BlackHoleWriter { &mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf, - ) -> opendal::Result<usize> { - Ok(bs.remaining()) + ) -> Poll<opendal::Result<usize>> { + Poll::Ready(Ok(bs.remaining())) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> { - Ok(()) + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<opendal::Result<()>> { + Poll::Ready(Ok(())) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> { - Ok(()) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<opendal::Result<()>> { + Poll::Ready(Ok(())) } } diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 8da2a6dd0..3ab71cdcf 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -18,8 +18,8 @@ use bytes::Buf; use criterion::Criterion; use once_cell::sync::Lazy; -use opendal::raw::oio::ExactBufWriter; use opendal::raw::oio::Write; +use opendal::raw::oio::{ExactBufWriter, WriteExt}; use rand::thread_rng; use size::Size; diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 41bd93491..c05f061c7 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -166,8 +166,8 @@ where type Output = Result<usize>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> { - let this = self.project(); - Pin::new(this.writer).poll_write(cx, this.buf) + let mut this = self.project(); + Pin::new(this.writer).poll_write(cx, *this.buf) } } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 393254097..5cbaedb37 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -32,7 +32,7 @@ use crate::*; /// - `AppendObjectWriter` impl `Write` /// - Expose `AppendObjectWriter` as `Accessor::Writer` #[async_trait] -pub trait AppendObjectWrite: Send + Sync + Unpin { +pub trait AppendObjectWrite: Send + Sync + Unpin + 'static { /// Get the current offset of the append object. /// /// Returns `0` if the object is not exist. @@ -90,9 +90,9 @@ where let bs = bs.copy_to_bytes(size); self.state = State::Append(Box::pin(async move { - w.append(offset, size as u64, AsyncBody::Bytes(bs)).await?; + let res = w.append(offset, size as u64, AsyncBody::Bytes(bs)).await; - (w, Ok(size)) + (w, res.map(|_| size)) })); } None => { @@ -110,11 +110,10 @@ where self.offset = Some(offset?); } State::Append(fut) => { - let (w, res) = ready!(fut.as_mut().poll(cx)); + let (w, size) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); - let size = res?; - return Poll::Ready(Ok(size)); + return Poll::Ready(Ok(size?)); } } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index f1615346b..636242280 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -164,11 +164,11 @@ mod tests { } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) + Poll::Ready(Ok(())) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) + Poll::Ready(Ok(())) } } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 97b2f0220..af6f825ac 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -33,7 +33,7 @@ use crate::*; /// - `MultipartUploadWriter` impl `Write` /// - Expose `MultipartUploadWriter` as `Accessor::Writer` #[async_trait] -pub trait MultipartUploadWrite: Send + Sync + Unpin { +pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static { /// initiate_part will call start a multipart upload and return the upload id. /// /// MultipartUploadWriter will call this when: @@ -72,6 +72,7 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin { /// /// - `part_number` is the index of the part, starting from 0. /// - `etag` is the `ETag` of the part. +#[derive(Clone)] pub struct MultipartUploadPart { /// The number of the part, starting from 0. pub part_number: usize, @@ -91,7 +92,7 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> { enum State<W> { Idle(Option<W>), Init(BoxFuture<'static, (W, Result<String>)>), - Write(BoxFuture<'static, (W, Result<(usize, MultipartUploadPart)>)>), + Write(BoxFuture<'static, (W, usize, Result<MultipartUploadPart>)>), Close(BoxFuture<'static, (W, Result<()>)>), Abort(BoxFuture<'static, (W, Result<()>)>), } @@ -138,9 +139,9 @@ where size as u64, AsyncBody::Bytes(bs), ) - .await?; + .await; - (w, Ok((size, part))) + (w, size, part) })); } None => { @@ -157,12 +158,11 @@ where self.upload_id = Some(Arc::new(upload_id?)); } State::Write(fut) => { - let (w, res) = ready!(fut.as_mut().poll(cx)); + let (w, size, part) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); - let (written, part) = res?; - self.parts.push(part); - return Poll::Ready(Ok(written)); + self.parts.push(part?); + return Poll::Ready(Ok(size)); } State::Close(_) => { unreachable!( @@ -183,11 +183,11 @@ where match &mut self.state { State::Idle(w) => { let w = w.take().expect("writer must be valid"); - match &self.upload_id { + match self.upload_id.clone() { Some(upload_id) => { let parts = self.parts.clone(); self.state = State::Close(Box::pin(async move { - let res = w.complete_part(&upload_id, &self.parts).await; + let res = w.complete_part(&upload_id, &parts).await; (w, res) })); } @@ -217,7 +217,7 @@ where match &mut self.state { State::Idle(w) => { let w = w.take().expect("writer must be valid"); - match &self.upload_id { + match self.upload_id.clone() { Some(upload_id) => { self.state = State::Close(Box::pin(async move { let res = w.abort_part(&upload_id).await; diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 6cc083491..b798da5a4 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -30,7 +30,7 @@ use crate::*; /// /// The layout after adopting [`OneShotWrite`]: #[async_trait] -pub trait OneShotWrite: Send + Sync + Unpin { +pub trait OneShotWrite: Send + Sync + Unpin + 'static { /// write_once write all data at once. /// /// Implementations should make sure that the data is written correctly at once. @@ -71,7 +71,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); - let fut = async { + let fut = async move { let res = w.write_once(bs).await; (w, res.map(|_| size)) diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 1a0aa49eb..26105115b 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -42,7 +42,7 @@ impl<F> FsWriter<F> { Self { target_path, tmp_path, - f, + f: Some(f), fut: None, } } @@ -80,7 +80,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { let f = self.f.take().expect("FsWriter must be initialized"); let tmp_path = self.tmp_path.clone(); let target_path = self.target_path.clone(); - self.fut = Some(Box::pin(async { + self.fut = Some(Box::pin(async move { f.sync_all().await.map_err(parse_io_error)?; if let Some(tmp_path) = &tmp_path { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index ec3c3f0e2..5a759aa21 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use futures::future::BoxFuture; use futures::AsyncWriteExt; +use futures::FutureExt; use std::task::{ready, Context, Poll}; use super::backend::FtpBackend; @@ -66,7 +67,7 @@ impl oio::Write for FtpWriter { let path = self.path.clone(); let backend = self.backend.clone(); - let fut = async { + let fut = async move { let mut ftp_stream = backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&path).await?; data_stream.write_all(&bs).await.map_err(|err| { diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index c780c2710..582a150aa 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -64,21 +64,24 @@ impl oio::Write for GhacWriter { let size = bs.remaining(); let bs = bs.copy_to_bytes(size); - let fut = async { - let req = backend - .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs)) - .await?; - - let resp = backend.client.send(req).await?; - - let res = if resp.status().is_success() { - resp.into_body().consume().await?; - Ok(size) - } else { - Err(parse_error(resp) - .await - .map(|err| err.with_operation("Backend::ghac_upload"))?) - }; + let fut = async move { + let res = async { + let req = backend + .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs)) + .await?; + + let resp = backend.client.send(req).await?; + + if resp.status().is_success() { + resp.into_body().consume().await?; + Ok(size) + } else { + Err(parse_error(resp) + .await + .map(|err| err.with_operation("Backend::ghac_upload"))?) + } + } + .await; (backend, res) }; @@ -114,18 +117,21 @@ impl oio::Write for GhacWriter { let cache_id = self.cache_id; let size = self.size; - let fut = async { - let req = backend.ghac_commit(cache_id, size).await?; - let resp = backend.client.send(req).await?; - - let res = if resp.status().is_success() { - resp.into_body().consume().await?; - Ok(()) - } else { - Err(parse_error(resp) - .await - .map(|err| err.with_operation("Backend::ghac_commit"))?) - }; + let fut = async move { + let res = async { + let req = backend.ghac_commit(cache_id, size).await?; + let resp = backend.client.send(req).await?; + + if resp.status().is_success() { + resp.into_body().consume().await?; + Ok(size as usize) + } else { + Err(parse_error(resp) + .await + .map(|err| err.with_operation("Backend::ghac_commit"))?) + } + } + .await; (backend, res) }; diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index fb1ab0a3e..455e0fdbc 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -37,9 +37,11 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - Pin::new(&mut self.file) - .poll_write(cx, bs.chunk()) - .map_err(Error::from) + // Pin::new(&mut self.file) + // .poll_write(cx, bs.chunk()) + // .map_err(Error::from) + + todo!() } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { @@ -47,6 +49,7 @@ impl oio::Write for SftpWriter { } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.file).poll_flush(cx).map_err(Error::from) + // Pin::new(&mut self.file).poll_flush(cx).map_err(Error::from) + todo!() } }
