This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch write_can_multig in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 253a029a9074d67c79748de166ac3bd7a7632a89 Author: Xuanwo <[email protected]> AuthorDate: Wed Sep 13 08:15:52 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/fuzz/fuzz_writer.rs | 2 +- core/src/layers/complete.rs | 74 +++------------------------ core/src/raw/adapters/kv/backend.rs | 18 +------ core/src/raw/adapters/typed_kv/backend.rs | 7 +-- core/src/raw/ops.rs | 35 ++++--------- core/src/services/azdfs/backend.rs | 7 --- core/src/services/cos/backend.rs | 4 +- core/src/services/dropbox/backend.rs | 6 --- core/src/services/fs/backend.rs | 2 +- core/src/services/ftp/backend.rs | 9 +--- core/src/services/gcs/backend.rs | 4 +- core/src/services/gdrive/backend.rs | 9 +--- core/src/services/ghac/backend.rs | 9 +--- core/src/services/ipmfs/backend.rs | 9 +--- core/src/services/obs/backend.rs | 4 +- core/src/services/onedrive/backend.rs | 7 --- core/src/services/oss/backend.rs | 5 +- core/src/services/s3/backend.rs | 5 +- core/src/services/sftp/backend.rs | 3 +- core/src/services/supabase/backend.rs | 7 --- core/src/services/vercel_artifacts/backend.rs | 7 --- core/src/services/wasabi/backend.rs | 7 --- core/src/services/webdav/backend.rs | 7 --- core/src/services/webhdfs/backend.rs | 7 --- core/src/types/operator/blocking_operator.rs | 2 +- core/src/types/operator/operator.rs | 2 +- core/src/types/operator/operator_functions.rs | 28 ++-------- core/src/types/operator/operator_futures.rs | 39 ++------------ core/tests/behavior/write.rs | 18 +++---- 29 files changed, 60 insertions(+), 283 deletions(-) diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs index 2f992b919..ff1eb0bdb 100644 --- a/core/fuzz/fuzz_writer.rs +++ b/core/fuzz/fuzz_writer.rs @@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> { let checker = WriteChecker::new(total_size); - let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?; + let mut writer = op.writer_with(&path).buffer(8 * 1024 * 1024).await?; for chunk in &checker.chunks { writer.write(chunk.clone()).await?; diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index cc767f90e..e0a796623 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -427,11 +427,10 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { return new_capability_unsupported_error(Operation::Write); } - let size = args.content_length(); self.inner .write(path, args) .await - .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) + .map(|(rp, w)| (rp, CompleteWriter::new(w))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { @@ -440,10 +439,9 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { return new_capability_unsupported_error(Operation::BlockingWrite); } - let size = args.content_length(); self.inner .blocking_write(path, args) - .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) + .map(|(rp, w)| (rp, CompleteWriter::new(w))) } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { @@ -681,17 +679,11 @@ where pub struct CompleteWriter<W> { inner: Option<W>, - size: Option<u64>, - written: u64, } impl<W> CompleteWriter<W> { - pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> { - CompleteWriter { - inner: Some(inner), - size, - written: 0, - } + pub fn new(inner: W) -> CompleteWriter<W> { + CompleteWriter { inner: Some(inner) } } } @@ -717,52 +709,27 @@ where Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; let n = ready!(w.poll_write(cx, bs))?; - self.written += n as u64; - - if let Some(size) = self.size { - if self.written > size { - return Poll::Ready(Err(Error::new( - ErrorKind::ContentTruncated, - &format!( - "writer got too much data, expect: {size}, actual: {}", - self.written + n as u64 - ), - ))); - } - } Poll::Ready(Ok(n)) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - ready!(w.poll_abort(cx))?; + ready!(w.poll_close(cx))?; self.inner = None; Poll::Ready(Ok(())) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - if let Some(size) = self.size { - if self.written < size { - return Poll::Ready(Err(Error::new( - ErrorKind::ContentIncomplete, - &format!( - "writer got too less data, expect: {size}, actual: {}", - self.written - ), - ))); - } - } - + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - ready!(w.poll_close(cx))?; + ready!(w.poll_abort(cx))?; self.inner = None; Poll::Ready(Ok(())) @@ -778,36 +745,11 @@ where Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; let n = w.write(bs)?; - self.written += n as u64; - - if let Some(size) = self.size { - if self.written > size { - return Err(Error::new( - ErrorKind::ContentTruncated, - &format!( - "writer got too much data, expect: {size}, actual: {}", - self.written + n as u64 - ), - )); - } - } Ok(n) } fn close(&mut self) -> Result<()> { - if let Some(size) = self.size { - if self.written < size { - return Err(Error::new( - ErrorKind::ContentIncomplete, - &format!( - "writer got too less data, expect: {size}, actual: {}", - self.written - ), - )); - } - } - let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index db82e1cd6..f7a40e698 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -143,27 +143,13 @@ impl<S: Adapter> Accessor for Backend<S> { Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs))) } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { let p = build_abs_path(&self.root, path); Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p))) } - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let p = build_abs_path(&self.root, path); Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p))) diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index e04822cdd..6d515b01a 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -393,6 +393,8 @@ impl<S> KvWriter<S> { let value = self.buf.take().map(Bytes::from).unwrap_or_default(); let mut metadata = Metadata::new(EntryMode::FILE); + metadata.set_content_length(value.len() as u64); + if let Some(v) = self.op.cache_control() { metadata.set_cache_control(v); } @@ -402,11 +404,6 @@ impl<S> KvWriter<S> { if let Some(v) = self.op.content_type() { metadata.set_content_type(v); } - if let Some(v) = self.op.content_length() { - metadata.set_content_length(v); - } else { - metadata.set_content_length(value.len() as u64); - } Value { metadata, value } } diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 0404ed468..1e60d329e 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -405,9 +405,8 @@ impl OpStat { #[derive(Debug, Clone, Default)] pub struct OpWrite { append: bool, + buffer: Option<usize>, - buffer_size: Option<usize>, - content_length: Option<u64>, content_type: Option<String>, content_disposition: Option<String>, cache_control: Option<String>, @@ -440,39 +439,23 @@ impl OpWrite { self } - /// Get the buffer size from op. + /// Get the buffer from op. /// - /// The buffer size is used by service to decide the buffer size of the underlying writer. - pub fn buffer_size(&self) -> Option<usize> { - self.buffer_size + /// The buffer is used by service to decide the buffer size of the underlying writer. + pub fn buffer(&self) -> Option<usize> { + self.buffer } - /// Set the buffer size of op. + /// Set the buffer of op. /// - /// If buffer size is set, the data will be buffered by the underlying writer. + /// If buffer is set, the data will be buffered by the underlying writer. /// /// ## NOTE /// /// Service could have their own minimum buffer size while perform write operations like /// multipart uploads. So the buffer size may be larger than the given buffer size. - pub fn with_buffer_size(mut self, buffer_size: usize) -> Self { - self.buffer_size = Some(buffer_size); - self - } - - /// Get the content length from op. - /// - /// The content length is the total length of the data to be written. - pub fn content_length(&self) -> Option<u64> { - self.content_length - } - - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn with_content_length(mut self, content_length: u64) -> Self { - self.content_length = Some(content_length); + pub fn with_buffer(mut self, buffer: usize) -> Self { + self.buffer = Some(buffer); self } diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 1c0dd3a8c..c729ab754 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -296,13 +296,6 @@ impl Accessor for AzdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args, path.to_string())), diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index eced1ccc9..897c4c10a 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -272,10 +272,10 @@ impl Accessor for CosBackend { write: true, write_can_append: true, + write_can_multi: true, write_with_content_type: true, write_with_cache_control: true, write_with_content_disposition: true, - write_without_content_length: true, delete: true, create_dir: true, @@ -342,7 +342,7 @@ impl Accessor for CosBackend { CosWriters::One(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { + let w = if let Some(buffer_size) = args.buffer() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); let w = oio::ExactBufWriter::new(w, buffer_size); diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index 029f1fe7f..8400b3300 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -106,12 +106,6 @@ impl Accessor for DropboxBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } Ok(( RpWrite::default(), oio::OneShotWriter::new(DropboxWriter::new( diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 950cfd76e..b1f1fdf49 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -263,7 +263,7 @@ impl Accessor for FsBackend { write: true, write_can_append: true, - write_without_content_length: true, + write_can_multi: true, create_dir: true, delete: true, diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index ed960c5f7..5e33d8e36 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -351,14 +351,7 @@ impl Accessor for FtpBackend { Ok((RpRead::new(size), FtpReader::new(r, ftp_stream))) } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { // Ensure the parent dir exists. let parent = get_parent(path); let paths: Vec<&str> = parent.split('/').collect(); diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index 9e26cdce1..20e7a1f5e 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -362,8 +362,8 @@ impl Accessor for GcsBackend { read_with_if_none_match: true, write: true, + write_can_multi: true, write_with_content_type: true, - write_without_content_length: true, delete: true, copy: true, @@ -423,7 +423,7 @@ impl Accessor for GcsBackend { let w = GcsWriter::new(self.core.clone(), path, args.clone()); let w = oio::RangeWriter::new(w); - let w = if let Some(buffer_size) = args.buffer_size() { + let w = if let Some(buffer_size) = args.buffer() { // FIXME: we should align with 256KiB instead. let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size); diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index ea67ed200..66c181d2b 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -166,14 +166,7 @@ impl Accessor for GdriveBackend { } } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { // As Google Drive allows files have the same name, we need to check if the file exists. // If the file exists, we will keep its ID and update it. let mut file_id: Option<String> = None; diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs index 8adce69a3..e78fdb405 100644 --- a/core/src/services/ghac/backend.rs +++ b/core/src/services/ghac/backend.rs @@ -404,14 +404,7 @@ impl Accessor for GhacBackend { } } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { let req = self.ghac_reserve(path).await?; let resp = self.client.send(req).await?; diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index eda6d8895..ac70369c9 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -121,14 +121,7 @@ impl Accessor for IpmfsBackend { } } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), path.to_string())), diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 9cca757a8..7caa6d749 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -279,9 +279,9 @@ impl Accessor for ObsBackend { write: true, write_can_append: true, + write_can_multi: true, write_with_content_type: true, write_with_cache_control: true, - write_without_content_length: true, delete: true, create_dir: true, @@ -380,7 +380,7 @@ impl Accessor for ObsBackend { ObsWriters::One(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { + let w = if let Some(buffer_size) = args.buffer() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); let w = oio::ExactBufWriter::new(w, buffer_size); diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 2b32abbaf..0001d8685 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -103,13 +103,6 @@ impl Accessor for OnedriveBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - let path = build_rooted_abs_path(&self.root, path); Ok(( diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 6d027ba5b..9022079bc 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -404,10 +404,11 @@ impl Accessor for OssBackend { write: true, write_can_append: true, + write_can_multi: true, write_with_cache_control: true, write_with_content_type: true, write_with_content_disposition: true, - write_without_content_length: true, + delete: true, create_dir: true, copy: true, @@ -478,7 +479,7 @@ impl Accessor for OssBackend { OssWriters::One(oio::MultipartUploadWriter::new(writer)) }; - let w = if let Some(buffer_size) = args.buffer_size() { + let w = if let Some(buffer_size) = args.buffer() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); let w = oio::ExactBufWriter::new(w, buffer_size); diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index aa2b95eb8..89a8330b8 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -913,9 +913,10 @@ impl Accessor for S3Backend { read_with_override_content_type: true, write: true, + write_can_multi: true, write_with_cache_control: true, write_with_content_type: true, - write_without_content_length: true, + create_dir: true, delete: true, copy: true, @@ -979,7 +980,7 @@ impl Accessor for S3Backend { let w = oio::MultipartUploadWriter::new(writer); - let w = if let Some(buffer_size) = args.buffer_size() { + let w = if let Some(buffer_size) = args.buffer() { let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size); oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size)) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 931edcb23..0f9b7f3bd 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -243,7 +243,8 @@ impl Accessor for SftpBackend { read_can_seek: true, write: true, - write_without_content_length: true, + write_can_multi: true, + create_dir: true, delete: true, diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs index 402c3abe7..a5d0d3db1 100644 --- a/core/src/services/supabase/backend.rs +++ b/core/src/services/supabase/backend.rs @@ -224,13 +224,6 @@ impl Accessor for SupabaseBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(), path, args)), diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs index 4b88be664..9a0ae95cf 100644 --- a/core/src/services/vercel_artifacts/backend.rs +++ b/core/src/services/vercel_artifacts/backend.rs @@ -84,13 +84,6 @@ impl Accessor for VercelArtifactsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), oio::OneShotWriter::new(VercelArtifactsWriter::new( diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index 835492bad..dcb91f2ea 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -750,13 +750,6 @@ impl Accessor for WasabiBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, path.to_string())), diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index cca8728ba..1f9aa1019 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -275,13 +275,6 @@ impl Accessor for WebdavBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - self.ensure_parent_path(path).await?; let p = build_abs_path(&self.root, path); diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index e8a3a8f04..b1bfed8ae 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -474,13 +474,6 @@ impl Accessor for WebhdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, path.to_string())), diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 3468f8b6b..22a6208f1 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -550,7 +550,7 @@ impl BlockingOperator { FunctionWrite(OperatorFunction::new( self.inner().clone(), path, - (OpWrite::default().with_content_length(bs.len() as u64), bs), + (OpWrite::default(), bs), |inner, path, (args, mut bs)| { if !validate_path(&path, EntryMode::FILE) { return Err( diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 771ffd677..8c8539bfd 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -717,7 +717,7 @@ impl Operator { let fut = FutureWrite(OperatorFuture::new( self.inner().clone(), path, - (OpWrite::default().with_content_length(bs.len() as u64), bs), + (OpWrite::default(), bs), |inner, path, (args, mut bs)| { let fut = async move { if !validate_path(&path, EntryMode::FILE) { diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index 49a0e7e85..367c5b93f 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -96,19 +96,8 @@ impl FunctionWrite { /// /// Service could have their own minimum buffer size while perform write operations like /// multipart uploads. So the buffer size may be larger than the given buffer size. - pub fn buffer_size(mut self, v: usize) -> Self { - self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs)); - self - } - - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn content_length(mut self, v: u64) -> Self { - self.0 = self - .0 - .map_args(|(args, bs)| (args.with_content_length(v), bs)); + pub fn buffer(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs)); self } @@ -173,17 +162,8 @@ impl FunctionWriter { /// /// Service could have their own minimum buffer size while perform write operations like /// multipart uploads. So the buffer size may be larger than the given buffer size. - pub fn buffer_size(mut self, v: usize) -> Self { - self.0 = self.0.map_args(|args| args.with_buffer_size(v)); - self - } - - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn content_length(mut self, v: u64) -> Self { - self.0 = self.0.map_args(|args| args.with_content_length(v)); + pub fn buffer(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_buffer(v)); self } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 7f2df677a..74c7eb159 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -213,17 +213,6 @@ impl Future for FuturePresignRead { pub struct FuturePresignWrite(pub(crate) OperatorFuture<(OpWrite, Duration), PresignedRequest>); impl FuturePresignWrite { - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn content_length(mut self, v: u64) -> Self { - self.0 = self - .0 - .map_args(|(args, dur)| (args.with_content_length(v), dur)); - self - } - /// Set the content type of option pub fn content_type(mut self, v: &str) -> Self { self.0 = self @@ -403,19 +392,8 @@ impl FutureWrite { /// /// Service could have their own minimum buffer size while perform write operations like /// multipart uploads. So the buffer size may be larger than the given buffer size. - pub fn buffer_size(mut self, v: usize) -> Self { - self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs)); - self - } - - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn content_length(mut self, v: u64) -> Self { - self.0 = self - .0 - .map_args(|(args, bs)| (args.with_content_length(v), bs)); + pub fn buffer(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs)); self } @@ -478,17 +456,8 @@ impl FutureWriter { /// /// Service could have their own minimum buffer size while perform write operations like /// multipart uploads. So the buffer size may be larger than the given buffer size. - pub fn buffer_size(mut self, v: usize) -> Self { - self.0 = self.0.map_args(|args| args.with_buffer_size(v)); - self - } - - /// Set the content length of op. - /// - /// If the content length is not set, the content length will be - /// calculated automatically by buffering part of data. - pub fn content_length(mut self, v: u64) -> Self { - self.0 = self.0.map_args(|args| args.with_content_length(v)); + pub fn buffer(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_buffer(v)); self } diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 60125243a..f20cec326 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -1111,7 +1111,7 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> { /// Append data into writer pub async fn test_writer_write(op: Operator) -> Result<()> { - if !(op.info().full_capability().write_without_content_length) { + if !(op.info().full_capability().write_can_multi) { return Ok(()); } @@ -1148,7 +1148,7 @@ pub async fn test_writer_write(op: Operator) -> Result<()> { /// Streaming data into writer pub async fn test_writer_sink(op: Operator) -> Result<()> { let cap = op.info().full_capability(); - if !(cap.write && cap.write_without_content_length) { + if !(cap.write && cap.write_can_multi) { return Ok(()); } @@ -1158,7 +1158,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> { let content_b = gen_fixed_bytes(size); let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok); - let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?; + let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?; w.sink(stream).await?; w.close().await?; @@ -1185,7 +1185,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> { /// Reading data into writer pub async fn test_writer_copy(op: Operator) -> Result<()> { let cap = op.info().full_capability(); - if !(cap.write && cap.write_without_content_length) { + if !(cap.write && cap.write_can_multi) { return Ok(()); } @@ -1194,7 +1194,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> { let content_a = gen_fixed_bytes(size); let content_b = gen_fixed_bytes(size); - let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?; + let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?; let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat()); while !content.is_empty() { @@ -1226,7 +1226,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> { /// Copy data from reader to writer pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { - if !(op.info().full_capability().write_without_content_length) { + if !(op.info().full_capability().write_can_multi) { return Ok(()); } @@ -1234,7 +1234,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { let (content, size): (Vec<u8>, usize) = gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); - let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?; + let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?; // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); @@ -1258,7 +1258,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { /// Add test for unsized writer pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> { - if !op.info().full_capability().write_without_content_length { + if !op.info().full_capability().write_can_multi { warn!("{op:?} doesn't support write without content length, test skip"); return Ok(()); } @@ -1267,7 +1267,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> { let mut fuzzer = ObjectWriterFuzzer::new(&path, None); - let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?; + let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?; for _ in 0..100 { match fuzzer.fuzz() {
