This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch write_can_multig
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 253a029a9074d67c79748de166ac3bd7a7632a89
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 08:15:52 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/fuzz/fuzz_writer.rs                      |  2 +-
 core/src/layers/complete.rs                   | 74 +++------------------------
 core/src/raw/adapters/kv/backend.rs           | 18 +------
 core/src/raw/adapters/typed_kv/backend.rs     |  7 +--
 core/src/raw/ops.rs                           | 35 ++++---------
 core/src/services/azdfs/backend.rs            |  7 ---
 core/src/services/cos/backend.rs              |  4 +-
 core/src/services/dropbox/backend.rs          |  6 ---
 core/src/services/fs/backend.rs               |  2 +-
 core/src/services/ftp/backend.rs              |  9 +---
 core/src/services/gcs/backend.rs              |  4 +-
 core/src/services/gdrive/backend.rs           |  9 +---
 core/src/services/ghac/backend.rs             |  9 +---
 core/src/services/ipmfs/backend.rs            |  9 +---
 core/src/services/obs/backend.rs              |  4 +-
 core/src/services/onedrive/backend.rs         |  7 ---
 core/src/services/oss/backend.rs              |  5 +-
 core/src/services/s3/backend.rs               |  5 +-
 core/src/services/sftp/backend.rs             |  3 +-
 core/src/services/supabase/backend.rs         |  7 ---
 core/src/services/vercel_artifacts/backend.rs |  7 ---
 core/src/services/wasabi/backend.rs           |  7 ---
 core/src/services/webdav/backend.rs           |  7 ---
 core/src/services/webhdfs/backend.rs          |  7 ---
 core/src/types/operator/blocking_operator.rs  |  2 +-
 core/src/types/operator/operator.rs           |  2 +-
 core/src/types/operator/operator_functions.rs | 28 ++--------
 core/src/types/operator/operator_futures.rs   | 39 ++------------
 core/tests/behavior/write.rs                  | 18 +++----
 29 files changed, 60 insertions(+), 283 deletions(-)

diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs
index 2f992b919..ff1eb0bdb 100644
--- a/core/fuzz/fuzz_writer.rs
+++ b/core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> {
 
     let checker = WriteChecker::new(total_size);
 
-    let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut writer = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     for chunk in &checker.chunks {
         writer.write(chunk.clone()).await?;
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index cc767f90e..e0a796623 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -427,11 +427,10 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
             return new_capability_unsupported_error(Operation::Write);
         }
 
-        let size = args.content_length();
         self.inner
             .write(path, args)
             .await
-            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
@@ -440,10 +439,9 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
             return new_capability_unsupported_error(Operation::BlockingWrite);
         }
 
-        let size = args.content_length();
         self.inner
             .blocking_write(path, args)
-            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
@@ -681,17 +679,11 @@ where
 
 pub struct CompleteWriter<W> {
     inner: Option<W>,
-    size: Option<u64>,
-    written: u64,
 }
 
 impl<W> CompleteWriter<W> {
-    pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
-        CompleteWriter {
-            inner: Some(inner),
-            size,
-            written: 0,
-        }
+    pub fn new(inner: W) -> CompleteWriter<W> {
+        CompleteWriter { inner: Some(inner) }
     }
 }
 
@@ -717,52 +709,27 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
         let n = ready!(w.poll_write(cx, bs))?;
-        self.written += n as u64;
-
-        if let Some(size) = self.size {
-            if self.written > size {
-                return Poll::Ready(Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + n as u64
-                    ),
-                )));
-            }
-        }
 
         Poll::Ready(Ok(n))
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
 
-        ready!(w.poll_abort(cx))?;
+        ready!(w.poll_close(cx))?;
         self.inner = None;
 
         Poll::Ready(Ok(()))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        if let Some(size) = self.size {
-            if self.written < size {
-                return Poll::Ready(Err(Error::new(
-                    ErrorKind::ContentIncomplete,
-                    &format!(
-                        "writer got too less data, expect: {size}, actual: {}",
-                        self.written
-                    ),
-                )));
-            }
-        }
-
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
 
-        ready!(w.poll_close(cx))?;
+        ready!(w.poll_abort(cx))?;
         self.inner = None;
 
         Poll::Ready(Ok(()))
@@ -778,36 +745,11 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
         let n = w.write(bs)?;
-        self.written += n as u64;
-
-        if let Some(size) = self.size {
-            if self.written > size {
-                return Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + n as u64
-                    ),
-                ));
-            }
-        }
 
         Ok(n)
     }
 
     fn close(&mut self) -> Result<()> {
-        if let Some(size) = self.size {
-            if self.written < size {
-                return Err(Error::new(
-                    ErrorKind::ContentIncomplete,
-                    &format!(
-                        "writer got too less data, expect: {size}, actual: {}",
-                        self.written
-                    ),
-                ));
-            }
-        }
-
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index db82e1cd6..f7a40e698 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -143,27 +143,13 @@ impl<S: Adapter> Accessor for Backend<S> {
         Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
     }
 
-    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index e04822cdd..6d515b01a 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -393,6 +393,8 @@ impl<S> KvWriter<S> {
         let value = self.buf.take().map(Bytes::from).unwrap_or_default();
 
         let mut metadata = Metadata::new(EntryMode::FILE);
+        metadata.set_content_length(value.len() as u64);
+
         if let Some(v) = self.op.cache_control() {
             metadata.set_cache_control(v);
         }
@@ -402,11 +404,6 @@ impl<S> KvWriter<S> {
         if let Some(v) = self.op.content_type() {
             metadata.set_content_type(v);
         }
-        if let Some(v) = self.op.content_length() {
-            metadata.set_content_length(v);
-        } else {
-            metadata.set_content_length(value.len() as u64);
-        }
 
         Value { metadata, value }
     }
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 0404ed468..1e60d329e 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -405,9 +405,8 @@ impl OpStat {
 #[derive(Debug, Clone, Default)]
 pub struct OpWrite {
     append: bool,
+    buffer: Option<usize>,
 
-    buffer_size: Option<usize>,
-    content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
     cache_control: Option<String>,
@@ -440,39 +439,23 @@ impl OpWrite {
         self
     }
 
-    /// Get the buffer size from op.
+    /// Get the buffer from op.
     ///
-    /// The buffer size is used by service to decide the buffer size of the underlying writer.
-    pub fn buffer_size(&self) -> Option<usize> {
-        self.buffer_size
+    /// The buffer is used by service to decide the buffer size of the underlying writer.
+    pub fn buffer(&self) -> Option<usize> {
+        self.buffer
     }
 
-    /// Set the buffer size of op.
+    /// Set the buffer of op.
     ///
-    /// If buffer size is set, the data will be buffered by the underlying writer.
+    /// If buffer is set, the data will be buffered by the underlying writer.
     ///
     /// ## NOTE
     ///
     /// Service could have their own minimum buffer size while perform write operations like
     /// multipart uploads. So the buffer size may be larger than the given buffer size.
-    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
-        self.buffer_size = Some(buffer_size);
-        self
-    }
-
-    /// Get the content length from op.
-    ///
-    /// The content length is the total length of the data to be written.
-    pub fn content_length(&self) -> Option<u64> {
-        self.content_length
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn with_content_length(mut self, content_length: u64) -> Self {
-        self.content_length = Some(content_length);
+    pub fn with_buffer(mut self, buffer: usize) -> Self {
+        self.buffer = Some(buffer);
         self
     }
 
diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 1c0dd3a8c..c729ab754 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -296,13 +296,6 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args, path.to_string())),
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index eced1ccc9..897c4c10a 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -272,10 +272,10 @@ impl Accessor for CosBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
-                write_without_content_length: true,
 
                 delete: true,
                 create_dir: true,
@@ -342,7 +342,7 @@ impl Accessor for CosBackend {
             CosWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs
index 029f1fe7f..8400b3300 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -106,12 +106,6 @@ impl Accessor for DropboxBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(DropboxWriter::new(
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 950cfd76e..b1f1fdf49 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -263,7 +263,7 @@ impl Accessor for FsBackend {
 
                 write: true,
                 write_can_append: true,
-                write_without_content_length: true,
+                write_can_multi: true,
                 create_dir: true,
                 delete: true,
 
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index ed960c5f7..5e33d8e36 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -351,14 +351,7 @@ impl Accessor for FtpBackend {
         Ok((RpRead::new(size), FtpReader::new(r, ftp_stream)))
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         // Ensure the parent dir exists.
         let parent = get_parent(path);
         let paths: Vec<&str> = parent.split('/').collect();
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 9e26cdce1..20e7a1f5e 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -362,8 +362,8 @@ impl Accessor for GcsBackend {
                 read_with_if_none_match: true,
 
                 write: true,
+                write_can_multi: true,
                 write_with_content_type: true,
-                write_without_content_length: true,
                 delete: true,
                 copy: true,
 
@@ -423,7 +423,7 @@ impl Accessor for GcsBackend {
         let w = GcsWriter::new(self.core.clone(), path, args.clone());
         let w = oio::RangeWriter::new(w);
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             // FIXME: we should align with 256KiB instead.
             let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
 
diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs
index ea67ed200..66c181d2b 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -166,14 +166,7 @@ impl Accessor for GdriveBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         // As Google Drive allows files have the same name, we need to check if the file exists.
         // If the file exists, we will keep its ID and update it.
         let mut file_id: Option<String> = None;
diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs
index 8adce69a3..e78fdb405 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -404,14 +404,7 @@ impl Accessor for GhacBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let req = self.ghac_reserve(path).await?;
 
         let resp = self.client.send(req).await?;
diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs
index eda6d8895..ac70369c9 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -121,14 +121,7 @@ impl Accessor for IpmfsBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), path.to_string())),
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 9cca757a8..7caa6d749 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -279,9 +279,9 @@ impl Accessor for ObsBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
-                write_without_content_length: true,
 
                 delete: true,
                 create_dir: true,
@@ -380,7 +380,7 @@ impl Accessor for ObsBackend {
             ObsWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs
index 2b32abbaf..0001d8685 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -103,13 +103,6 @@ impl Accessor for OnedriveBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         let path = build_rooted_abs_path(&self.root, path);
 
         Ok((
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6d027ba5b..9022079bc 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -404,10 +404,11 @@ impl Accessor for OssBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_with_content_disposition: true,
-                write_without_content_length: true,
+
                 delete: true,
                 create_dir: true,
                 copy: true,
@@ -478,7 +479,7 @@ impl Accessor for OssBackend {
             OssWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index aa2b95eb8..89a8330b8 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -913,9 +913,10 @@ impl Accessor for S3Backend {
                 read_with_override_content_type: true,
 
                 write: true,
+                write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
-                write_without_content_length: true,
+
                 create_dir: true,
                 delete: true,
                 copy: true,
@@ -979,7 +980,7 @@ impl Accessor for S3Backend {
 
         let w = oio::MultipartUploadWriter::new(writer);
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index 931edcb23..0f9b7f3bd 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -243,7 +243,8 @@ impl Accessor for SftpBackend {
                 read_can_seek: true,
 
                 write: true,
-                write_without_content_length: true,
+                write_can_multi: true,
+
                 create_dir: true,
                 delete: true,
 
diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs
index 402c3abe7..a5d0d3db1 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -224,13 +224,6 @@ impl Accessor for SupabaseBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(), path, args)),
diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs
index 4b88be664..9a0ae95cf 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -84,13 +84,6 @@ impl Accessor for VercelArtifactsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(VercelArtifactsWriter::new(
diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs
index 835492bad..dcb91f2ea 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -750,13 +750,6 @@ impl Accessor for WasabiBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, path.to_string())),
diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs
index cca8728ba..1f9aa1019 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -275,13 +275,6 @@ impl Accessor for WebdavBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         self.ensure_parent_path(path).await?;
 
         let p = build_abs_path(&self.root, path);
diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index e8a3a8f04..b1bfed8ae 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -474,13 +474,6 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, path.to_string())),
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 3468f8b6b..22a6208f1 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -550,7 +550,7 @@ impl BlockingOperator {
         FunctionWrite(OperatorFunction::new(
             self.inner().clone(),
             path,
-            (OpWrite::default().with_content_length(bs.len() as u64), bs),
+            (OpWrite::default(), bs),
             |inner, path, (args, mut bs)| {
                 if !validate_path(&path, EntryMode::FILE) {
                     return Err(
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 771ffd677..8c8539bfd 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -717,7 +717,7 @@ impl Operator {
         let fut = FutureWrite(OperatorFuture::new(
             self.inner().clone(),
             path,
-            (OpWrite::default().with_content_length(bs.len() as u64), bs),
+            (OpWrite::default(), bs),
             |inner, path, (args, mut bs)| {
                 let fut = async move {
                     if !validate_path(&path, EntryMode::FILE) {
diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs
index 49a0e7e85..367c5b93f 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -96,19 +96,8 @@ impl FunctionWrite {
     ///
     /// Service could have their own minimum buffer size while perform write operations like
     /// multipart uploads. So the buffer size may be larger than the given buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, bs)| (args.with_content_length(v), bs));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
         self
     }
 
@@ -173,17 +162,8 @@ impl FunctionWriter {
     ///
     /// Service could have their own minimum buffer size while perform write operations like
     /// multipart uploads. So the buffer size may be larger than the given buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self.0.map_args(|args| args.with_content_length(v));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer(v));
         self
     }
 
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index 7f2df677a..74c7eb159 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -213,17 +213,6 @@ impl Future for FuturePresignRead {
 pub struct FuturePresignWrite(pub(crate) OperatorFuture<(OpWrite, Duration), PresignedRequest>);
 
 impl FuturePresignWrite {
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, dur)| (args.with_content_length(v), dur));
-        self
-    }
-
     /// Set the content type of option
     pub fn content_type(mut self, v: &str) -> Self {
         self.0 = self
@@ -403,19 +392,8 @@ impl FutureWrite {
     ///
     /// Service could have their own minimum buffer size while perform write operations like
     /// multipart uploads. So the buffer size may be larger than the given buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, bs)| (args.with_content_length(v), bs));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
         self
     }
 
@@ -478,17 +456,8 @@ impl FutureWriter {
     ///
     /// Service could have their own minimum buffer size while perform write operations like
     /// multipart uploads. So the buffer size may be larger than the given buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self.0.map_args(|args| args.with_content_length(v));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer(v));
         self
     }
 
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 60125243a..f20cec326 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1111,7 +1111,7 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> {
 
 /// Append data into writer
 pub async fn test_writer_write(op: Operator) -> Result<()> {
-    if !(op.info().full_capability().write_without_content_length) {
+    if !(op.info().full_capability().write_can_multi) {
         return Ok(());
     }
 
@@ -1148,7 +1148,7 @@ pub async fn test_writer_write(op: Operator) -> Result<()> {
 /// Streaming data into writer
 pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_without_content_length) {
+    if !(cap.write && cap.write_can_multi) {
         return Ok(());
     }
 
@@ -1158,7 +1158,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let content_b = gen_fixed_bytes(size);
     let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);
 
-    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
     w.sink(stream).await?;
     w.close().await?;
 
@@ -1185,7 +1185,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
 /// Reading data into writer
 pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_without_content_length) {
+    if !(cap.write && cap.write_can_multi) {
         return Ok(());
     }
 
@@ -1194,7 +1194,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let content_a = gen_fixed_bytes(size);
     let content_b = gen_fixed_bytes(size);
 
-    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
 
     let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat());
     while !content.is_empty() {
@@ -1226,7 +1226,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
 
 /// Copy data from reader to writer
 pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
-    if !(op.info().full_capability().write_without_content_length) {
+    if !(op.info().full_capability().write_can_multi) {
         return Ok(());
     }
 
@@ -1234,7 +1234,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     let (content, size): (Vec<u8>, usize) =
         gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
 
-    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     // Wrap a buf reader here to make sure content is read in 1MiB chunks.
     let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
@@ -1258,7 +1258,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
 
 /// Add test for unsized writer
 pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
-    if !op.info().full_capability().write_without_content_length {
+    if !op.info().full_capability().write_can_multi {
         warn!("{op:?} doesn't support write without content length, test skip");
         return Ok(());
     }
@@ -1267,7 +1267,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
 
     let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
 
-    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {
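
For reference, the updated behavior tests above reduce to this user-facing
pattern. A rough sketch only, assuming the Operator and Writer APIs as of this
commit; the helper name, path, and chunk values are placeholders, and the
write_can_multi gate mirrors the updated tests:

    use opendal::{Operator, Result};

    // Hypothetical helper: multi-call writes are now gated on write_can_multi
    // instead of the removed write_without_content_length capability, and
    // buffer() replaces the old buffer_size() builder.
    async fn upload_in_chunks(op: Operator, path: &str, chunks: Vec<Vec<u8>>) -> Result<()> {
        if !op.info().full_capability().write_can_multi {
            return Ok(());
        }

        // The service may enlarge the buffer hint to satisfy its own minimum
        // (e.g. a multipart minimum part size).
        let mut w = op.writer_with(path).buffer(8 * 1024 * 1024).await?;
        for chunk in chunks {
            w.write(chunk).await?;
        }
        w.close().await?;
        Ok(())
    }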

