This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch buffer-refactor in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 8f2ff0f3663e01328c1c73d4ff8aa5bef2e1a0a4 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 22 17:08:01 2023 +0800 refactor Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/append_object_write.rs | 79 ++------------------------- core/src/raw/oio/write/at_least_buf_write.rs | 61 +++++++++++++++------ core/src/raw/ops.rs | 21 +++++++ core/src/services/cos/backend.rs | 53 +++++++----------- core/src/services/cos/core.rs | 1 - core/src/services/cos/writer.rs | 6 ++ core/src/services/obs/backend.rs | 55 +++++++------------ core/src/services/obs/core.rs | 1 - core/src/services/obs/writer.rs | 6 ++ core/src/services/oss/backend.rs | 39 ++++++------- core/src/services/oss/core.rs | 1 - core/src/services/oss/writer.rs | 6 ++ core/src/services/s3/backend.rs | 42 +++++++++----- core/src/services/s3/core.rs | 1 - core/src/services/s3/writer.rs | 11 ++-- core/src/types/operator/operator_futures.rs | 26 +++++++++ core/tests/behavior/write.rs | 4 +- 17 files changed, 211 insertions(+), 202 deletions(-) diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 5d6eda9da..07fa546cc 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -22,8 +22,6 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; -const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; - /// AppendObjectWrite is used to implement [`Write`] based on append /// object. By implementing AppendObjectWrite, services don't need to /// care about the details of buffering and uploading parts. 
@@ -53,8 +51,6 @@ pub struct AppendObjectWriter<W: AppendObjectWrite> { inner: W, offset: Option<u64>, - buffer: oio::VectorCursor, - buffer_size: usize, } impl<W: AppendObjectWrite> AppendObjectWriter<W> { @@ -63,24 +59,9 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> { Self { inner, offset: None, - buffer: oio::VectorCursor::new(), - buffer_size: DEFAULT_WRITE_MIN_SIZE, } } - /// Configure the write_min_size. - /// - /// write_min_size is used to control the size of internal buffer. - /// - /// AppendObjectWriter will flush the buffer to storage when - /// the size of buffer is larger than write_min_size. - /// - /// This value is default to 8 MiB. - pub fn with_write_min_size(mut self, v: usize) -> Self { - self.buffer_size = v; - self - } - async fn offset(&mut self) -> Result<u64> { if let Some(offset) = self.offset { return Ok(offset); @@ -101,72 +82,24 @@ where async fn write(&mut self, bs: Bytes) -> Result<()> { let offset = self.offset().await?; - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } - - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.buffer_size { - return Ok(()); - } - - let bs = self.buffer.peak_all(); - let size = bs.len(); + let size = bs.len() as u64; - match self - .inner - .append(offset, size as u64, AsyncBody::Bytes(bs)) + self.inner + .append(offset, size, AsyncBody::Bytes(bs)) .await - { - Ok(_) => { - self.buffer.take(size); - self.offset = Some(offset + size as u64); - Ok(()) - } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. 
- self.buffer.pop(); - Err(e) - } - } + .map(|_| self.offset = Some(offset + size)) } async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { - if !self.buffer.is_empty() { - return Err(Error::new( - ErrorKind::InvalidInput, - "Writer::sink should not be used mixed with existing buffer", - )); - } - let offset = self.offset().await?; self.inner .append(offset, size, AsyncBody::Stream(s)) - .await?; - self.offset = Some(offset + size); - - Ok(()) + .await + .map(|_| self.offset = Some(offset + size)) } async fn close(&mut self) -> Result<()> { - // Make sure internal buffer has been flushed. - if !self.buffer.is_empty() { - let bs = self.buffer.peak_exact(self.buffer.len()); - - let offset = self.offset().await?; - let size = bs.len() as u64; - self.inner - .append(offset, size, AsyncBody::Bytes(bs)) - .await?; - - self.buffer.clear(); - self.offset = Some(offset + size); - } - Ok(()) } diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 7a295c313..a86fb2c05 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -27,49 +27,76 @@ use bytes::Bytes; pub struct AtLeastBufWriter<W: oio::Write> { inner: W, + /// The total size of the data. + /// + /// If the total size is known, we will write to underlying storage directly without buffer it + /// when possible. + total_size: Option<u64>, + /// The size for buffer, we will flush the underlying storage if the buffer is full. - size: usize, - buf: oio::ChunkedCursor, + buffer_size: usize, + buffer: oio::ChunkedCursor, } impl<W: oio::Write> AtLeastBufWriter<W> { /// Create a new at least buf writer. - pub fn new(inner: W, size: usize) -> Self { + pub fn new(inner: W, buffer_size: usize) -> Self { Self { inner, - size, - buf: oio::ChunkedCursor::new(), + total_size: None, + buffer_size, + buffer: oio::ChunkedCursor::new(), } } + + /// Configure the total size for writer. 
+ pub fn with_total_size(mut self, total_size: Option<u64>) -> Self { + self.total_size = total_size; + self + } } #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { async fn write(&mut self, bs: Bytes) -> Result<()> { + // If total size is known and equals to given bytes, we can write it directly. + if let Some(total_size) = self.total_size { + if total_size == bs.len() as u64 { + return self.inner.write(bs).await; + } + } + // Push the bytes into the buffer if the buffer is not full. - if self.buf.len() + bs.len() <= self.size { - self.buf.push(bs); + if self.buffer.len() + bs.len() <= self.buffer_size { + self.buffer.push(bs); return Ok(()); } - let mut buf = self.buf.clone(); + let mut buf = self.buffer.clone(); buf.push(bs); self.inner .sink(buf.len() as u64, Box::new(buf)) .await // Clear buffer if the write is successful. - .map(|_| self.buf.clear()) + .map(|_| self.buffer.clear()) } async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + // If total size is known and equals to given stream, we can write it directly. + if let Some(total_size) = self.total_size { + if total_size == size { + return self.inner.sink(size, s).await; + } + } + // Push the bytes into the buffer if the buffer is not full. - if self.buf.len() as u64 + size <= self.size as u64 { - self.buf.push(s.collect().await?); + if self.buffer.len() as u64 + size <= self.buffer_size as u64 { + self.buffer.push(s.collect().await?); return Ok(()); } - let buf = self.buf.clone(); + let buf = self.buffer.clone(); let buffer_size = buf.len() as u64; let stream = buf.chain(s); @@ -77,20 +104,20 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { .sink(buffer_size + size, Box::new(stream)) .await // Clear buffer if the write is successful. 
- .map(|_| self.buf.clear()) + .map(|_| self.buffer.clear()) } async fn abort(&mut self) -> Result<()> { - self.buf.clear(); + self.buffer.clear(); self.inner.abort().await } async fn close(&mut self) -> Result<()> { - if !self.buf.is_empty() { + if !self.buffer.is_empty() { self.inner - .sink(self.buf.len() as u64, Box::new(self.buf.clone())) + .sink(self.buffer.len() as u64, Box::new(self.buffer.clone())) .await?; - self.buf.clear(); + self.buffer.clear(); } self.inner.close().await?; diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 2617768da..0404ed468 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -406,6 +406,7 @@ impl OpStat { pub struct OpWrite { append: bool, + buffer_size: Option<usize>, content_length: Option<u64>, content_type: Option<String>, content_disposition: Option<String>, @@ -439,6 +440,26 @@ impl OpWrite { self } + /// Get the buffer size from op. + /// + /// The buffer size is used by service to decide the buffer size of the underlying writer. + pub fn buffer_size(&self) -> Option<usize> { + self.buffer_size + } + + /// Set the buffer size of op. + /// + /// If buffer size is set, the data will be buffered by the underlying writer. + /// + /// ## NOTE + /// + /// Service could have their own minimum buffer size while perform write operations like + /// multipart uploads. So the buffer size may be larger than the given buffer size. + pub fn with_buffer_size(mut self, buffer_size: usize) -> Self { + self.buffer_size = Some(buffer_size); + self + } + /// Get the content length from op. /// /// The content length is the total length of the data to be written. diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 851e79e53..66aa65449 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -32,9 +33,13 @@ use super::error::parse_error; use super::pager::CosPager; use super::writer::CosWriter; use crate::raw::*; +use crate::services::cos::writer::CosWriters; use crate::*; -const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024; +/// The minimum multipart size of COS is 1 MiB. +/// +/// ref: <https://www.tencentcloud.com/document/product/436/14112> +const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024; /// Tencent-Cloud COS services support. #[doc = include_str!("docs.md")] @@ -47,10 +52,6 @@ pub struct CosBuilder { bucket: Option<String>, http_client: Option<HttpClient>, - /// the part size of cos multipart upload, which should be 1 MB to 5 GB. - /// There is no minimum size limit on the last part of your multipart upload - write_min_size: Option<usize>, - disable_config_load: bool, } @@ -125,14 +126,6 @@ impl CosBuilder { self } - /// set the minimum size of unsized write, it should be greater than 1 MB. - /// Reference: [Upload Part | Tencent Cloud](https://www.tencentcloud.com/document/product/436/7750) - pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self { - self.write_min_size = Some(write_min_size); - - self - } - /// Disable config load so that opendal will not load config from /// environment. 
/// @@ -168,8 +161,6 @@ impl Builder for CosBuilder { map.get("endpoint").map(|v| builder.endpoint(v)); map.get("secret_id").map(|v| builder.secret_id(v)); map.get("secret_key").map(|v| builder.secret_key(v)); - map.get("write_min_size") - .map(|v| builder.write_min_size(v.parse().expect("input must be a number"))); builder } @@ -233,14 +224,6 @@ impl Builder for CosBuilder { let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg); let signer = TencentCosSigner::new(); - let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE); - if write_min_size < 1024 * 1024 { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "The write minimum buffer size is misconfigured", - ) - .with_context("service", Scheme::Cos)); - } debug!("backend build finished"); Ok(CosBackend { @@ -251,7 +234,6 @@ impl Builder for CosBuilder { signer, loader: cred_loader, client, - write_min_size, }), }) } @@ -267,10 +249,7 @@ pub struct CosBackend { impl Accessor for CosBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter< - oio::MultipartUploadWriter<CosWriter>, - oio::AppendObjectWriter<CosWriter>, - >; + type Writer = oio::TwoWaysWriter<CosWriters, oio::AtLeastBufWriter<CosWriters>>; type BlockingWriter = (); type Pager = CosPager; type BlockingPager = (); @@ -358,18 +337,26 @@ impl Accessor for CosBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let writer = CosWriter::new(self.core.clone(), path, args.clone()); - let tw = if args.append() { + let w = if args.append() { + CosWriters::Three(oio::AppendObjectWriter::new(writer)) + } else if args.content_length().is_some() { + CosWriters::One(oio::OneShotWriter::new(writer)) + } else { + CosWriters::Two(oio::MultipartUploadWriter::new(writer)) + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); + let w = - 
oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); oio::TwoWaysWriter::Two(w) } else { - let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::One(w) }; - return Ok((RpWrite::default(), tw)); + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs index e288a447c..490dada6e 100644 --- a/core/src/services/cos/core.rs +++ b/core/src/services/cos/core.rs @@ -45,7 +45,6 @@ pub struct CosCore { pub signer: TencentCosSigner, pub loader: TencentCosCredentialLoader, pub client: HttpClient, - pub write_min_size: usize, } impl Debug for CosCore { diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index cdcbb9b18..af2a6ebd0 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -27,6 +27,12 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; +pub type CosWriters = oio::ThreeWaysWriter< + oio::OneShotWriter<CosWriter>, + oio::MultipartUploadWriter<CosWriter>, + oio::AppendObjectWriter<CosWriter>, +>; + pub struct CosWriter { core: Arc<CosCore>, diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index dc7676d4a..78ecf0976 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -32,8 +33,14 @@ use super::error::parse_error; use super::pager::ObsPager; use super::writer::ObsWriter; use crate::raw::*; +use crate::services::obs::writer::ObsWriters; use crate::*; +/// The minimum multipart size of OBS is 5 MiB. 
+/// +/// ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html> +const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024; + /// Huawei Cloud OBS services support. /// /// # Capabilities @@ -91,9 +98,6 @@ use crate::*; /// Ok(()) /// } /// ``` - -const DEFAULT_WRITE_MIN_SIZE: usize = 100 * 1024; - /// Huawei-Cloud Object Storage Service (OBS) support #[derive(Default, Clone)] pub struct ObsBuilder { @@ -103,9 +107,6 @@ pub struct ObsBuilder { secret_access_key: Option<String>, bucket: Option<String>, http_client: Option<HttpClient>, - /// the part size of obs multipart upload, which should be 100 KiB to 5 GiB. - /// There is no minimum size limit on the last part of your multipart upload - write_min_size: Option<usize>, } impl Debug for ObsBuilder { @@ -190,14 +191,6 @@ impl ObsBuilder { self.http_client = Some(client); self } - - /// set the minimum size of unsized write, it should be greater than 100 KB. - /// Reference: [Huawei Obs multipart upload limits](https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0099.html) - pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self { - self.write_min_size = Some(write_min_size); - - self - } } impl Builder for ObsBuilder { @@ -213,8 +206,6 @@ impl Builder for ObsBuilder { map.get("access_key_id").map(|v| builder.access_key_id(v)); map.get("secret_access_key") .map(|v| builder.secret_access_key(v)); - map.get("write_min_size") - .map(|v| builder.write_min_size(v.parse().expect("input must be a number"))); builder } @@ -296,14 +287,6 @@ impl Builder for ObsBuilder { &endpoint } }); - let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE); - if write_min_size < 100 * 1024 { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "The write minimum buffer size is misconfigured", - ) - .with_context("service", Scheme::Obs)); - } debug!("backend build finished"); Ok(ObsBackend { @@ -314,7 +297,6 @@ impl Builder for ObsBuilder { signer, loader, client, - 
write_min_size, }), }) } @@ -330,10 +312,7 @@ pub struct ObsBackend { impl Accessor for ObsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter< - oio::MultipartUploadWriter<ObsWriter>, - oio::AppendObjectWriter<ObsWriter>, - >; + type Writer = oio::TwoWaysWriter<ObsWriters, oio::AtLeastBufWriter<ObsWriters>>; type BlockingWriter = (); type Pager = ObsPager; type BlockingPager = (); @@ -452,18 +431,26 @@ impl Accessor for ObsBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let writer = ObsWriter::new(self.core.clone(), path, args.clone()); - let tw = if args.append() { + let w = if args.append() { + ObsWriters::Three(oio::AppendObjectWriter::new(writer)) + } else if args.content_length().is_some() { + ObsWriters::One(oio::OneShotWriter::new(writer)) + } else { + ObsWriters::Two(oio::MultipartUploadWriter::new(writer)) + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); + let w = - oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); oio::TwoWaysWriter::Two(w) } else { - let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::One(w) }; - return Ok((RpWrite::default(), tw)); + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs index 52604fd50..3d8ba1daf 100644 --- a/core/src/services/obs/core.rs +++ b/core/src/services/obs/core.rs @@ -45,7 +45,6 @@ pub struct ObsCore { pub signer: HuaweicloudObsSigner, pub loader: HuaweicloudObsCredentialLoader, pub client: HttpClient, - pub write_min_size: usize, } impl Debug for ObsCore { diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 893a098d7..f8078a955 100644 --- 
a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -27,6 +27,12 @@ use crate::raw::oio::{MultipartUploadPart, Streamer}; use crate::raw::*; use crate::*; +pub type ObsWriters = oio::ThreeWaysWriter< + oio::OneShotWriter<ObsWriter>, + oio::MultipartUploadWriter<ObsWriter>, + oio::AppendObjectWriter<ObsWriter>, +>; + pub struct ObsWriter { core: Arc<ObsCore>, diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 029b3b52b..1aef92b77 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::max; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; @@ -35,9 +36,13 @@ use super::error::parse_error; use super::pager::OssPager; use super::writer::OssWriter; use crate::raw::*; +use crate::services::oss::writer::OssWriters; use crate::*; -const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; +/// The minimum multipart size of OSS is 100 KiB. 
+/// +/// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12> +const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024; const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000; /// Aliyun Object Storage Service (OSS) support @@ -343,14 +348,6 @@ impl Builder for OssBuilder { let signer = AliyunOssSigner::new(bucket); - let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE); - if write_min_size < 5 * 1024 * 1024 { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "The write minimum buffer size is misconfigured", - ) - .with_context("service", Scheme::Oss)); - } let batch_max_operations = self .batch_max_operations .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS); @@ -368,7 +365,6 @@ impl Builder for OssBuilder { client, server_side_encryption, server_side_encryption_key_id, - write_min_size, batch_max_operations, }), }) @@ -385,10 +381,7 @@ pub struct OssBackend { impl Accessor for OssBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::TwoWaysWriter< - oio::MultipartUploadWriter<OssWriter>, - oio::AppendObjectWriter<OssWriter>, - >; + type Writer = oio::TwoWaysWriter<OssWriters, oio::AtLeastBufWriter<OssWriters>>; type BlockingWriter = (); type Pager = OssPager; type BlockingPager = (); @@ -480,18 +473,26 @@ impl Accessor for OssBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let writer = OssWriter::new(self.core.clone(), path, args.clone()); - let tw = if args.append() { + let w = if args.append() { + OssWriters::Three(oio::AppendObjectWriter::new(writer)) + } else if args.content_length().is_some() { + OssWriters::One(oio::OneShotWriter::new(writer)) + } else { + OssWriters::Two(oio::MultipartUploadWriter::new(writer)) + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); + let w = - oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + 
oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); oio::TwoWaysWriter::Two(w) } else { - let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::One(w) }; - return Ok((RpWrite::default(), tw)); + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 6c570fb17..a81db97fa 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -64,7 +64,6 @@ pub struct OssCore { pub client: HttpClient, pub loader: AliyunLoader, pub signer: AliyunOssSigner, - pub write_min_size: usize, pub batch_max_operations: usize, } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index ff9c8f9d2..27aa09011 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -27,6 +27,12 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; +pub type OssWriters = oio::ThreeWaysWriter< + oio::OneShotWriter<OssWriter>, + oio::MultipartUploadWriter<OssWriter>, + oio::AppendObjectWriter<OssWriter>, +>; + pub struct OssWriter { core: Arc<OssCore>, diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 83ff8a7d0..700359009 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -43,6 +44,7 @@ use super::error::parse_s3_error_code; use super::pager::S3Pager; use super::writer::S3Writer; use crate::raw::*; +use crate::services::s3::writer::S3Writers; use crate::*; /// Allow constructing correct region endpoint if user gives a global endpoint. 
@@ -56,7 +58,10 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new m }); -const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; +/// The minimum multipart size of S3 is 5 MiB. +/// +/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html> +const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024; const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000; /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support. @@ -847,14 +852,6 @@ impl Builder for S3Builder { let signer = AwsV4Signer::new("s3", &region); - let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE); - if write_min_size < 5 * 1024 * 1024 { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "The write minimum buffer size is misconfigured", - ) - .with_context("service", Scheme::S3)); - } let batch_max_operations = self .batch_max_operations .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS); @@ -874,7 +871,6 @@ impl Builder for S3Builder { signer, loader, client, - write_min_size, batch_max_operations, }), }) @@ -891,7 +887,7 @@ pub struct S3Backend { impl Accessor for S3Backend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::MultipartUploadWriter<S3Writer>; + type Writer = oio::TwoWaysWriter<S3Writers, oio::AtLeastBufWriter<S3Writers>>; type BlockingWriter = (); type Pager = S3Pager; type BlockingPager = (); @@ -979,10 +975,26 @@ impl Accessor for S3Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - S3Writer::new(self.core.clone(), path, args), - )) + let writer = S3Writer::new(self.core.clone(), path, args.clone()); + + let w = if args.content_length().is_some() { + S3Writers::One(oio::OneShotWriter::new(writer)) + } else { + S3Writers::Two(oio::MultipartUploadWriter::new(writer)) + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + let buffer_size = 
max(MINIMUM_MULTIPART_SIZE, buffer_size); + + let w = + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()); + + oio::TwoWaysWriter::Two(w) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 36321280f..89989bcb7 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -82,7 +82,6 @@ pub struct S3Core { pub signer: AwsV4Signer, pub loader: Box<dyn AwsCredentialLoad>, pub client: HttpClient, - pub write_min_size: usize, pub batch_max_operations: usize, } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index c323dd526..a27341d71 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -27,6 +27,9 @@ use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; +pub type S3Writers = + oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>, oio::MultipartUploadWriter<S3Writer>>; + pub struct S3Writer { core: Arc<S3Core>, @@ -35,14 +38,12 @@ pub struct S3Writer { } impl S3Writer { - pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> { - let s3_writer = S3Writer { + pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self { + S3Writer { core, path: path.to_string(), op, - }; - - oio::MultipartUploadWriter::new(s3_writer) + } } } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index d3676f1ce..10a9c061c 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -395,6 +395,19 @@ impl FutureWrite { self } + /// Set the buffer size of op. + /// + /// If buffer size is set, the data will be buffered by the underlying writer. 
+ /// + /// ## NOTE + /// + /// Service could have their own minimum buffer size while perform write operations like + /// multipart uploads. So the buffer size may be larger than the given buffer size. + pub fn buffer_size(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs)); + self + } + /// Set the content length of op. /// /// If the content length is not set, the content length will be @@ -457,6 +470,19 @@ impl FutureWriter { self } + /// Set the buffer size of op. + /// + /// If buffer size is set, the data will be buffered by the underlying writer. + /// + /// ## NOTE + /// + /// Service could have their own minimum buffer size while perform write operations like + /// multipart uploads. So the buffer size may be larger than the given buffer size. + pub fn buffer_size(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_buffer_size(v)); + self + } + /// Set the content length of op. /// /// If the content length is not set, the content length will be diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index a1effab43..ae8825984 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -1233,7 +1233,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { let (content, size): (Vec<u8>, usize) = gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); - let mut w = op.writer(&path).await?; + let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?; // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); @@ -1266,7 +1266,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> { let mut fuzzer = ObjectWriterFuzzer::new(&path, None); - let mut w = op.writer(&path).await?; + let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?; for _ in 0..100 { match fuzzer.fuzz() {
