This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit a5e53461769aa6b357f50bf7c7a078065d674cb6 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 16:30:06 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/services/dropbox/writer.rs | 20 ++++++----------- core/src/services/gdrive/writer.rs | 31 ++++++--------------------- core/src/services/onedrive/writer.rs | 19 +++++------------ core/src/services/vercel_artifacts/writer.rs | 24 ++++++--------------- core/src/services/webdav/writer.rs | 32 ++++++++-------------------- core/src/services/webhdfs/writer.rs | 20 ++++++----------- 6 files changed, 39 insertions(+), 107 deletions(-) diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index c7887a7d7..5e3e19a92 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::DropboxCore; @@ -37,10 +38,9 @@ impl DropboxWriter { } } -#[async_trait] -impl oio::Write for DropboxWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); +impl oio::OneShotWrite for DropboxWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let resp = self .core @@ -48,24 +48,16 @@ impl oio::Write for DropboxWriter { &self.path, Some(size), self.op.content_type(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), + AsyncBody::Bytes(bs), ) .await?; let status = resp.status(); match status { StatusCode::OK => { resp.into_body().consume().await?; - Ok(size) + Ok(()) } _ => Err(parse_error(resp).await?), } } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index b733863a8..f2f0b4e1c 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -49,7 +49,7 @@ impl GdriveWriter { /// /// This is used for small objects. /// And should overwrite the object if it already exists. - pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> { + pub async fn write_create(&self, size: u64, body: Bytes) -> Result<()> { let resp = self .core .gdrive_upload_simple_request(&self.path, size, body) @@ -59,13 +59,7 @@ impl GdriveWriter { match status { StatusCode::OK | StatusCode::CREATED => { - let bs = resp.into_body().bytes().await?; - - let file = serde_json::from_slice::<GdriveFile>(&bs) - .map_err(new_json_deserialize_error)?; - - self.file_id = Some(file.id); - + resp.into_body().consume().await?; Ok(()) } _ => Err(parse_error(resp).await?), @@ -93,26 +87,15 @@ impl GdriveWriter { } } -#[async_trait] -impl oio::Write for GdriveWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); +impl oio::OneShotWrite for GdriveWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); if self.file_id.is_none() { - self.write_create(size as u64, bs.copy_to_bytes(size)) - .await?; + self.write_create(size as u64, bs).await?; } else { - self.write_overwrite(size as u64, bs.copy_to_bytes(size)) - .await?; + self.write_overwrite(size as u64, bs).await?; } - Ok(size) - } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 46aa50a21..76fc734d1 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -23,6 +23,7 @@ use super::backend::OnedriveBackend; use super::error::parse_error; use super::graph_model::OneDriveUploadSessionCreationRequestBody; use super::graph_model::OneDriveUploadSessionCreationResponseBody; +use crate::raw::oio::OneShotWriter; use crate::raw::*; use crate::*; @@ -43,11 +44,9 @@ impl OneDriveWriter { } } -#[async_trait] -impl oio::Write for OneDriveWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); - let bs = bs.copy_to_bytes(size); +impl oio::OneShotWrite for OneDriveWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); if size <= Self::MAX_SIMPLE_SIZE { self.write_simple(bs).await?; @@ -55,20 +54,12 @@ impl oio::Write for OneDriveWriter { self.write_chunked(bs).await?; } - Ok(size) - } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { Ok(()) } } impl OneDriveWriter { - async fn write_simple(&mut self, bs: Bytes) -> Result<()> { + async fn write_simple(&self, bs: Bytes) -> Result<()> { let resp = self .backend .onedrive_upload_simple( diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 6579374a7..596c933cf 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::backend::VercelArtifactsBackend; @@ -36,18 +37,13 @@ impl VercelArtifactsWriter { } } -#[async_trait] -impl oio::Write for VercelArtifactsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); +impl oio::OneShotWrite for VercelArtifactsWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let resp = self .backend - .vercel_artifacts_put( - self.path.as_str(), - self.op.content_length().unwrap(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), - ) + .vercel_artifacts_put(self.path.as_str(), size as u64, AsyncBody::Bytes(bs)) .await?; let status = resp.status(); @@ -55,17 +51,9 @@ impl oio::Write for VercelArtifactsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(size) + Ok(()) } _ => Err(parse_error(resp).await?), } } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index eaff3d8b4..5b6ea319f 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::backend::WebdavBackend; @@ -34,16 +35,21 @@ impl WebdavWriter { pub fn new(backend: WebdavBackend, op: OpWrite, path: String) -> Self { WebdavWriter { backend, op, path } } +} + +#[async_trait] +impl oio::OneShotWrite for WebdavWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); - async fn write_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> { let resp = self .backend .webdav_put( &self.path, - Some(size), + Some(size as u64), self.op.content_type(), self.op.content_disposition(), - body, + AsyncBody::Bytes(bs), ) .await?; @@ -58,23 +64,3 @@ impl WebdavWriter { } } } - -#[async_trait] -impl oio::Write for WebdavWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); - - self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) - .await?; - - Ok(size) - } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } -} diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 0c7a9e10b..22901b496 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::backend::WebhdfsBackend; @@ -36,10 +37,9 @@ impl WebhdfsWriter { } } -#[async_trait] -impl oio::Write for WebhdfsWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); +impl oio::OneShotWrite for WebhdfsWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { + let size = bs.len(); let req = self .backend @@ -47,7 +47,7 @@ impl oio::Write for WebhdfsWriter { &self.path, Some(size), self.op.content_type(), - AsyncBody::Bytes(bs.copy_to_bytes(size)), + AsyncBody::Bytes(bs), ) .await?; @@ -57,17 +57,9 @@ impl oio::Write for WebhdfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(size) + Ok(()) } _ => Err(parse_error(resp).await?), } } - - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Ok(()) - } }
