This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new fda9670d6 refactor: Remove the requirement of passing `content_length` to writer (#3044)
fda9670d6 is described below

commit fda9670d6eba11e61175e65a29e6ee3f14120c33
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 10:16:36 2023 +0800

    refactor: Remove the requirement of passing `content_length` to writer (#3044)
    
    * Polish
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * delay write for oneshot
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix doc test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add comments
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix naming
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix ftp
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add check for append
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_webhdfs.yml    |  4 +-
 core/fuzz/fuzz_writer.rs                      |  2 +-
 core/src/layers/complete.rs                   | 86 +++++++--------------------
 core/src/raw/adapters/kv/backend.rs           | 18 +-----
 core/src/raw/adapters/typed_kv/api.rs         |  8 +--
 core/src/raw/adapters/typed_kv/backend.rs     |  7 +--
 core/src/raw/oio/write/one_shot_write.rs      | 68 ++++++++++++++-------
 core/src/raw/ops.rs                           | 35 +++--------
 core/src/services/azblob/writer.rs            |  6 +-
 core/src/services/azdfs/backend.rs            |  7 ---
 core/src/services/azdfs/writer.rs             | 15 ++---
 core/src/services/cos/backend.rs              |  4 +-
 core/src/services/dropbox/backend.rs          |  6 --
 core/src/services/dropbox/writer.rs           |  9 ++-
 core/src/services/fs/backend.rs               |  2 +-
 core/src/services/ftp/backend.rs              | 20 +++----
 core/src/services/ftp/writer.rs               | 60 +++++--------------
 core/src/services/gcs/backend.rs              |  4 +-
 core/src/services/gdrive/backend.rs           |  9 +--
 core/src/services/gdrive/writer.rs            |  4 +-
 core/src/services/ghac/backend.rs             |  9 +--
 core/src/services/ipmfs/backend.rs            | 15 ++---
 core/src/services/ipmfs/writer.rs             |  5 +-
 core/src/services/obs/backend.rs              |  4 +-
 core/src/services/onedrive/backend.rs         |  7 ---
 core/src/services/onedrive/writer.rs          |  4 +-
 core/src/services/oss/backend.rs              |  5 +-
 core/src/services/s3/backend.rs               |  5 +-
 core/src/services/sftp/backend.rs             |  3 +-
 core/src/services/supabase/backend.rs         |  7 ---
 core/src/services/supabase/writer.rs          | 11 ++--
 core/src/services/vercel_artifacts/backend.rs |  7 ---
 core/src/services/vercel_artifacts/writer.rs  | 11 ++--
 core/src/services/wasabi/backend.rs           |  7 ---
 core/src/services/wasabi/writer.rs            | 10 ++--
 core/src/services/webdav/backend.rs           |  7 ---
 core/src/services/webdav/writer.rs            | 10 ++--
 core/src/services/webhdfs/backend.rs          |  7 ---
 core/src/services/webhdfs/writer.rs           |  9 +--
 core/src/types/capability.rs                  | 71 ++++++++++------------
 core/src/types/operator/blocking_operator.rs  |  2 +-
 core/src/types/operator/operator.rs           |  2 +-
 core/src/types/operator/operator_functions.rs | 28 ++-------
 core/src/types/operator/operator_futures.rs   | 41 ++-----------
 core/src/types/writer.rs                      | 42 +++++++++----
 core/tests/behavior/write.rs                  | 18 +++---
 46 files changed, 266 insertions(+), 455 deletions(-)

diff --git a/.github/workflows/service_test_webhdfs.yml b/.github/workflows/service_test_webhdfs.yml
index 3487c2db6..f32d9d2d3 100644
--- a/.github/workflows/service_test_webhdfs.yml
+++ b/.github/workflows/service_test_webhdfs.yml
@@ -37,7 +37,7 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  hdfs:
+  webhdfs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
@@ -76,7 +76,7 @@ jobs:
           OPENDAL_WEBHDFS_ROOT: /
           OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870
 
-  hdfs_with_list_batch_disabled:
+  webhdfs_with_list_batch_disabled:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs
index 2f992b919..ff1eb0bdb 100644
--- a/core/fuzz/fuzz_writer.rs
+++ b/core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> 
Result<()> {
 
     let checker = WriteChecker::new(total_size);
 
-    let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut writer = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     for chunk in &checker.chunks {
         writer.write(chunk.clone()).await?;
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index cc767f90e..1abed7bbd 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -426,12 +426,17 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
         if !capability.write {
             return new_capability_unsupported_error(Operation::Write);
         }
+        if args.append() && !capability.write_can_append {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write with append enabled is not supported",
+            ));
+        }
 
-        let size = args.content_length();
         self.inner
             .write(path, args)
             .await
-            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
@@ -439,11 +444,16 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
         if !capability.write || !capability.blocking {
             return new_capability_unsupported_error(Operation::BlockingWrite);
         }
+        if args.append() && !capability.write_can_append {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write with append enabled is not supported",
+            ));
+        }
 
