This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch write_can_multig in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 1fd07e3c68bd03d5ddadbcc600529fbd307ea9b8 Author: Xuanwo <[email protected]> AuthorDate: Wed Sep 13 08:36:46 2023 +0800 delay write for oneshot Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/one_shot_write.rs | 68 ++++++++++++++++++++-------- core/src/services/azblob/writer.rs | 6 +-- core/src/services/azdfs/writer.rs | 15 +++--- core/src/services/dropbox/writer.rs | 9 ++-- core/src/services/gdrive/writer.rs | 4 +- core/src/services/ipmfs/backend.rs | 6 +-- core/src/services/ipmfs/writer.rs | 5 +- core/src/services/onedrive/writer.rs | 4 +- core/src/services/supabase/writer.rs | 11 +++-- core/src/services/vercel_artifacts/writer.rs | 11 +++-- core/src/services/wasabi/writer.rs | 10 ++-- core/src/services/webdav/writer.rs | 10 ++-- core/src/services/webhdfs/writer.rs | 10 ++-- 13 files changed, 103 insertions(+), 66 deletions(-) diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index c56679e19..4efb3655b 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -20,7 +20,6 @@ use std::task::Context; use std::task::Poll; use async_trait::async_trait; -use bytes::Bytes; use futures::future::BoxFuture; use crate::raw::*; @@ -37,17 +36,18 @@ pub trait OneShotWrite: Send + Sync + Unpin + 'static { /// write_once write all data at once. /// /// Implementations should make sure that the data is written correctly at once. - async fn write_once(&self, bs: Bytes) -> Result<()>; + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()>; } /// OneShotWrite is used to implement [`Write`] based on one shot. pub struct OneShotWriter<W: OneShotWrite> { state: State<W>, + buffer: Option<oio::ChunkedBytes>, } enum State<W> { Idle(Option<W>), - Write(BoxFuture<'static, (W, Result<usize>)>), + Write(BoxFuture<'static, (W, Result<()>)>), } /// # Safety @@ -60,45 +60,73 @@ impl<W: OneShotWrite> OneShotWriter<W> { pub fn new(inner: W) -> Self { Self { state: State::Idle(Some(inner)), + buffer: None, } } } #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { + fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { + loop { + match &mut self.state { + State::Idle(_) => { + return match &self.buffer { + Some(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "OneShotWriter doesn't support multiple write", + ))), + None => { + let size = bs.remaining(); + let bs = bs.vectored_bytes(size); + self.buffer = Some(oio::ChunkedBytes::from_vec(bs)); + Poll::Ready(Ok(size)) + } + } + } + State::Write(_) => { + unreachable!("OneShotWriter must not go into State::Write during poll_write") + } + } + } + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { loop { match &mut self.state { State::Idle(w) => { let w = w.take().expect("writer must be valid"); - let size = bs.remaining(); - let bs = bs.bytes(size); - let fut = async move { - let res = w.write_once(bs).await; + match self.buffer.clone() { + Some(bs) => { + let fut = Box::pin(async move { + let res = w.write_once(&bs).await; - (w, res.map(|_| size)) - }; + (w, res) + }); + self.state = State::Write(fut); + } + None => { + let fut = Box::pin(async move { + let res = w.write_once(&"".as_bytes()).await; - self.state = State::Write(Box::pin(fut)); + (w, res) + }); + self.state = State::Write(fut); + } + }; } State::Write(fut) => { - let (w, size) = ready!(fut.as_mut().poll(cx)); + let (w, res) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); - return Poll::Ready(size); + return Poll::Ready(res); } } } } fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Err(Error::new( - ErrorKind::Unsupported, - "OneShotWriter doesn't support abort since all content has been flushed", - ))) - } - - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + self.buffer = None; Poll::Ready(Ok(())) } } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 029278ad5..301c9f733 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzblobCore; @@ -46,13 +45,14 @@ impl AzblobWriter { #[async_trait] impl oio::OneShotWrite for AzblobWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let mut req = self.core.azblob_put_blob_request( &self.path, Some(bs.len() as u64), self.op.content_type(), self.op.cache_control(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 9e2a962f2..0424fe494 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzdfsCore; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -41,7 +41,7 @@ impl AzdfsWriter { #[async_trait] impl oio::OneShotWrite for AzdfsWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -66,11 +66,12 @@ impl oio::OneShotWrite for AzdfsWriter { } } - let size = bs.len(); - - let mut req = - self.core - .azdfs_update_request(&self.path, Some(size), AsyncBody::Bytes(bs))?; + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); + let mut req = self.core.azdfs_update_request( + &self.path, + Some(bs.len()), + AsyncBody::ChunkedBytes(bs), + )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 3a5c6cdd7..b5a4a69af 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::DropboxCore; @@ -40,16 +39,16 @@ impl DropboxWriter { #[async_trait] impl oio::OneShotWrite for DropboxWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self .core .dropbox_update( &self.path, - Some(size), + Some(bs.len()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await?; let status = resp.status(); diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index e70f1754e..445ecb36f 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -88,7 +89,8 @@ impl GdriveWriter { #[async_trait] impl oio::OneShotWrite for GdriveWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = bs.bytes(bs.remaining()); let size = bs.len(); if self.file_id.is_none() { self.write_create(size as u64, bs).await?; diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index ac70369c9..e7999767b 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -21,7 +21,6 @@ use std::str; use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::Request; use http::Response; use http::StatusCode; @@ -283,7 +282,7 @@ impl IpmfsBackend { pub async fn ipmfs_write( &self, path: &str, - body: Bytes, + body: oio::ChunkedBytes, ) -> Result<Response<IncomingAsyncBody>> { let p = build_rooted_abs_path(&self.root, path); @@ -293,7 +292,8 @@ impl IpmfsBackend { percent_encode_path(&p) ); - let multipart = Multipart::new().part(FormDataPart::new("data").content(body)); + let multipart = Multipart::new() + .part(FormDataPart::new("data").stream(body.len() as u64, Box::new(body))); let req: http::request::Builder = Request::post(url); let req = multipart.apply(req)?; diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index e6a32ac1b..96d9dab18 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -16,11 +16,11 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::IpmfsBackend; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -38,7 +38,8 @@ impl IpmfsWriter { #[async_trait] impl oio::OneShotWrite for IpmfsWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 5ad92ac63..326035acb 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -23,6 +23,7 @@ use super::backend::OnedriveBackend; use super::error::parse_error; use super::graph_model::OneDriveUploadSessionCreationRequestBody; use super::graph_model::OneDriveUploadSessionCreationResponseBody; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -45,7 +46,8 @@ impl OneDriveWriter { #[async_trait] impl oio::OneShotWrite for OneDriveWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = bs.bytes(bs.remaining()); let size = bs.len(); if size <= Self::MAX_SIMPLE_SIZE { diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index c7fc6c8f8..b6929c4f5 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -45,13 +45,14 @@ impl SupabaseWriter { #[async_trait] impl oio::OneShotWrite for SupabaseWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); + let mut req = self.core.supabase_upload_object_request( &self.path, - Some(size), + Some(bs.len()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), )?; self.core.sign(&mut req)?; diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index f62d76cf5..058e57041 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::VercelArtifactsBackend; @@ -43,12 +42,16 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::OneShotWrite for VercelArtifactsWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self .backend - .vercel_artifacts_put(self.path.as_str(), size as u64, AsyncBody::Bytes(bs)) + .vercel_artifacts_put( + self.path.as_str(), + bs.len() as u64, + AsyncBody::ChunkedBytes(bs), + ) .await?; let status = resp.status(); diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index f9e27334e..cd76bf82c 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -41,18 +41,18 @@ impl WasabiWriter { #[async_trait] impl oio::OneShotWrite for WasabiWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self .core .put_object( &self.path, - Some(size), + Some(bs.len()), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await?; diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 5b6ea319f..42de4fc0e 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -16,11 +16,11 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebdavBackend; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -39,17 +39,17 @@ impl WebdavWriter { #[async_trait] impl oio::OneShotWrite for WebdavWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let resp = self .backend .webdav_put( &self.path, - Some(size as u64), + Some(bs.len() as u64), self.op.content_type(), self.op.content_disposition(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await?; diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index b323c0173..38cd6d577 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -16,11 +16,11 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebhdfsBackend; use super::error::parse_error; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -39,16 +39,16 @@ impl WebhdfsWriter { #[async_trait] impl oio::OneShotWrite for WebhdfsWriter { - async fn write_once(&self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining())); let req = self .backend .webhdfs_create_object_request( &self.path, - Some(size), + Some(bs.len()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::ChunkedBytes(bs), ) .await?;
