This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 56e0c6ad refactor: Polish Writer API by merging append and write 
together (#2036)
56e0c6ad is described below

commit 56e0c6addb6d7564387051a251efba24477f0128
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 18:10:11 2023 +0800

    refactor: Polish Writer API by merging append and write together (#2036)
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove append
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add test for copy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix unit tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/concurrent_limit.rs          |   8 --
 core/src/layers/error_context.rs             |  16 ---
 core/src/layers/logging.rs                   |  66 ----------
 core/src/layers/metrics.rs                   |  23 ----
 core/src/layers/minitrace.rs                 |  16 ---
 core/src/layers/oteltrace.rs                 |   8 --
 core/src/layers/prometheus.rs                |  33 -----
 core/src/layers/retry.rs                     |  35 ------
 core/src/layers/tracing.rs                   |  16 ---
 core/src/raw/adapters/kv/backend.rs          |  44 +++----
 core/src/raw/oio/cursor.rs                   | 171 ++++++++++++++++++++++++++
 core/src/raw/oio/mod.rs                      |   1 +
 core/src/raw/oio/write.rs                    |  60 +++------
 core/src/services/azblob/backend.rs          |   4 +-
 core/src/services/azblob/writer.rs           |   9 --
 core/src/services/azdfs/backend.rs           |   4 +-
 core/src/services/azdfs/writer.rs            |   9 --
 core/src/services/fs/writer.rs               |  22 ----
 core/src/services/ftp/backend.rs             |   4 +-
 core/src/services/ftp/writer.rs              |   9 --
 core/src/services/gcs/backend.rs             |  25 +---
 core/src/services/gcs/core.rs                |  10 +-
 core/src/services/gcs/writer.rs              | 176 +++++++++++++++++----------
 core/src/services/ghac/backend.rs            |   4 +-
 core/src/services/ghac/writer.rs             |   9 --
 core/src/services/hdfs/writer.rs             |  25 ----
 core/src/services/ipmfs/backend.rs           |   4 +-
 core/src/services/ipmfs/writer.rs            |   9 --
 core/src/services/obs/backend.rs             |  10 +-
 core/src/services/obs/core.rs                |   5 -
 core/src/services/obs/writer.rs              |  10 --
 core/src/services/oss/backend.rs             |  18 +--
 core/src/services/oss/writer.rs              | 108 +++++++++++++---
 core/src/services/s3/backend.rs              |  32 +----
 core/src/services/s3/core.rs                 |   5 -
 core/src/services/s3/writer.rs               | 131 ++++++++++++++++----
 core/src/services/wasabi/backend.rs          |   2 +-
 core/src/services/wasabi/writer.rs           |  80 +++++-------
 core/src/services/webdav/backend.rs          |   4 +-
 core/src/services/webdav/writer.rs           |   9 --
 core/src/services/webhdfs/backend.rs         |   4 +-
 core/src/services/webhdfs/writer.rs          |   9 --
 core/src/types/operator/blocking_operator.rs |  22 ++--
 core/src/types/operator/operator.rs          |  26 ++--
 core/src/types/ops.rs                        |  32 ++---
 core/src/types/writer.rs                     |  97 ++++++++++++---
 core/tests/behavior/utils.rs                 |  11 ++
 core/tests/behavior/write.rs                 |  50 ++++++--
 48 files changed, 756 insertions(+), 729 deletions(-)

diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 99e947b3..c1ccb76d 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -316,10 +316,6 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
@@ -334,10 +330,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
ConcurrentLimitWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close()
     }
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 29724613..5417a040 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -411,14 +411,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await.map_err(|err| {
-            err.with_operation(WriteOperation::Append)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             err.with_operation(WriteOperation::Append)
@@ -445,14 +437,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for 
ErrorContextWrapper<T> {
         })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).map_err(|err| {
-            err.with_operation(WriteOperation::BlockingAppend)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             err.with_operation(WriteOperation::BlockingClose)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 706287ad..abecc48f 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1386,39 +1386,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        match self.inner.append(bs).await {
-            Ok(_) => {
-                self.written += size as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data write 
{}B",
-                    self.scheme,
-                    WriteOperation::Append,
-                    self.path,
-                    self.written,
-                    size
-                );
-                Ok(())
-            }
-            Err(err) => {
-                if let Some(lvl) = self.failure_level {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data 
write failed: {err:?}",
-                        self.scheme,
-                        WriteOperation::Append,
-                        self.path,
-                        self.written,
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         match self.inner.abort().await {
             Ok(_) => {
@@ -1504,39 +1471,6 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
         }
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        match self.inner.append(bs) {
-            Ok(_) => {
-                self.written += size as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data write 
{}B",
-                    self.scheme,
-                    WriteOperation::BlockingAppend,
-                    self.path,
-                    self.written,
-                    size
-                );
-                Ok(())
-            }
-            Err(err) => {
-                if let Some(lvl) = self.failure_level {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data 
write failed: {err:?}",
-                        self.scheme,
-                        WriteOperation::BlockingAppend,
-                        self.path,
-                        self.written,
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     fn close(&mut self) -> Result<()> {
         match self.inner.close() {
             Ok(_) => Ok(()),
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 2aff9003..6bdfe19f 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -925,18 +925,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .await
-            .map(|_| self.bytes += size as u64)
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
@@ -964,17 +952,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
MetricWrapper<R> {
             })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .map(|_| self.bytes += size as u64)
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 3e91bac6..35c48d27 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,16 +337,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner
-            .append(bs)
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Append.into_static(),
-                &self.span,
-            ))
-            .await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner
             .abort()
@@ -375,12 +365,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
MinitraceWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _span =
-            
Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(), 
&self.span);
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         let _span =
             
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), 
&self.span);
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index c607e975..f9f3a931 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -339,10 +339,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
@@ -357,10 +353,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
OtelTraceWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close()
     }
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 4689c383..6919abc4 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -720,23 +720,6 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .await
-            .map(|_| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
-                    .observe(size as f64)
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
@@ -769,22 +752,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .map(|_| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, 
Operation::BlockingWrite.into_static()])
-                    .observe(size as f64)
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 401bd53e..497c1ca8 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -651,27 +651,6 @@ impl<R: oio::Write> oio::Write for RetryWrapper<R> {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let mut backoff = self.builder.build();
-
-        loop {
-            match self.inner.append(bs.clone()).await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
-                    Some(dur) => {
-                        warn!(target: "opendal::service",
-                              "operation={} path={} -> pager retry after {}s: 
error={:?}",
-                              WriteOperation::Append, self.path, 
dur.as_secs_f64(), e);
-                        tokio::time::sleep(dur).await;
-                        continue;
-                    }
-                },
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let mut backoff = self.builder.build();
 
@@ -730,20 +709,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
RetryWrapper<R> {
             .map_err(|e| e.set_persistent())
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        { || self.inner.append(bs.clone()) }
-            .retry(&self.builder)
-            .when(|e| e.is_temporary())
-            .notify(move |err, dur| {
-                warn!(
-                target: "opendal::service",
-                "operation={} -> pager retry after {}s: error={:?}",
-                WriteOperation::BlockingAppend, dur.as_secs_f64(), err)
-            })
-            .call()
-            .map_err(|e| e.set_persistent())
-    }
-
     fn close(&mut self) -> Result<()> {
         { || self.inner.close() }
             .retry(&self.builder)
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 8f9b8a5d..2782d049 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -344,14 +344,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         self.inner.write(bs).await
     }
 
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
@@ -378,14 +370,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         self.inner.write(bs)
     }
 
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 15dc7beb..dae293e0 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -107,13 +107,27 @@ impl<S: Adapter> Accessor for Backend<S> {
         Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
     }
 
-    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
     }
 
-    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
@@ -264,35 +278,17 @@ impl<S> KvWriter<S> {
             buf: None,
         }
     }
-
-    fn extend_buf(&mut self, bs: Bytes) {
-        if let Some(buf) = self.buf.as_mut() {
-            buf.extend(bs);
-        } else {
-            self.buf = Some(bs.into())
-        }
-    }
 }
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
+    // TODO: we need to support append in the future.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         self.buf = Some(bs.into());
 
         Ok(())
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        if let Err(e) = self.kv.append(&self.path, 
bs.to_vec().as_slice()).await {
-            if e.kind() == ErrorKind::Unsupported {
-                self.extend_buf(bs);
-            } else {
-                return Err(e);
-            }
-        }
-        Ok(())
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -316,12 +312,6 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
         Ok(())
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.extend_buf(bs);
-
-        Ok(())
-    }
-
     fn close(&mut self) -> Result<()> {
         if let Some(buf) = self.buf.as_deref() {
             self.kv.blocking_set(&self.path, buf)?;
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c0282ffd..1e34c9b9 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::VecDeque;
 use std::io::Read;
 use std::io::SeekFrom;
 use std::task::Context;
 use std::task::Poll;
 
+use bytes::Buf;
 use bytes::Bytes;
+use bytes::BytesMut;
 
 use crate::raw::*;
 use crate::*;
@@ -138,3 +141,171 @@ impl oio::BlockingRead for Cursor {
         }
     }
 }
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+pub struct VectorCursor {
+    inner: VecDeque<Bytes>, // buffered chunks, kept in push order
+    size: usize, // total number of bytes across all chunks in `inner`
+}
+
+impl Default for VectorCursor {
+    fn default() -> Self {
+        Self::new() // same as `new()`: no chunks and a size of zero
+    }
+}
+
+impl VectorCursor {
+    /// Create a new, empty vector cursor.
+    pub fn new() -> Self {
+        Self {
+            inner: VecDeque::new(),
+            size: 0,
+        }
+    }
+
+    /// Returns `true` if the cursor currently holds no bytes.
+    pub fn is_empty(&self) -> bool {
+        self.size == 0
+    }
+
+    /// Returns the total number of bytes currently held by the cursor.
+    pub fn len(&self) -> usize {
+        self.size
+    }
+
+    /// Push a new chunk of bytes onto the back of the cursor.
+    pub fn push(&mut self, bs: Bytes) {
+        self.size += bs.len();
+        self.inner.push_back(bs);
+    }
+
+    /// Pop the chunk at the back of the cursor; panics if there is none.
+    pub fn pop(&mut self) {
+        let bs = self.inner.pop_back();
+        self.size -= bs.expect("pop bytes must exist").len()
+    }
+
+    /// Drop every chunk and reset the size to zero.
+    pub fn clear(&mut self) {
+        self.inner.clear();
+        self.size = 0;
+    }
+
+    /// Peek (spelled `peak` in this API) returns exactly the first n bytes
+    /// of the cursor without changing its content.
+    ///
+    /// This function is useful if you want to read a fixed size
+    /// content to make sure it is aligned.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    ///
+    /// # TODO
+    ///
+    /// Optimize to avoid data copy.
+    pub fn peak_exact(&self, n: usize) -> Bytes {
+        assert!(n <= self.size, "peak size must smaller than current size");
+
+        // Zero-copy fast path; `front()` also tolerates an empty cursor (n == 0).
+        if let Some(first) = self.inner.front().filter(|b| b.len() >= n) {
+            return first.slice(..n);
+        }
+
+        let mut bs = BytesMut::with_capacity(n);
+        let mut n = n;
+        for b in &self.inner {
+            if n == 0 {
+                break;
+            }
+            let len = b.len().min(n);
+            bs.extend_from_slice(&b[..len]);
+            n -= len;
+        }
+        bs.freeze()
+    }
+
+    /// peak_at_least returns at least the first n bytes of the cursor
+    /// without changing its content.
+    ///
+    /// This function is useful if you only need the returned bytes to
+    /// be no shorter than n; the whole first chunk may be returned.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    ///
+    /// # TODO
+    ///
+    /// Optimize to avoid data copy.
+    pub fn peak_at_least(&self, n: usize) -> Bytes {
+        assert!(n <= self.size, "peak size must smaller than current size");
+
+        // Zero-copy fast path; `front()` also tolerates an empty cursor (n == 0).
+        if let Some(first) = self.inner.front().filter(|b| b.len() >= n) {
+            return first.clone();
+        }
+
+        let mut bs = BytesMut::with_capacity(n);
+        let mut n = n;
+        for b in &self.inner {
+            if n == 0 {
+                break;
+            }
+            let len = b.len().min(n);
+            bs.extend_from_slice(&b[..len]);
+            n -= len;
+        }
+        bs.freeze()
+    }
+
+    /// Take will consume n bytes from the front of the cursor.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    pub fn take(&mut self, n: usize) {
+        assert!(n <= self.size, "take size must smaller than current size");
+
+        // Update current size
+        self.size -= n;
+
+        let mut n = n;
+        while n > 0 {
+            assert!(!self.inner.is_empty(), "inner must not be empty");
+
+            if self.inner[0].len() <= n {
+                n -= self.inner[0].len();
+                self.inner.pop_front();
+            } else {
+                self.inner[0].advance(n);
+                n = 0;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_vector_cursor() {
+        let mut vc = VectorCursor::new();
+
+        vc.push(Bytes::from("hello"));
+        vc.push(Bytes::from("world"));
+
+        assert_eq!(vc.peak_exact(1), Bytes::from("h"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("h")); // peeking does not consume
+        assert_eq!(vc.peak_exact(4), Bytes::from("hell"));
+        assert_eq!(vc.peak_exact(10), Bytes::from("helloworld")); // spans both chunks
+
+        vc.take(1);
+        assert_eq!(vc.peak_exact(1), Bytes::from("e"));
+        vc.take(1);
+        assert_eq!(vc.peak_exact(1), Bytes::from("l"));
+        vc.take(5); // crosses the chunk boundary: drops "llo" and then "wo"
+        assert_eq!(vc.peak_exact(1), Bytes::from("r"));
+    }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index f82af4ca..56b48dda 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -43,6 +43,7 @@ pub use write::Writer;
 
 mod cursor;
 pub use cursor::Cursor;
+pub use cursor::VectorCursor;
 
 mod into_streamable;
 pub use into_streamable::into_streamable_reader;
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index 05f93a93..a2437312 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -76,23 +76,30 @@ impl From<WriteOperation> for &'static str {
 pub type Writer = Box<dyn Write>;
 
 /// Write is the trait that OpenDAL returns to callers.
+///
+/// # Notes
+///
+/// There are two possible cases:
+///
+/// - Sized: The total size of the object is known in advance.
+/// - Unsized: The total size of the object is unknown in advance.
+///
+/// And it's possible that the given bs length is less than the total
+/// content length. Users will call write multiple times to write
+/// the whole data.
 #[async_trait]
 pub trait Write: Unpin + Send + Sync {
-    /// Write whole content at once.
+    /// Write given bytes into writer.
+    ///
+    /// # Notes
+    ///
+    /// It's possible that the given bs length is less than the total
+    /// content length. And users will call write multiple times.
     ///
-    /// To append multiple bytes together, use `append` instead.
+    /// Please make sure `write` is safe to re-enter.
     async fn write(&mut self, bs: Bytes) -> Result<()>;
 
-    /// Append bytes to the writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
-    async fn append(&mut self, bs: Bytes) -> Result<()>;
-
-    /// Abort the pending appendable writer.
-    /// #note
-    /// This method is only applicable to writers opened in append mode.
+    /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
 
     /// Close the writer and make sure all data has been flushed.
@@ -107,15 +114,6 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -139,10 +137,6 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        (**self).append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         (**self).abort().await
     }
@@ -160,9 +154,6 @@ pub trait BlockingWrite: Send + Sync + 'static {
     /// Write whole content at once.
     fn write(&mut self, bs: Bytes) -> Result<()>;
 
-    /// Append content at tailing.
-    fn append(&mut self, bs: Bytes) -> Result<()>;
-
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
@@ -174,15 +165,6 @@ impl BlockingWrite for () {
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     fn close(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -198,10 +180,6 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
         (**self).write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        (**self).append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         (**self).close()
     }
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index dfd7ca18..659d9f5e 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -500,10 +500,10 @@ impl Accessor for AzblobBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index d74af141..9e6bdb40 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -65,15 +65,6 @@ impl oio::Write for AzblobWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/azdfs/backend.rs 
b/core/src/services/azdfs/backend.rs
index 5c0b68db..1dbe605e 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -354,10 +354,10 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index b9f6faf2..a6efaf08 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -87,15 +87,6 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 33dcb2d5..03d6d824 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -54,17 +54,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().await.map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .await
@@ -101,17 +90,6 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .map_err(parse_io_error)?;
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index f509eff2..5dd4b0ff 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -388,10 +388,10 @@ impl Accessor for FtpBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index dec8c473..653dc003 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,15 +53,6 @@ impl oio::Write for FtpWriter {
         Ok(())
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 7e865dcb..aba03063 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -410,32 +410,9 @@ impl Accessor for GcsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let upload_location = if args.append() {
-            let resp = self.core.gcs_initiate_resumable_upload(path).await?;
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK => {
-                    let bs = parse_location(resp.headers())
-                        .expect("Failed to retrieve location of resumable upload");
-                    if let Some(location) = bs {
-                        Some(String::from(location))
-                    } else {
-                        return Err(Error::new(
-                            ErrorKind::NotFound,
-                            "location is not in the response header",
-                        ));
-                    }
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            GcsWriter::new(self.core.clone(), args, path.to_string(), upload_location),
+            GcsWriter::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 18376acd..907d0ef4 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -322,7 +322,7 @@ impl GcsCore {
         &self,
         location: &str,
         size: u64,
-        written_bytes: u64,
+        written: u64,
         is_last_part: bool,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
@@ -331,12 +331,12 @@ impl GcsCore {
         let range_header = if is_last_part {
             format!(
                 "bytes {}-{}/{}",
-                written_bytes,
-                written_bytes + size - 1,
-                written_bytes + size
+                written,
+                written + size - 1,
+                written + size
             )
         } else {
-            format!("bytes {}-{}/*", written_bytes, written_bytes + size - 1)
+            format!("bytes {}-{}/*", written, written + size - 1)
         };
 
         req = req
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index b47b2051..c4ed1f6e 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -29,37 +29,40 @@ use crate::*;
 
 pub struct GcsWriter {
     core: Arc<GcsCore>,
-
-    op: OpWrite,
     path: String,
+    op: OpWrite,
+
     location: Option<String>,
-    written_bytes: u64,
-    is_last_part_written: bool,
-    last: Option<Bytes>,
+    written: u64,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl GcsWriter {
-    pub fn new(
-        core: Arc<GcsCore>,
-        op: OpWrite,
-        path: String,
-        upload_location: Option<String>,
-    ) -> Self {
+    pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
         GcsWriter {
             core,
+            path: path.to_string(),
             op,
-            path,
-            location: upload_location,
-            written_bytes: 0,
-            is_last_part_written: false,
-            last: None,
+
+            location: None,
+            written: 0,
+            buffer: oio::VectorCursor::new(),
+            // The chunk size should be a multiple of 256 KiB
+            // (256 x 1024 bytes), unless it's the last chunk
+            // that completes the upload.
+            //
+            // Larger chunk sizes typically make uploads faster,
+            // but note that there's a tradeoff between speed and
+            // memory usage. It's recommended that you use at least
+            // 8 MiB for the chunk size.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.gcs_insert_object_request(
             &percent_encode_path(&self.path),
             Some(bs.len()),
@@ -82,80 +85,119 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let location = if let Some(location) = &self.location {
-            location
-        } else {
-            return Ok(());
-        };
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
+        let status = resp.status();
 
-        let result = if let Some(last) = &self.last {
-            let bytes_to_upload = last.slice(0..last.len());
-            let part_size = bytes_to_upload.len() as u64;
-            let is_last_part = part_size % (256 * 1024) != 0;
-            let mut req = self.core.gcs_upload_in_resumable_upload(
-                location,
-                part_size,
-                self.written_bytes,
-                is_last_part,
-                AsyncBody::Bytes(bytes_to_upload),
-            )?;
-
-            self.core.sign(&mut req).await?;
-
-            let resp = self.core.send(req).await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK | StatusCode::PERMANENT_REDIRECT => {
-                    if is_last_part {
-                        self.is_last_part_written = true
-                    } else {
-                        self.written_bytes += part_size;
-                    }
-                    Ok(())
+        match status {
+            StatusCode::OK => {
+                let bs = parse_location(resp.headers())?;
+                if let Some(location) = bs {
+                    Ok(location.to_string())
+                } else {
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "location is not in the response header",
+                    ))
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> {
+        let mut req = self.core.gcs_upload_in_resumable_upload(
+            location,
+            bs.len() as u64,
+            self.written,
+            false,
+            AsyncBody::Bytes(bs),
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for GcsWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let location = match &self.location {
+            Some(location) => location,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64
+                    && self.written == 0
+                {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let location = self.initiate_upload().await?;
+                    self.location = Some(location);
+                    self.location.as_deref().unwrap()
                 }
-                _ => Err(parse_error(resp).await?),
             }
-        } else {
-            Ok(())
         };
 
-        self.last = Some(bs.slice(0..bs.len()));
-        return result;
+        // Ignore empty bytes
+        if bs.is_empty() {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak_exact(self.buffer_size);
+
+        match self.write_part(location, bs).await {
+            Ok(_) => {
+                self.buffer.take(self.buffer_size);
+                self.written += self.buffer_size as u64;
+                Ok(())
+            }
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-enter safe.
+                self.buffer.pop();
+                Err(e)
+            }
+        }
     }
 
+    // TODO: we can cancel the upload by sending a DELETE request to the location
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
-        if self.is_last_part_written {
-            return Ok(());
-        }
-
         let location = if let Some(location) = &self.location {
             location
         } else {
             return Ok(());
         };
 
-        let bs = self
-            .last
-            .as_ref()
-            .expect("failed to get the previously uploaded part");
+        let bs = self.buffer.peak_exact(self.buffer.len());
 
         let resp = self
             .core
-            .gcs_complete_resumable_upload(location, self.written_bytes, bs.slice(0..bs.len()))
+            .gcs_complete_resumable_upload(location, self.written, bs)
             .await?;
 
         let status = resp.status();
-
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
+
+                self.location = None;
+                self.buffer.clear();
                 Ok(())
             }
             _ => Err(parse_error(resp).await?),
diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs
index 159b0022..c5afeb81 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -391,10 +391,10 @@ impl Accessor for GhacBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b9e4eefd..8ff12c8b 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,15 +62,6 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6062d305..d045c063 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -46,20 +46,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f
-            .seek(SeekFrom::Start(0))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .await
@@ -90,17 +76,6 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .map_err(parse_io_error)?;
diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs
index c2e59f84..9e9377b8 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -110,10 +110,10 @@ impl Accessor for IpmfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 4394c751..4f240000 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -55,15 +55,6 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 3412999e..c8223d1a 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -315,9 +315,9 @@ impl Accessor for ObsBackend {
     }
 
     async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
-        let mut req =
-            self.core
-                .obs_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
+        let mut req = self
+            .core
+            .obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?;
 
         self.core.sign(&mut req).await?;
 
@@ -352,10 +352,10 @@ impl Accessor for ObsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index c61f1f02..191ec95b 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -116,7 +116,6 @@ impl ObsCore {
         path: &str,
         size: Option<usize>,
         content_type: Option<&str>,
-        if_match: Option<&str>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
@@ -125,10 +124,6 @@ impl ObsCore {
 
         let mut req = Request::put(&url);
 
-        if let Some(if_match) = if_match {
-            req = req.header(IF_MATCH, if_match);
-        }
-
         if let Some(size) = size {
             req = req.header(CONTENT_LENGTH, size)
         }
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 080e0919..d71c7893 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -47,7 +47,6 @@ impl oio::Write for ObsWriter {
             &self.path,
             Some(bs.len()),
             self.op.content_type(),
-            self.op.if_match(),
             AsyncBody::Bytes(bs),
         )?;
 
@@ -66,15 +65,6 @@ impl oio::Write for ObsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index b3077c7b..c314586c 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -407,25 +407,9 @@ impl Accessor for OssBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let upload_id = if args.append() {
-            let resp = self.core.oss_initiate_upload(path, &args).await?;
-            match resp.status() {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-                    let result: InitiateMultipartUploadResult =
-                        quick_xml::de::from_reader(bs.reader())
-                            .map_err(new_xml_deserialize_error)?;
-                    Some(result.upload_id)
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            OssWriter::new(self.core.clone(), args, path.to_string(), upload_id),
+            OssWriter::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 3c56226c..1f730cfa 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use http::StatusCode;
 
 use super::core::*;
@@ -33,24 +33,33 @@ pub struct OssWriter {
     op: OpWrite,
     path: String,
     upload_id: Option<String>,
+
     parts: Vec<MultipartUploadPart>,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl OssWriter {
-    pub fn new(core: Arc<OssCore>, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
+    pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
         OssWriter {
             core,
+            path: path.to_string(),
             op,
-            path,
-            upload_id,
+
+            upload_id: None,
             parts: vec![],
+            buffer: oio::VectorCursor::new(),
+            // The part size must be 5 MiB to 5 GiB. There is no minimum
+            // size limit on the last part of your multipart upload.
+            //
+            // We pick the default value as 8 MiB for better throughput.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for OssWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(bs.len()),
@@ -76,10 +85,20 @@ impl oio::Write for OssWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, must be buggy",
-        );
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
+        match resp.status() {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let result: InitiateMultipartUploadResult =
+                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                Ok(result.upload_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> {
         // Aliyun OSS requires part number must between [1..=10000]
         let part_number = self.parts.len() + 1;
         let mut req = self
@@ -108,13 +127,59 @@ impl oio::Write for OssWriter {
                     })?
                     .to_string();
                 resp.into_body().consume().await?;
-                self.parts.push(MultipartUploadPart { part_number, etag });
-                Ok(())
+                Ok(MultipartUploadPart { part_number, etag })
             }
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::Write for OssWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let upload_id = match &self.upload_id {
+            Some(upload_id) => upload_id,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let upload_id = self.initiate_upload().await?;
+                    self.upload_id = Some(upload_id);
+                    self.upload_id.as_deref().unwrap()
+                }
+            }
+        };
+
+        // Ignore empty bytes
+        if bs.is_empty() {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak_at_least(self.buffer_size);
+        let size = bs.len();
+
+        match self.write_part(upload_id, bs).await {
+            Ok(part) => {
+                self.buffer.take(size);
+                self.parts.push(part);
+                Ok(())
+            }
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-enter safe.
+                self.buffer.pop();
+                Err(e)
+            }
+        }
+    }
+
+    // TODO: we can cancel the upload by sending an abort request.
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -129,6 +194,21 @@ impl oio::Write for OssWriter {
             return Ok(());
         };
 
+        // Make sure internal buffer has been flushed.
+        if !self.buffer.is_empty() {
+            let bs = self.buffer.peak_exact(self.buffer.len());
+
+            match self.write_part(upload_id, bs).await {
+                Ok(part) => {
+                    self.buffer.clear();
+                    self.parts.push(part);
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+
         let resp = self
             .core
             .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts)
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index da2f1181..66940674 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -959,39 +959,9 @@ impl Accessor for S3Backend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let upload_id = if args.append() {
-            let resp = self
-                .core
-                .s3_initiate_multipart_upload(
-                    path,
-                    args.content_type(),
-                    args.content_disposition(),
-                    args.cache_control(),
-                    args.if_match(),
-                )
-                .await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-
-                    let result: InitiateMultipartUploadResult =
-                        quick_xml::de::from_reader(bs.reader())
-                            .map_err(new_xml_deserialize_error)?;
-
-                    Some(result.upload_id)
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            S3Writer::new(self.core.clone(), args, path.to_string(), upload_id),
+            S3Writer::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index c618b87d..4f1c36a2 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -489,7 +489,6 @@ impl S3Core {
         content_type: Option<&str>,
         content_disposition: Option<&str>,
         cache_control: Option<&str>,
-        if_match: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
@@ -509,10 +508,6 @@ impl S3Core {
             req = req.header(CACHE_CONTROL, cache_control)
         }
 
-        if let Some(if_match) = if_match {
-            req = req.header(IF_MATCH, if_match)
-        }
-
         // Set storage class header
         if let Some(v) = &self.default_storage_class {
             req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index d75906f9..9e80bad2 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use http::StatusCode;
 
 use super::core::*;
@@ -32,32 +32,34 @@ pub struct S3Writer {
 
     op: OpWrite,
     path: String,
-
     upload_id: Option<String>,
+
     parts: Vec<CompleteMultipartUploadRequestPart>,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl S3Writer {
-    pub fn new(core: Arc<S3Core>, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
+    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
         S3Writer {
             core,
-
+            path: path.to_string(),
             op,
-            path,
-            upload_id,
+
+            upload_id: None,
             parts: vec![],
+            buffer: oio::VectorCursor::new(),
+            // The part size must be 5 MiB to 5 GiB. There is no minimum
+            // size limit on the last part of your multipart upload.
+            //
+            // We pick the default value as 8 MiB for better throughput.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
-
-#[async_trait]
-impl oio::Write for S3Writer {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        debug_assert!(
-            self.upload_id.is_none(),
-            "Writer initiated with upload id, but users trying to call write, must be buggy"
-        );
 
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.s3_put_object_request(
             &self.path,
             Some(bs.len()),
@@ -82,10 +84,37 @@ impl oio::Write for S3Writer {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, must be buggy",
-        );
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self
+            .core
+            .s3_initiate_multipart_upload(
+                &self.path,
+                self.op.content_type(),
+                self.op.content_disposition(),
+                self.op.cache_control(),
+            )
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let result: InitiateMultipartUploadResult =
+                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+                Ok(result.upload_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        bs: Bytes,
+    ) -> Result<CompleteMultipartUploadRequestPart> {
         // AWS S3 requires part number must between [1..=10000]
         let part_number = self.parts.len() + 1;
 
@@ -116,12 +145,55 @@ impl oio::Write for S3Writer {
 
                 resp.into_body().consume().await?;
 
-                self.parts
-                    .push(CompleteMultipartUploadRequestPart { part_number, etag });
+                Ok(CompleteMultipartUploadRequestPart { part_number, etag })
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for S3Writer {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let upload_id = match &self.upload_id {
+            Some(upload_id) => upload_id,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let upload_id = self.initiate_upload().await?;
+                    self.upload_id = Some(upload_id);
+                    self.upload_id.as_deref().unwrap()
+                }
+            }
+        };
+
+        // Ignore empty bytes
+        if bs.is_empty() {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak_at_least(self.buffer_size);
+        let size = bs.len();
 
+        match self.write_part(upload_id, bs).await {
+            Ok(part) => {
+                self.buffer.take(size);
+                self.parts.push(part);
                 Ok(())
             }
-            _ => Err(parse_error(resp).await?),
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-enter safe.
+                self.buffer.pop();
+                Err(e)
+            }
         }
     }
 
@@ -153,6 +225,21 @@ impl oio::Write for S3Writer {
             return Ok(());
         };
 
+        // Make sure internal buffer has been flushed.
+        if !self.buffer.is_empty() {
+            let bs = self.buffer.peak_exact(self.buffer.len());
+
+            match self.write_part(upload_id, bs).await {
+                Ok(part) => {
+                    self.buffer.clear();
+                    self.parts.push(part);
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+
         let resp = self
             .core
             .s3_complete_multipart_upload(&self.path, upload_id, &self.parts)
diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs
index 49ed6bc3..0cdbe3a5 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -951,7 +951,7 @@ impl Accessor for WasabiBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         Ok((
             RpWrite::default(),
-            WasabiWriter::new(self.core.clone(), args, path.to_string(), None),
+            WasabiWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 90d193df..f9b998b4 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -32,68 +32,50 @@ pub struct WasabiWriter {
 
     op: OpWrite,
     path: String,
-
-    upload_id: Option<String>,
 }
 
 impl WasabiWriter {
-    pub fn new(
-        core: Arc<WasabiCore>,
-        op: OpWrite,
-        path: String,
-        upload_id: Option<String>,
-    ) -> Self {
-        WasabiWriter {
-            core,
-
-            op,
-            path,
-            upload_id,
-        }
+    pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self {
+        WasabiWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        debug_assert!(
-            self.upload_id.is_none(),
-            "Writer initiated with upload id, but users trying to call write, must be buggy"
-        );
-
-        let resp = self
-            .core
-            .put_object(
-                &self.path,
-                Some(bs.len()),
-                self.op.content_type(),
-                self.op.content_disposition(),
-                self.op.cache_control(),
-                AsyncBody::Bytes(bs),
-            )
-            .await?;
+        if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+            let resp = self
+                .core
+                .put_object(
+                    &self.path,
+                    Some(bs.len()),
+                    self.op.content_type(),
+                    self.op.content_disposition(),
+                    self.op.cache_control(),
+                    AsyncBody::Bytes(bs),
+                )
+                .await?;
 
-        match resp.status() {
-            StatusCode::CREATED | StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(())
+            match resp.status() {
+                StatusCode::CREATED | StatusCode::OK => {
+                    resp.into_body().consume().await?;
+                    Ok(())
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let resp = self
-            .core
-            .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
-            .await?;
+        } else {
+            let resp = self
+                .core
+                .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
+                .await?;
 
-        match resp.status() {
-            StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
-                resp.into_body().consume().await?;
-                Ok(())
+            match resp.status() {
+                StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
+                    resp.into_body().consume().await?;
+                    Ok(())
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            _ => Err(parse_error(resp).await?),
         }
     }
 
diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs
index 23032855..2f872229 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -300,10 +300,10 @@ impl Accessor for WebdavBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 0fa180c9..89ddd7b8 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebdavWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 094bef24..3a8bb7c2 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -519,10 +519,10 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index f3ab7633..16448141 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index c6bae2dd..6b7a48df 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -443,7 +443,12 @@ impl BlockingOperator {
     /// # }
     /// ```
     pub fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
-        self.write_with(path, OpWrite::new(), bs)
+        let bs = bs.into();
+        self.write_with(
+            path,
+            OpWrite::new().with_content_length(bs.len() as u64),
+            bs,
+        )
     }
 
     /// Copy a file from `from` to `to`.
@@ -594,8 +599,11 @@ impl BlockingOperator {
             );
         }
 
-        let (_, mut w) = self.inner().blocking_write(&path, args)?;
-        w.write(bs.into())?;
+        let bs = bs.into();
+        let (_, mut w) = self
+            .inner()
+            .blocking_write(&path, args.with_content_length(bs.len() as u64))?;
+        w.write(bs)?;
         w.close()?;
 
         Ok(())
@@ -618,8 +626,8 @@ impl BlockingOperator {
     ///
     /// # fn test(op: BlockingOperator) -> Result<()> {
     /// let mut w = op.writer("path/to/file")?;
-    /// w.append(vec![0; 4096])?;
-    /// w.append(vec![1; 4096])?;
+    /// w.write(vec![0; 4096])?;
+    /// w.write(vec![1; 4096])?;
     /// w.close()?;
     /// # Ok(())
     /// # }
@@ -636,8 +644,8 @@ impl BlockingOperator {
             );
         }
 
-        let op = OpWrite::default().with_append();
-        BlockingWriter::create_dir(self.inner().clone(), &path, op)
+        let op = OpWrite::default();
+        BlockingWriter::create(self.inner().clone(), &path, op)
     }
 
     /// Delete given path.
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 8156dcc8..ad24a542 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -631,7 +631,13 @@ impl Operator {
     /// # }
     /// ```
     pub async fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
-        self.write_with(path, OpWrite::new(), bs).await
+        let bs = bs.into();
+        self.write_with(
+            path,
+            OpWrite::new().with_content_length(bs.len() as u64),
+            bs,
+        )
+        .await
     }
 
     /// Copy a file from `from` to `to`.
@@ -769,8 +775,8 @@ impl Operator {
     /// # #[tokio::main]
     /// # async fn test(op: Operator) -> Result<()> {
     /// let mut w = op.writer("path/to/file").await?;
-    /// w.append(vec![0; 4096]).await?;
-    /// w.append(vec![1; 4096]).await?;
+    /// w.write(vec![0; 4096]).await?;
+    /// w.write(vec![1; 4096]).await?;
     /// w.close().await?;
     /// # Ok(())
     /// # }
@@ -799,8 +805,8 @@ impl Operator {
     /// # async fn test(op: Operator) -> Result<()> {
     /// let args = OpWrite::new().with_content_type("application/octet-stream");
     /// let mut w = op.writer_with("path/to/file", args).await?;
-    /// w.append(vec![0; 4096]).await?;
-    /// w.append(vec![1; 4096]).await?;
+    /// w.write(vec![0; 4096]).await?;
+    /// w.write(vec![1; 4096]).await?;
     /// w.close().await?;
     /// # Ok(())
     /// # }
@@ -817,7 +823,7 @@ impl Operator {
             );
         }
 
-        Writer::create_dir(self.inner().clone(), &path, args.with_append()).await
+        Writer::create(self.inner().clone(), &path, args).await
     }
 
     /// Write data with extra options.
@@ -854,8 +860,12 @@ impl Operator {
             );
         }
 
-        let (_, mut w) = self.inner().write(&path, args).await?;
-        w.write(bs.into()).await?;
+        let bs = bs.into();
+        let (_, mut w) = self
+            .inner()
+            .write(&path, args.with_content_length(bs.len() as u64))
+            .await?;
+        w.write(bs).await?;
         w.close().await?;
 
         Ok(())
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index 616316a5..63180d5a 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -320,12 +320,10 @@ impl OpStat {
 /// Args for `write` operation.
 #[derive(Debug, Clone, Default)]
 pub struct OpWrite {
-    append: bool,
-
+    content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
     cache_control: Option<String>,
-    if_match: Option<String>,
 }
 
 impl OpWrite {
@@ -336,13 +334,20 @@ impl OpWrite {
         Self::default()
     }
 
-    pub(crate) fn with_append(mut self) -> Self {
-        self.append = true;
-        self
+    /// Get the content length from op.
+    ///
+    /// The content length is the total length of the data to be written.
+    pub fn content_length(&self) -> Option<u64> {
+        self.content_length
     }
 
-    pub(crate) fn append(&self) -> bool {
-        self.append
+    /// Set the content length of op.
+    ///
+    /// If the content length is not set, the content length will be
+    /// calculated automatically by buffering part of data.
+    pub fn with_content_length(mut self, content_length: u64) -> Self {
+        self.content_length = Some(content_length);
+        self
     }
 
     /// Get the content type from option
@@ -377,17 +382,6 @@ impl OpWrite {
         self.cache_control = Some(cache_control.to_string());
         self
     }
-
-    /// Set the If-Match of the option
-    pub fn with_if_match(mut self, if_match: &str) -> Self {
-        self.if_match = Some(if_match.to_string());
-        self
-    }
-
-    /// Get If-Match from option
-    pub fn if_match(&self) -> Option<&str> {
-        self.if_match.as_deref()
-    }
 }
 
 /// Args for `copy` operation.
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index dc4d2bfd..321988b9 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -52,7 +52,7 @@ impl Writer {
     ///
     /// We don't want to expose those details to users so keep this function
     /// in crate only.
-    pub(crate) async fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
+    pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
         let (_, w) = acc.write(path, op).await?;
 
         Ok(Writer {
@@ -60,17 +60,13 @@ impl Writer {
         })
     }
 
-    /// Append data into writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
-    pub async fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+    /// Write into inner writer.
+    pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
-            w.append(bs.into()).await
+            w.write(bs.into()).await
         } else {
             unreachable!(
-                "writer state invalid while append, expect Idle, actual {}",
+                "writer state invalid while write, expect Idle, actual {}",
                 self.state
             );
         }
@@ -132,7 +128,7 @@ impl AsyncWrite for Writer {
                     let bs = Bytes::from(buf.to_vec());
                     let size = bs.len();
                     let fut = async move {
-                        w.append(bs).await?;
+                        w.write(bs).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -184,6 +180,72 @@ impl AsyncWrite for Writer {
     }
 }
 
+impl tokio::io::AsyncWrite for Writer {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let mut w = w
+                        .take()
+                        .expect("invalid state of writer: Idle state with empty write");
+                    let bs = Bytes::from(buf.to_vec());
+                    let size = bs.len();
+                    let fut = async move {
+                        w.write(bs).await?;
+                        Ok((size, w))
+                    };
+                    self.state = State::Write(Box::pin(fut));
+                }
+                State::Write(fut) => match ready!(fut.poll_unpin(cx)) {
+                    Ok((size, w)) => {
+                        self.state = State::Idle(Some(w));
+                        return Poll::Ready(Ok(size));
+                    }
+                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                },
+                State::Close(_) => {
+                    unreachable!("invalid state of writer: poll_write with State::Close")
+                }
+            };
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let mut w = w
+                        .take()
+                        .expect("invalid state of writer: Idle state with empty write");
+                    let fut = async move {
+                        w.close().await?;
+                        Ok(w)
+                    };
+                    self.state = State::Close(Box::pin(fut));
+                }
+                State::Write(_) => {
+                    unreachable!("invalid state of writer: poll_close with State::Write")
+                }
+                State::Close(fut) => match ready!(fut.poll_unpin(cx)) {
+                    Ok(w) => {
+                        self.state = State::Idle(Some(w));
+                        return Poll::Ready(Ok(()));
+                    }
+                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                },
+            }
+        }
+    }
+}
+
 /// BlockingWriter is designed to write data into given path in an blocking
 /// manner.
 pub struct BlockingWriter {
@@ -198,19 +260,15 @@ impl BlockingWriter {
     ///
     /// We don't want to expose those details to users so keep this function
     /// in crate only.
-    pub(crate) fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
+    pub(crate) fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
         let (_, w) = acc.blocking_write(path, op)?;
 
         Ok(BlockingWriter { inner: w })
     }
 
-    /// Append data into writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
-    pub fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        self.inner.append(bs.into())
+    /// Write into inner writer.
+    pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+        self.inner.write(bs.into())
     }
 
     /// Close the writer and make sure all data have been stored.
@@ -222,7 +280,8 @@ impl BlockingWriter {
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         let size = buf.len();
-        self.append(Bytes::from(buf.to_vec()))
+        self.inner
+            .write(Bytes::from(buf.to_vec()))
             .map(|_| size)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 44190997..b909d436 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -25,6 +25,7 @@ use log::debug;
 use opendal::layers::LoggingLayer;
 use opendal::layers::RetryLayer;
 use opendal::*;
+use rand::distributions::uniform::SampleRange;
 use rand::prelude::*;
 use sha2::Digest;
 use sha2::Sha256;
@@ -88,6 +89,16 @@ pub fn gen_bytes() -> (Vec<u8>, usize) {
     (content, size)
 }
 
+pub fn gen_bytes_with_range(range: impl SampleRange<usize>) -> (Vec<u8>, usize) {
+    let mut rng = thread_rng();
+
+    let size = rng.gen_range(range);
+    let mut content = vec![0; size];
+    rng.fill_bytes(&mut content);
+
+    (content, size)
+}
+
 pub fn gen_fixed_bytes(size: usize) -> Vec<u8> {
     let mut rng = thread_rng();
 
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 91c9ec08..10082b57 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -97,8 +97,9 @@ macro_rules! behavior_write_tests {
                 test_delete_with_special_chars,
                 test_delete_not_existing,
                 test_delete_stream,
-                test_append,
-                test_abort_writer,
+                test_writer_write,
+                test_writer_abort,
+                test_writer_futures_copy,
             );
         )*
     };
@@ -580,7 +581,7 @@ pub async fn test_read_with_special_chars(op: Operator) -> Result<()> {
 }
 
 // Delete existing file should succeed.
-pub async fn test_abort_writer(op: Operator) -> Result<()> {
+pub async fn test_writer_abort(op: Operator) -> Result<()> {
     let path = uuid::Uuid::new_v4().to_string();
     let (content, _) = gen_bytes();
 
@@ -592,7 +593,7 @@ pub async fn test_abort_writer(op: Operator) -> Result<()> {
         }
     };
 
-    if let Err(e) = writer.append(content).await {
+    if let Err(e) = writer.write(content).await {
         assert_eq!(e.kind(), ErrorKind::Unsupported);
         return Ok(());
     }
@@ -685,8 +686,8 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> {
     Ok(())
 }
 
-// Append write
-pub async fn test_append(op: Operator) -> Result<()> {
+// Append data into writer
+pub async fn test_writer_write(op: Operator) -> Result<()> {
     let path = uuid::Uuid::new_v4().to_string();
     let size = 5 * 1024 * 1024; // write file with 5 MiB
     let content_a = gen_fixed_bytes(size);
@@ -700,8 +701,8 @@ pub async fn test_append(op: Operator) -> Result<()> {
         }
         Err(err) => return Err(err.into()),
     };
-    w.append(content_a.clone()).await?;
-    w.append(content_b.clone()).await?;
+    w.write(content_a.clone()).await?;
+    w.write(content_b.clone()).await?;
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");
@@ -723,3 +724,36 @@ pub async fn test_append(op: Operator) -> Result<()> {
     op.delete(&path).await.expect("delete must succeed");
     Ok(())
 }
+
+// copy data from reader to writer
+pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size): (Vec<u8>, usize) =
+        gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+    let mut w = match op.writer(&path).await {
+        Ok(w) => w,
+        Err(err) if err.kind() == ErrorKind::Unsupported => {
+            warn!("service doesn't support write with append");
+            return Ok(());
+        }
+        Err(err) => return Err(err.into()),
+    };
+
+    futures::io::copy(&mut content.as_slice(), &mut w).await?;
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), size as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content)),
+        "read content"
+    );
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}

Reply via email to