This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-multipart in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b8874de8ef9b39768e650ae4ff394a55061e58f7 Author: Xuanwo <[email protected]> AuthorDate: Thu Aug 31 14:18:54 2023 +0800 refactor: Merge MultipartUpload and OneshotWrite for supported services Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/multipart_upload_write.rs | 77 +++++++++++++++++------- core/src/services/cos/backend.rs | 6 +- core/src/services/cos/writer.rs | 17 ++---- core/src/services/obs/backend.rs | 6 +- core/src/services/obs/writer.rs | 31 ++++++++-- core/src/services/oss/backend.rs | 6 +- core/src/services/oss/writer.rs | 33 ++++++++-- core/src/services/s3/backend.rs | 6 +- core/src/services/s3/writer.rs | 28 ++++++++- 9 files changed, 148 insertions(+), 62 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index c013b24c3..81d306681 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -32,6 +32,11 @@ use crate::*; /// - Expose `MultipartUploadWriter` as `Accessor::Writer` #[async_trait] pub trait MultipartUploadWrite: Send + Sync + Unpin { + /// write_once writes all data at once. + /// + /// Implementations should make sure that the data is written correctly at once. + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>; + /// initiate_part will call start a multipart upload and return the upload id.
/// /// MultipartUploadWriter will call this when: @@ -89,6 +94,8 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> { upload_id: Option<String>, parts: Vec<MultipartUploadPart>, + + cache: Option<(u64, AsyncBody)>, } impl<W: MultipartUploadWrite> MultipartUploadWriter<W> { @@ -99,6 +106,7 @@ impl<W: MultipartUploadWrite> MultipartUploadWriter<W> { upload_id: None, parts: Vec::new(), + cache: None, } } @@ -121,47 +129,74 @@ where W: MultipartUploadWrite, { async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = self.upload_id().await?; + let (size, body) = match self.cache.take() { + Some(cache) => { + self.cache = Some((bs.len() as u64, AsyncBody::Bytes(bs))); + cache + } + None => { + self.cache = Some((bs.len() as u64, AsyncBody::Bytes(bs))); + return Ok(()); + } + }; - let size = bs.len(); + let upload_id = self.upload_id().await?; self.inner - .write_part( - &upload_id, - self.parts.len(), - size as u64, - AsyncBody::Bytes(bs), - ) + .write_part(&upload_id, self.parts.len(), size, body) .await .map(|v| self.parts.push(v)) } async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + let (size, body) = match self.cache.take() { + Some(cache) => { + self.cache = Some((size, AsyncBody::Stream(s))); + cache + } + None => { + self.cache = Some((size, AsyncBody::Stream(s))); + return Ok(()); + } + }; + let upload_id = self.upload_id().await?; self.inner - .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) + .write_part(&upload_id, self.parts.len(), size, body) .await .map(|v| self.parts.push(v)) } async fn close(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; + match self.upload_id.take() { + Some(upload_id) => { + if let Some((size, body)) = self.cache.take() { + self.inner + .write_part(&upload_id, self.parts.len(), size, body) + .await + .map(|v| self.parts.push(v))?; + } + + self.inner.complete_part(&upload_id, 
&self.parts).await + } + None => { + if let Some((size, body)) = self.cache.take() { + self.inner.write_once(size, body).await?; + } - self.inner.complete_part(upload_id, &self.parts).await + Ok(()) + } + } } async fn abort(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; + // Cleanup existing cache. + self.cache = None; - self.inner.abort_part(upload_id).await + match self.upload_id.take() { + Some(upload_id) => self.inner.abort_part(&upload_id).await, + None => Ok(()), + } } } diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 3360d9dd5..13e9b29ca 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -338,11 +338,9 @@ impl Accessor for CosBackend { let writer = CosWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - CosWriters::Three(oio::AppendObjectWriter::new(writer)) - } else if args.content_length().is_some() { - CosWriters::One(oio::OneShotWriter::new(writer)) + CosWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - CosWriters::Two(oio::MultipartUploadWriter::new(writer)) + CosWriters::One(oio::MultipartUploadWriter::new(writer)) }; let w = if let Some(buffer_size) = args.buffer_size() { diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index af2a6ebd0..bd29be2eb 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -23,15 +23,11 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; -pub type CosWriters = oio::ThreeWaysWriter< - oio::OneShotWriter<CosWriter>, - oio::MultipartUploadWriter<CosWriter>, - oio::AppendObjectWriter<CosWriter>, ->; +pub type CosWriters = + oio::TwoWaysWriter<oio::MultipartUploadWriter<CosWriter>, oio::AppendObjectWriter<CosWriter>>; pub struct CosWriter { core: Arc<CosCore>, @@ -51,15 +47,15 @@ 
impl CosWriter { } #[async_trait] -impl oio::OneShotWrite for CosWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { +impl oio::MultipartUploadWrite for CosWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.cos_put_object_request( &self.path, Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Stream(stream), + body, )?; self.core.sign(&mut req).await?; @@ -76,10 +72,7 @@ impl oio::OneShotWrite for CosWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::MultipartUploadWrite for CosWriter { async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 6600ddd9b..d8b4a33aa 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -376,11 +376,9 @@ impl Accessor for ObsBackend { let writer = ObsWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - ObsWriters::Three(oio::AppendObjectWriter::new(writer)) - } else if args.content_length().is_some() { - ObsWriters::One(oio::OneShotWriter::new(writer)) + ObsWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - ObsWriters::Two(oio::MultipartUploadWriter::new(writer)) + ObsWriters::One(oio::MultipartUploadWriter::new(writer)) }; let w = if let Some(buffer_size) = args.buffer_size() { diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index d3b1e119f..7445613be 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -28,11 +28,8 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; -pub type ObsWriters = oio::ThreeWaysWriter< - oio::OneShotWriter<ObsWriter>, - oio::MultipartUploadWriter<ObsWriter>, - oio::AppendObjectWriter<ObsWriter>, ->; +pub type ObsWriters = + oio::TwoWaysWriter<oio::MultipartUploadWriter<ObsWriter>, 
oio::AppendObjectWriter<ObsWriter>>; pub struct ObsWriter { core: Arc<ObsCore>, @@ -80,6 +77,30 @@ impl oio::OneShotWrite for ObsWriter { #[async_trait] impl oio::MultipartUploadWrite for ObsWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self.core.obs_put_object_request( + &self.path, + Some(size), + self.op.content_type(), + self.op.cache_control(), + body, + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index ee1131ee1..2ea5fe9e6 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -474,11 +474,9 @@ impl Accessor for OssBackend { let writer = OssWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - OssWriters::Three(oio::AppendObjectWriter::new(writer)) - } else if args.content_length().is_some() { - OssWriters::One(oio::OneShotWriter::new(writer)) + OssWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - OssWriters::Two(oio::MultipartUploadWriter::new(writer)) + OssWriters::One(oio::MultipartUploadWriter::new(writer)) }; let w = if let Some(buffer_size) = args.buffer_size() { diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 27aa09011..fd759df88 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -27,11 +27,8 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; -pub type OssWriters = oio::ThreeWaysWriter< - oio::OneShotWriter<OssWriter>, - oio::MultipartUploadWriter<OssWriter>, - oio::AppendObjectWriter<OssWriter>, ->; +pub type OssWriters = + 
oio::TwoWaysWriter<oio::MultipartUploadWriter<OssWriter>, oio::AppendObjectWriter<OssWriter>>; pub struct OssWriter { core: Arc<OssCore>, @@ -81,6 +78,32 @@ impl oio::OneShotWrite for OssWriter { #[async_trait] impl oio::MultipartUploadWrite for OssWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self.core.oss_put_object_request( + &self.path, + Some(size), + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + body, + false, + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 600fac3f3..36344da80 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -982,11 +982,7 @@ impl Accessor for S3Backend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let writer = S3Writer::new(self.core.clone(), path, args.clone()); - let w = if args.content_length().is_some() { - S3Writers::One(oio::OneShotWriter::new(writer)) - } else { - S3Writers::Two(oio::MultipartUploadWriter::new(writer)) - }; + let w = oio::MultipartUploadWriter::new(writer); let w = if let Some(buffer_size) = args.buffer_size() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index a27341d71..6bda47cdb 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -27,8 +27,7 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; -pub type S3Writers = - oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>, oio::MultipartUploadWriter<S3Writer>>; +pub type S3Writers = 
oio::MultipartUploadWriter<S3Writer>; pub struct S3Writer { core: Arc<S3Core>, @@ -77,6 +76,31 @@ impl oio::OneShotWrite for S3Writer { #[async_trait] impl oio::MultipartUploadWrite for S3Writer { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self.core.s3_put_object_request( + &self.path, + Some(size), + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + body, + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + async fn initiate_part(&self) -> Result<String> { let resp = self .core