-        let size = args.content_length();
         self.inner
             .blocking_write(path, args)
-            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
@@ -681,17 +691,11 @@ where
 
 pub struct CompleteWriter<W> {
     inner: Option<W>,
-    size: Option<u64>,
-    written: u64,
 }
 
 impl<W> CompleteWriter<W> {
-    pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
-        CompleteWriter {
-            inner: Some(inner),
-            size,
-            written: 0,
-        }
+    pub fn new(inner: W) -> CompleteWriter<W> {
+        CompleteWriter { inner: Some(inner) }
     }
 }
 
@@ -717,52 +721,27 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
         let n = ready!(w.poll_write(cx, bs))?;
-        self.written += n as u64;
-
-        if let Some(size) = self.size {
-            if self.written > size {
-                return Poll::Ready(Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + n as u64
-                    ),
-                )));
-            }
-        }
 
         Poll::Ready(Ok(n))
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
-        ready!(w.poll_abort(cx))?;
+        ready!(w.poll_close(cx))?;
         self.inner = None;
 
         Poll::Ready(Ok(()))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        if let Some(size) = self.size {
-            if self.written < size {
-                return Poll::Ready(Err(Error::new(
-                    ErrorKind::ContentIncomplete,
-                    &format!(
-                        "writer got too less data, expect: {size}, actual: {}",
-                        self.written
-                    ),
-                )));
-            }
-        }
-
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
-        ready!(w.poll_close(cx))?;
+        ready!(w.poll_abort(cx))?;
         self.inner = None;
 
         Poll::Ready(Ok(()))
@@ -778,36 +757,11 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
         let n = w.write(bs)?;
-        self.written += n as u64;
-
-        if let Some(size) = self.size {
-            if self.written > size {
-                return Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + n as u64
-                    ),
-                ));
-            }
-        }
 
         Ok(n)
     }
 
     fn close(&mut self) -> Result<()> {
-        if let Some(size) = self.size {
-            if self.written < size {
-                return Err(Error::new(
-                    ErrorKind::ContentIncomplete,
-                    &format!(
-                        "writer got too less data, expect: {size}, actual: {}",
-                        self.written
-                    ),
-                ));
-            }
-        }
-
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index db82e1cd6..f7a40e698 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -143,27 +143,13 @@ impl<S: Adapter> Accessor for Backend<S> {
         Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
     }
 
-    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
diff --git a/core/src/raw/adapters/typed_kv/api.rs b/core/src/raw/adapters/typed_kv/api.rs
index 3217db576..7a1b929a0 100644
--- a/core/src/raw/adapters/typed_kv/api.rs
+++ b/core/src/raw/adapters/typed_kv/api.rs
@@ -121,13 +121,13 @@ impl Value {
 /// by Typed KV Operator.
 #[derive(Copy, Clone, Default)]
 pub struct Capability {
-    /// If typed_kv operator supports get natively, it will be true.
+    /// If typed_kv operator supports get natively.
     pub get: bool,
-    /// If typed_kv operator supports set natively, it will be true.
+    /// If typed_kv operator supports set natively.
     pub set: bool,
-    /// If typed_kv operator supports delete natively, it will be true.
+    /// If typed_kv operator supports delete natively.
     pub delete: bool,
-    /// If typed_kv operator supports scan natively, it will be true.
+    /// If typed_kv operator supports scan natively.
     pub scan: bool,
 }
 
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index e04822cdd..6d515b01a 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -393,6 +393,8 @@ impl<S> KvWriter<S> {
         let value = self.buf.take().map(Bytes::from).unwrap_or_default();
 
         let mut metadata = Metadata::new(EntryMode::FILE);
+        metadata.set_content_length(value.len() as u64);
+
         if let Some(v) = self.op.cache_control() {
             metadata.set_cache_control(v);
         }
@@ -402,11 +404,6 @@ impl<S> KvWriter<S> {
         if let Some(v) = self.op.content_type() {
             metadata.set_content_type(v);
         }
-        if let Some(v) = self.op.content_length() {
-            metadata.set_content_length(v);
-        } else {
-            metadata.set_content_length(value.len() as u64);
-        }
 
         Value { metadata, value }
     }
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index c56679e19..4efb3655b 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -20,7 +20,6 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::future::BoxFuture;
 
 use crate::raw::*;
@@ -37,17 +36,18 @@ pub trait OneShotWrite: Send + Sync + Unpin + 'static {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at 
once.
-    async fn write_once(&self, bs: Bytes) -> Result<()>;
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
 pub struct OneShotWriter<W: OneShotWrite> {
     state: State<W>,
+    buffer: Option<oio::ChunkedBytes>,
 }
 
 enum State<W> {
     Idle(Option<W>),
-    Write(BoxFuture<'static, (W, Result<usize>)>),
+    Write(BoxFuture<'static, (W, Result<()>)>),
 }
 
 /// # Safety
@@ -60,45 +60,73 @@ impl<W: OneShotWrite> OneShotWriter<W> {
     pub fn new(inner: W) -> Self {
         Self {
             state: State::Idle(Some(inner)),
+            buffer: None,
         }
     }
 }
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+    fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(_) => {
+                    return match &self.buffer {
+                        Some(_) => Poll::Ready(Err(Error::new(
+                            ErrorKind::Unsupported,
+                            "OneShotWriter doesn't support multiple write",
+                        ))),
+                        None => {
+                            let size = bs.remaining();
+                            let bs = bs.vectored_bytes(size);
+                            self.buffer = 
Some(oio::ChunkedBytes::from_vec(bs));
+                            Poll::Ready(Ok(size))
+                        }
+                    }
+                }
+                State::Write(_) => {
+                    unreachable!("OneShotWriter must not go into State::Write 
during poll_write")
+                }
+            }
+        }
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.state {
                 State::Idle(w) => {
                     let w = w.take().expect("writer must be valid");
 
-                    let size = bs.remaining();
-                    let bs = bs.bytes(size);
-                    let fut = async move {
-                        let res = w.write_once(bs).await;
+                    match self.buffer.clone() {
+                        Some(bs) => {
+                            let fut = Box::pin(async move {
+                                let res = w.write_once(&bs).await;
 
-                        (w, res.map(|_| size))
-                    };
+                                (w, res)
+                            });
+                            self.state = State::Write(fut);
+                        }
+                        None => {
+                            let fut = Box::pin(async move {
+                                let res = w.write_once(&"".as_bytes()).await;
 
-                    self.state = State::Write(Box::pin(fut));
+                                (w, res)
+                            });
+                            self.state = State::Write(fut);
+                        }
+                    };
                 }
                 State::Write(fut) => {
-                    let (w, size) = ready!(fut.as_mut().poll(cx));
+                    let (w, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
-                    return Poll::Ready(size);
+                    return Poll::Ready(res);
                 }
             }
         }
     }
 
     fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Err(Error::new(
-            ErrorKind::Unsupported,
-            "OneShotWriter doesn't support abort since all content has been 
flushed",
-        )))
-    }
-
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.buffer = None;
         Poll::Ready(Ok(()))
     }
 }
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 0404ed468..1e60d329e 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -405,9 +405,8 @@ impl OpStat {
 #[derive(Debug, Clone, Default)]
 pub struct OpWrite {
     append: bool,
+    buffer: Option<usize>,
 
-    buffer_size: Option<usize>,
-    content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
     cache_control: Option<String>,
@@ -440,39 +439,23 @@ impl OpWrite {
         self
     }
 
-    /// Get the buffer size from op.
+    /// Get the buffer from op.
     ///
-    /// The buffer size is used by service to decide the buffer size of the 
underlying writer.
-    pub fn buffer_size(&self) -> Option<usize> {
-        self.buffer_size
+    /// The buffer is used by service to decide the buffer size of the 
underlying writer.
+    pub fn buffer(&self) -> Option<usize> {
+        self.buffer
     }
 
-    /// Set the buffer size of op.
+    /// Set the buffer of op.
     ///
-    /// If buffer size is set, the data will be buffered by the underlying 
writer.
+    /// If buffer is set, the data will be buffered by the underlying writer.
     ///
     /// ## NOTE
     ///
     /// Service could have their own minimum buffer size while perform write 
operations like
     /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
-    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
-        self.buffer_size = Some(buffer_size);
-        self
-    }
-
-    /// Get the content length from op.
-    ///
-    /// The content length is the total length of the data to be written.
-    pub fn content_length(&self) -> Option<u64> {
-        self.content_length
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn with_content_length(mut self, content_length: u64) -> Self {
-        self.content_length = Some(content_length);
+    pub fn with_buffer(mut self, buffer: usize) -> Self {
+        self.buffer = Some(buffer);
         self
     }
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 029278ad5..301c9f733 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzblobCore;
@@ -46,13 +45,14 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for AzblobWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
         let mut req = self.core.azblob_put_blob_request(
             &self.path,
             Some(bs.len() as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            AsyncBody::Bytes(bs),
+            AsyncBody::ChunkedBytes(bs),
         )?;
 
         self.core.sign(&mut req).await?;
diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 1c0dd3a8c..c729ab754 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -296,13 +296,6 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args, 
path.to_string())),
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 9e2a962f2..0424fe494 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzdfsCore;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -41,7 +41,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for AzdfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -66,11 +66,12 @@ impl oio::OneShotWrite for AzdfsWriter {
             }
         }
 
-        let size = bs.len();
-
-        let mut req =
-            self.core
-                .azdfs_update_request(&self.path, Some(size), 
AsyncBody::Bytes(bs))?;
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+        let mut req = self.core.azdfs_update_request(
+            &self.path,
+            Some(bs.len()),
+            AsyncBody::ChunkedBytes(bs),
+        )?;
 
         self.core.sign(&mut req).await?;
 
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index eced1ccc9..897c4c10a 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -272,10 +272,10 @@ impl Accessor for CosBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
-                write_without_content_length: true,
 
                 delete: true,
                 create_dir: true,
@@ -342,7 +342,7 @@ impl Accessor for CosBackend {
             CosWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs
index 029f1fe7f..8400b3300 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -106,12 +106,6 @@ impl Accessor for DropboxBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(DropboxWriter::new(
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 3a5c6cdd7..b5a4a69af 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::DropboxCore;
@@ -40,16 +39,16 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for DropboxWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .core
             .dropbox_update(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
         let status = resp.status();
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 950cfd76e..b1f1fdf49 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -263,7 +263,7 @@ impl Accessor for FsBackend {
 
                 write: true,
                 write_can_append: true,
-                write_without_content_length: true,
+                write_can_multi: true,
                 create_dir: true,
                 delete: true,
 
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index ed960c5f7..2fe8ffe9a 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -42,6 +42,7 @@ use super::pager::FtpPager;
 use super::util::FtpReader;
 use super::writer::FtpWriter;
 use crate::raw::*;
+use crate::services::ftp::writer::FtpWriters;
 use crate::*;
 
 /// FTP and FTPS services support.
@@ -264,7 +265,7 @@ impl Debug for FtpBackend {
 impl Accessor for FtpBackend {
     type Reader = FtpReader;
     type BlockingReader = ();
-    type Writer = FtpWriter;
+    type Writer = FtpWriters;
     type BlockingWriter = ();
     type Pager = FtpPager;
     type BlockingPager = ();
@@ -351,14 +352,7 @@ impl Accessor for FtpBackend {
         Ok((RpRead::new(size), FtpReader::new(r, ftp_stream)))
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         // Ensure the parent dir exists.
         let parent = get_parent(path);
         let paths: Vec<&str> = parent.split('/').collect();
@@ -381,10 +375,10 @@ impl Accessor for FtpBackend {
             }
         }
 
-        Ok((
-            RpWrite::new(),
-            FtpWriter::new(self.clone(), path.to_string()),
-        ))
+        let w = FtpWriter::new(self.clone(), path.to_string());
+        let w = oio::OneShotWriter::new(w);
+
+        Ok((RpWrite::new(), w))
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 4b2658907..79ae8e55d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,24 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::task::ready;
-use std::task::Context;
-use std::task::Poll;
-
 use async_trait::async_trait;
-use futures::future::BoxFuture;
 use futures::AsyncWriteExt;
-use futures::FutureExt;
 
 use super::backend::FtpBackend;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
+pub type FtpWriters = oio::OneShotWriter<FtpWriter>;
+
 pub struct FtpWriter {
     backend: FtpBackend,
     path: String,
-
-    fut: Option<BoxFuture<'static, Result<usize>>>,
 }
 
 /// # TODO
@@ -42,11 +37,7 @@ pub struct FtpWriter {
 /// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
-        FtpWriter {
-            backend,
-            path,
-            fut: None,
-        }
+        FtpWriter { backend, path }
     }
 }
 
@@ -56,39 +47,18 @@ impl FtpWriter {
 unsafe impl Sync for FtpWriter {}
 
 #[async_trait]
-impl oio::Write for FtpWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        loop {
-            if let Some(fut) = self.fut.as_mut() {
-                let res = ready!(fut.poll_unpin(cx));
-                self.fut = None;
-                return Poll::Ready(res);
-            }
-
-            let size = bs.remaining();
-            let bs = bs.bytes(size);
+impl oio::OneShotWrite for FtpWriter {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let size = bs.remaining();
+        let bs = bs.bytes(size);
 
-            let path = self.path.clone();
-            let backend = self.backend.clone();
-            let fut = async move {
-                let mut ftp_stream = 
backend.ftp_connect(Operation::Write).await?;
-                let mut data_stream = 
ftp_stream.append_with_stream(&path).await?;
-                data_stream.write_all(&bs).await.map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
-                })?;
-
-                ftp_stream.finalize_put_stream(data_stream).await?;
-                Ok(size)
-            };
-            self.fut = Some(Box::pin(fut));
-        }
-    }
-
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
-    }
+        let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
+        let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
+        data_stream.write_all(&bs).await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
+        })?;
 
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
+        ftp_stream.finalize_put_stream(data_stream).await?;
+        Ok(())
     }
 }
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 9e26cdce1..20e7a1f5e 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -362,8 +362,8 @@ impl Accessor for GcsBackend {
                 read_with_if_none_match: true,
 
                 write: true,
+                write_can_multi: true,
                 write_with_content_type: true,
-                write_without_content_length: true,
                 delete: true,
                 copy: true,
 
@@ -423,7 +423,7 @@ impl Accessor for GcsBackend {
         let w = GcsWriter::new(self.core.clone(), path, args.clone());
         let w = oio::RangeWriter::new(w);
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             // FIXME: we should align with 256KiB instead.
             let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
 
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index ea67ed200..66c181d2b 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -166,14 +166,7 @@ impl Accessor for GdriveBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         // As Google Drive allows files have the same name, we need to check 
if the file exists.
         // If the file exists, we will keep its ID and update it.
         let mut file_id: Option<String> = None;
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index e70f1754e..445ecb36f 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::GdriveCore;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -88,7 +89,8 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for GdriveWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
         let size = bs.len();
         if self.file_id.is_none() {
             self.write_create(size as u64, bs).await?;
diff --git a/core/src/services/ghac/backend.rs 
b/core/src/services/ghac/backend.rs
index 8adce69a3..e78fdb405 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -404,14 +404,7 @@ impl Accessor for GhacBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let req = self.ghac_reserve(path).await?;
 
         let resp = self.client.send(req).await?;
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index eda6d8895..e7999767b 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -21,7 +21,6 @@ use std::str;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::Request;
 use http::Response;
 use http::StatusCode;
@@ -121,14 +120,7 @@ impl Accessor for IpmfsBackend {
         }
     }
 
-    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), 
path.to_string())),
@@ -290,7 +282,7 @@ impl IpmfsBackend {
     pub async fn ipmfs_write(
         &self,
         path: &str,
-        body: Bytes,
+        body: oio::ChunkedBytes,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -300,7 +292,8 @@ impl IpmfsBackend {
             percent_encode_path(&p)
         );
 
-        let multipart = 
Multipart::new().part(FormDataPart::new("data").content(body));
+        let multipart = Multipart::new()
+            .part(FormDataPart::new("data").stream(body.len() as u64, 
Box::new(body)));
 
         let req: http::request::Builder = Request::post(url);
         let req = multipart.apply(req)?;
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index e6a32ac1b..96d9dab18 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::IpmfsBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -38,7 +38,8 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for IpmfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
         let resp = self.backend.ipmfs_write(&self.path, bs).await?;
 
         let status = resp.status();
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 9cca757a8..7caa6d749 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -279,9 +279,9 @@ impl Accessor for ObsBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
-                write_without_content_length: true,
 
                 delete: true,
                 create_dir: true,
@@ -380,7 +380,7 @@ impl Accessor for ObsBackend {
             ObsWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 2b32abbaf..0001d8685 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -103,13 +103,6 @@ impl Accessor for OnedriveBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         let path = build_rooted_abs_path(&self.root, path);
 
         Ok((
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 5ad92ac63..326035acb 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -23,6 +23,7 @@ use super::backend::OnedriveBackend;
 use super::error::parse_error;
 use super::graph_model::OneDriveUploadSessionCreationRequestBody;
 use super::graph_model::OneDriveUploadSessionCreationResponseBody;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -45,7 +46,8 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OneDriveWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
         let size = bs.len();
 
         if size <= Self::MAX_SIMPLE_SIZE {
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6d027ba5b..9022079bc 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -404,10 +404,11 @@ impl Accessor for OssBackend {
 
                 write: true,
                 write_can_append: true,
+                write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_with_content_disposition: true,
-                write_without_content_length: true,
+
                 delete: true,
                 create_dir: true,
                 copy: true,
@@ -478,7 +479,7 @@ impl Accessor for OssBackend {
             OssWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index aa2b95eb8..89a8330b8 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -913,9 +913,10 @@ impl Accessor for S3Backend {
                 read_with_override_content_type: true,
 
                 write: true,
+                write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
-                write_without_content_length: true,
+
                 create_dir: true,
                 delete: true,
                 copy: true,
@@ -979,7 +980,7 @@ impl Accessor for S3Backend {
 
         let w = oio::MultipartUploadWriter::new(writer);
 
-        let w = if let Some(buffer_size) = args.buffer_size() {
+        let w = if let Some(buffer_size) = args.buffer() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
             oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
index 931edcb23..0f9b7f3bd 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -243,7 +243,8 @@ impl Accessor for SftpBackend {
                 read_can_seek: true,
 
                 write: true,
-                write_without_content_length: true,
+                write_can_multi: true,
+
                 create_dir: true,
                 delete: true,
 
diff --git a/core/src/services/supabase/backend.rs 
b/core/src/services/supabase/backend.rs
index 402c3abe7..a5d0d3db1 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -224,13 +224,6 @@ impl Accessor for SupabaseBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(), 
path, args)),
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index c7fc6c8f8..b6929c4f5 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -45,13 +45,14 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for SupabaseWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+
         let mut req = self.core.supabase_upload_object_request(
             &self.path,
-            Some(size),
+            Some(bs.len()),
             self.op.content_type(),
-            AsyncBody::Bytes(bs),
+            AsyncBody::ChunkedBytes(bs),
         )?;
 
         self.core.sign(&mut req)?;
diff --git a/core/src/services/vercel_artifacts/backend.rs 
b/core/src/services/vercel_artifacts/backend.rs
index 4b88be664..9a0ae95cf 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -84,13 +84,6 @@ impl Accessor for VercelArtifactsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(VercelArtifactsWriter::new(
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index f62d76cf5..058e57041 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::VercelArtifactsBackend;
@@ -43,12 +42,16 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for VercelArtifactsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .backend
-            .vercel_artifacts_put(self.path.as_str(), size as u64, 
AsyncBody::Bytes(bs))
+            .vercel_artifacts_put(
+                self.path.as_str(),
+                bs.len() as u64,
+                AsyncBody::ChunkedBytes(bs),
+            )
             .await?;
 
         let status = resp.status();
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
index 835492bad..dcb91f2ea 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -750,13 +750,6 @@ impl Accessor for WasabiBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args, 
path.to_string())),
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index f9e27334e..cd76bf82c 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -41,18 +41,18 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WasabiWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .core
             .put_object(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
                 self.op.content_disposition(),
                 self.op.cache_control(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
 
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index cca8728ba..1f9aa1019 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -275,13 +275,6 @@ impl Accessor for WebdavBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         self.ensure_parent_path(path).await?;
 
         let p = build_abs_path(&self.root, path);
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index 5b6ea319f..42de4fc0e 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebdavBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -39,17 +39,17 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WebdavWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .backend
             .webdav_put(
                 &self.path,
-                Some(size as u64),
+                Some(bs.len() as u64),
                 self.op.content_type(),
                 self.op.content_disposition(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
 
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index e8a3a8f04..b1bfed8ae 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -474,13 +474,6 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         Ok((
             RpWrite::default(),
             oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args, 
path.to_string())),
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index b323c0173..ddc8dd328 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -39,14 +39,15 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WebhdfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    /// Using `bytes` instead of `vectored_bytes` to allow request to be 
redirected.
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
 
         let req = self
             .backend
             .webhdfs_create_object_request(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
                 AsyncBody::Bytes(bs),
             )
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index 6d3389c2b..b568b3ee8 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -46,90 +46,85 @@ use std::fmt::Debug;
 /// - Operation with limitations should be named like `batch_max_operations`.
 #[derive(Copy, Clone, Default)]
 pub struct Capability {
-    /// If operator supports stat , it will be true.
+    /// If operator supports stat.
     pub stat: bool,
-    /// If operator supports stat with if match , it will be true.
+    /// If operator supports stat with if match.
     pub stat_with_if_match: bool,
-    /// If operator supports stat with if none match , it will be true.
+    /// If operator supports stat with if none match.
     pub stat_with_if_none_match: bool,
 
-    /// If operator supports read , it will be true.
+    /// If operator supports read.
     pub read: bool,
-    /// If operator supports seek on returning reader , it will
-    /// be true.
+    /// If operator supports seek on returning reader.
     pub read_can_seek: bool,
-    /// If operator supports next on returning reader , it will
-    /// be true.
+    /// If operator supports next on returning reader.
     pub read_can_next: bool,
-    /// If operator supports read with range , it will be true.
+    /// If operator supports read with range.
     pub read_with_range: bool,
-    /// If operator supports read with if match , it will be true.
+    /// If operator supports read with if match.
     pub read_with_if_match: bool,
-    /// If operator supports read with if none match , it will be true.
+    /// If operator supports read with if none match.
     pub read_with_if_none_match: bool,
-    /// if operator supports read with override cache control , it will be 
true.
+    /// If operator supports read with override cache control.
     pub read_with_override_cache_control: bool,
-    /// if operator supports read with override content disposition , it will 
be true.
+    /// If operator supports read with override content disposition.
     pub read_with_override_content_disposition: bool,
-    /// if operator supports read with override content type , it will be true.
+    /// If operator supports read with override content type.
     pub read_with_override_content_type: bool,
 
-    /// If operator supports write , it will be true.
+    /// If operator supports write.
     pub write: bool,
-    /// If operator supports write by append, it will be true.
+    /// If operator supports calling write multiple times.
+    pub write_can_multi: bool,
+    /// If operator supports write by append.
     pub write_can_append: bool,
-    /// If operator supports write with without content length, it will
-    /// be true.
-    ///
-    /// This feature also be called as `Unsized` write or streaming write.
-    pub write_without_content_length: bool,
-    /// If operator supports write with content type , it will be true.
+    /// If operator supports write with content type.
     pub write_with_content_type: bool,
-    /// If operator supports write with content disposition , it will be true.
+    /// If operator supports write with content disposition.
     pub write_with_content_disposition: bool,
-    /// If operator supports write with cache control , it will be true.
+    /// If operator supports write with cache control.
     pub write_with_cache_control: bool,
 
-    /// If operator supports create dir , it will be true.
+    /// If operator supports create dir.
     pub create_dir: bool,
 
-    /// If operator supports delete , it will be true.
+    /// If operator supports delete.
     pub delete: bool,
 
-    /// If operator supports copy , it will be true.
+    /// If operator supports copy.
     pub copy: bool,
 
-    /// If operator supports rename , it will be true.
+    /// If operator supports rename.
     pub rename: bool,
 
-    /// If operator supports list , it will be true.
+    /// If operator supports list.
     pub list: bool,
-    /// If backend supports list with limit, it will be true.
+    /// If backend supports list with limit.
     pub list_with_limit: bool,
-    /// If backend supports list with start after, it will be true.
+    /// If backend supports list with start after.
     pub list_with_start_after: bool,
     /// If backend support list with using slash as delimiter.
     pub list_with_delimiter_slash: bool,
     /// If backend supports list without delimiter.
     pub list_without_delimiter: bool,
 
-    /// If operator supports presign , it will be true.
+    /// If operator supports presign.
     pub presign: bool,
-    /// If operator supports presign read , it will be true.
+    /// If operator supports presign read.
     pub presign_read: bool,
-    /// If operator supports presign stat , it will be true.
+    /// If operator supports presign stat.
     pub presign_stat: bool,
-    /// If operator supports presign write , it will be true.
+    /// If operator supports presign write.
     pub presign_write: bool,
 
-    /// If operator supports batch , it will be true.
+    /// If operator supports batch.
     pub batch: bool,
-    /// If operator supports batch delete , it will be true.
+    /// If operator supports batch delete.
     pub batch_delete: bool,
     /// The max operations that operator supports in batch.
     pub batch_max_operations: Option<usize>,
 
-    /// If operator supports blocking , it will be true.
+    /// If operator supports blocking.
     pub blocking: bool,
 }
 
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 3468f8b6b..22a6208f1 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -550,7 +550,7 @@ impl BlockingOperator {
         FunctionWrite(OperatorFunction::new(
             self.inner().clone(),
             path,
-            (OpWrite::default().with_content_length(bs.len() as u64), bs),
+            (OpWrite::default(), bs),
             |inner, path, (args, mut bs)| {
                 if !validate_path(&path, EntryMode::FILE) {
                     return Err(
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index 771ffd677..8c8539bfd 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -717,7 +717,7 @@ impl Operator {
         let fut = FutureWrite(OperatorFuture::new(
             self.inner().clone(),
             path,
-            (OpWrite::default().with_content_length(bs.len() as u64), bs),
+            (OpWrite::default(), bs),
             |inner, path, (args, mut bs)| {
                 let fut = async move {
                     if !validate_path(&path, EntryMode::FILE) {
diff --git a/core/src/types/operator/operator_functions.rs 
b/core/src/types/operator/operator_functions.rs
index 49a0e7e85..367c5b93f 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -96,19 +96,8 @@ impl FunctionWrite {
     ///
     /// Service could have their own minimum buffer size while perform write 
operations like
     /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, bs)| (args.with_content_length(v), bs));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
         self
     }
 
@@ -173,17 +162,8 @@ impl FunctionWriter {
     ///
     /// Service could have their own minimum buffer size while perform write 
operations like
     /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self.0.map_args(|args| args.with_content_length(v));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer(v));
         self
     }
 
diff --git a/core/src/types/operator/operator_futures.rs 
b/core/src/types/operator/operator_futures.rs
index 7f2df677a..63c013f05 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -213,17 +213,6 @@ impl Future for FuturePresignRead {
 pub struct FuturePresignWrite(pub(crate) OperatorFuture<(OpWrite, Duration), 
PresignedRequest>);
 
 impl FuturePresignWrite {
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, dur)| (args.with_content_length(v), dur));
-        self
-    }
-
     /// Set the content type of option
     pub fn content_type(mut self, v: &str) -> Self {
         self.0 = self
@@ -403,19 +392,8 @@ impl FutureWrite {
     ///
     /// Service could have their own minimum buffer size while perform write 
operations like
     /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self
-            .0
-            .map_args(|(args, bs)| (args.with_content_length(v), bs));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
         self
     }
 
@@ -462,7 +440,7 @@ impl FutureWriter {
     ///
     /// If the append mode is set, the data will be appended to the end of the 
file.
     ///
-    /// # Notes
+    /// ## Notes
     ///
     /// Service could return `Unsupported` if the underlying storage does not 
support append.
     pub fn append(mut self, v: bool) -> Self {
@@ -478,17 +456,8 @@ impl FutureWriter {
     ///
     /// Service could have their own minimum buffer size while perform write 
operations like
     /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
-    pub fn buffer_size(mut self, v: usize) -> Self {
-        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
-        self
-    }
-
-    /// Set the content length of op.
-    ///
-    /// If the content length is not set, the content length will be
-    /// calculated automatically by buffering part of data.
-    pub fn content_length(mut self, v: u64) -> Self {
-        self.0 = self.0.map_args(|args| args.with_content_length(v));
+    pub fn buffer(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer(v));
         self
     }
 
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 56e87c5fb..b350f9625 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -38,20 +38,39 @@ use crate::*;
 /// Please make sure either `close` or `abort` has been called before
 /// dropping the writer otherwise the data could be lost.
 ///
-/// ## Notes
+/// ## Usage
+///
+/// ### Write Multiple Chunks
+///
+/// Some services support writing multiple chunks of data into a given path. 
Services that don't
+/// support writing multiple chunks will return an [`ErrorKind::Unsupported`] error 
when `write`
+/// is called a second time.
+///
+/// ```no_build
+/// let mut w = op.writer("path/to/file").await?;
+/// w.write(bs).await?;
+/// w.write(bs).await?;
+/// w.close().await?;
+/// ```
+///
+/// Our writer also provides [`Writer::sink`] and [`Writer::copy`] support.
+///
+/// Besides, our writer implements [`AsyncWrite`] and 
[`tokio::io::AsyncWrite`].
+///
+/// ### Write with append enabled
 ///
-/// Writer can be used in two ways:
+/// Writer also supports writing with append enabled. This is useful when 
users want to append
+/// some data to the end of the file.
 ///
-/// - Sized: write data with a known size by specify the content length.
-/// - Unsized: write data with an unknown size, also known as streaming.
+/// - If the file doesn't exist, it will be created, just like calling `write`.
+/// - If the file exists, data will be appended to the end of the file.
 ///
-/// All services will support `sized` writer and provide special optimization 
if
-/// the given data size is the same as the content length, allowing them to
-/// be written in one request.
+/// Possible Errors:
 ///
-/// Some services also supports `unsized` writer. They MAY buffer part of the 
data
-/// and flush them into storage at needs. And finally, the file will be 
available
-/// after `close` has been called.
+/// - Some services store normal files and appendable files in different ways. 
Trying to append
+///   to a non-appendable file could return an [`ErrorKind::ConditionNotMatch`] 
error.
+/// - Services that don't support append will return an 
[`ErrorKind::Unsupported`] error when
+///   creating a writer with `append` enabled.
 pub struct Writer {
     inner: oio::Writer,
 }
@@ -105,7 +124,6 @@ impl Writer {
     /// async fn sink_example(op: Operator) -> Result<()> {
     ///     let mut w = op
     ///         .writer_with("path/to/file")
-    ///         .content_length(2 * 4096)
     ///         .await?;
     ///     let stream = stream::iter(vec![vec![0; 4096], vec![1; 
4096]]).map(Ok);
     ///     w.sink(stream).await?;
@@ -154,7 +172,7 @@ impl Writer {
     ///
     /// #[tokio::main]
     /// async fn copy_example(op: Operator) -> Result<()> {
-    ///     let mut w = 
op.writer_with("path/to/file").content_length(4096).await?;
+    ///     let mut w = op.writer_with("path/to/file").await?;
     ///     let reader = Cursor::new(vec![0; 4096]);
     ///     w.copy(reader).await?;
     ///     w.close().await?;
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 60125243a..f20cec326 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1111,7 +1111,7 @@ pub async fn test_delete_stream(op: Operator) -> 
Result<()> {
 
 /// Append data into writer
 pub async fn test_writer_write(op: Operator) -> Result<()> {
-    if !(op.info().full_capability().write_without_content_length) {
+    if !(op.info().full_capability().write_can_multi) {
         return Ok(());
     }
 
@@ -1148,7 +1148,7 @@ pub async fn test_writer_write(op: Operator) -> 
Result<()> {
 /// Streaming data into writer
 pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_without_content_length) {
+    if !(cap.write && cap.write_can_multi) {
         return Ok(());
     }
 
@@ -1158,7 +1158,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> 
{
     let content_b = gen_fixed_bytes(size);
     let stream = stream::iter(vec![content_a.clone(), 
content_b.clone()]).map(Ok);
 
-    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
     w.sink(stream).await?;
     w.close().await?;
 
@@ -1185,7 +1185,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> 
{
 /// Reading data into writer
 pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_without_content_length) {
+    if !(cap.write && cap.write_can_multi) {
         return Ok(());
     }
 
@@ -1194,7 +1194,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> 
{
     let content_a = gen_fixed_bytes(size);
     let content_b = gen_fixed_bytes(size);
 
-    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
 
     let mut content = Bytes::from([content_a.clone(), 
content_b.clone()].concat());
     while !content.is_empty() {
@@ -1226,7 +1226,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> 
{
 
 /// Copy data from reader to writer
 pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
-    if !(op.info().full_capability().write_without_content_length) {
+    if !(op.info().full_capability().write_can_multi) {
         return Ok(());
     }
 
@@ -1234,7 +1234,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> 
Result<()> {
     let (content, size): (Vec<u8>, usize) =
         gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
 
-    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     // Wrap a buf reader here to make sure content is read in 1MiB chunks.
     let mut cursor = BufReader::with_capacity(1024 * 1024, 
Cursor::new(content.clone()));
@@ -1258,7 +1258,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> 
Result<()> {
 
 /// Add test for unsized writer
 pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
-    if !op.info().full_capability().write_without_content_length {
+    if !op.info().full_capability().write_can_multi {
         warn!("{op:?} doesn't support write without content length, test 
skip");
         return Ok(());
     }
@@ -1267,7 +1267,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) -> 
Result<()> {
 
     let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
 
-    let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+    let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {

Reply via email to