This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bb63ab67 refactor: Make oio::Write accept Buf instead (#3021)
1bb63ab67 is described below

commit 1bb63ab676293cb20cb90fbb177d58bd25340c8c
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 00:59:16 2023 +0800

    refactor: Make oio::Write accept Buf instead (#3021)
---
 core/benches/oio/utils.rs                        |   4 +-
 core/benches/oio/write.rs                        |   4 +-
 core/src/layers/blocking.rs                      |   2 +-
 core/src/layers/complete.rs                      |  33 ++--
 core/src/layers/concurrent_limit.rs              |   4 +-
 core/src/layers/error_context.rs                 |   4 +-
 core/src/layers/logging.rs                       |  11 +-
 core/src/layers/madsim.rs                        |   2 +-
 core/src/layers/metrics.rs                       |   8 +-
 core/src/layers/minitrace.rs                     |   4 +-
 core/src/layers/oteltrace.rs                     |   4 +-
 core/src/layers/prometheus.rs                    |   4 +-
 core/src/layers/retry.rs                         |   8 +-
 core/src/layers/throttle.rs                      |   8 +-
 core/src/layers/timeout.rs                       |   4 +-
 core/src/layers/tracing.rs                       |   4 +-
 core/src/raw/adapters/kv/backend.rs              |  24 ++-
 core/src/raw/adapters/typed_kv/backend.rs        |  52 +++---
 core/src/raw/oio/buf.rs                          | 220 +++++++++++++++++++++++
 core/src/raw/oio/mod.rs                          |   3 +
 core/src/raw/oio/write/api.rs                    |  14 +-
 core/src/raw/oio/write/append_object_write.rs    |  13 +-
 core/src/raw/oio/write/compose_write.rs          |   5 +-
 core/src/raw/oio/write/exact_buf_write.rs        |  54 +++---
 core/src/raw/oio/write/multipart_upload_write.rs |   9 +-
 core/src/raw/oio/write/one_shot_write.rs         |  12 +-
 core/src/services/azblob/writer.rs               |  11 +-
 core/src/services/azdfs/writer.rs                |  15 +-
 core/src/services/cos/writer.rs                  |  12 +-
 core/src/services/dropbox/writer.rs              |   9 +-
 core/src/services/fs/writer.rs                   |  37 +---
 core/src/services/ftp/writer.rs                  |   8 +-
 core/src/services/gcs/writer.rs                  |  12 +-
 core/src/services/gdrive/writer.rs               |  10 +-
 core/src/services/ghac/writer.rs                 |  14 +-
 core/src/services/hdfs/writer.rs                 |  37 +---
 core/src/services/ipmfs/writer.rs                |  10 +-
 core/src/services/obs/writer.rs                  |  12 +-
 core/src/services/onedrive/writer.rs             |  10 +-
 core/src/services/oss/writer.rs                  |  12 +-
 core/src/services/s3/writer.rs                   |  13 +-
 core/src/services/sftp/writer.rs                 |  10 +-
 core/src/services/supabase/writer.rs             |  12 +-
 core/src/services/vercel_artifacts/writer.rs     |   9 +-
 core/src/services/wasabi/writer.rs               |   9 +-
 core/src/services/webdav/writer.rs               |   8 +-
 core/src/services/webhdfs/writer.rs              |   9 +-
 core/src/types/operator/blocking_operator.rs     |   8 +-
 core/src/types/operator/operator.rs              |  10 +-
 core/src/types/writer.rs                         |  42 ++---
 50 files changed, 519 insertions(+), 334 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index bde28481e..1671057f0 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -26,8 +26,8 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> {
-        Ok(bs.len() as u64)
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> 
opendal::Result<usize> {
+        Ok(bs.remaining())
     }
 
     async fn abort(&mut self) -> opendal::Result<()> {
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index a227d4901..8da2a6dd0 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -48,8 +48,8 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
 
                 let mut bs = content.clone();
                 while !bs.is_empty() {
-                    let n = w.write(bs.clone()).await.unwrap();
-                    bs.advance(n as usize);
+                    let n = w.write(&bs).await.unwrap();
+                    bs.advance(n);
                 }
                 w.close().await.unwrap();
             })
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 6e530023c..6beb8881b 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.handle.block_on(self.inner.write(bs))
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 83bf12d6a..048905bae 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,11 +711,15 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let n = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
+        })?;
+        let n = w.write(bs).await?;
+        self.written += n as u64;
 
         if let Some(size) = self.size {
-            if self.written + n as u64 > size {
+            if self.written > size {
                 return Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
@@ -726,11 +730,6 @@ where
             }
         }
 
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
-        })?;
-        let n = w.write(bs).await?;
-        self.written += n;
         Ok(n)
     }
 
@@ -773,11 +772,15 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
 where
     W: oio::BlockingWrite,
 {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let n = bs.len();
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
+        })?;
+        let n = w.write(bs)?;
+        self.written += n as u64;
 
         if let Some(size) = self.size {
-            if self.written + n as u64 > size {
+            if self.written > size {
                 return Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
@@ -788,13 +791,7 @@ where
             }
         }
 
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
-        })?;
-
-        w.write(bs)?;
-        self.written += n as u64;
-        Ok(n as u64)
+        Ok(n)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 99fa4e413..bdc473384 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -299,7 +299,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index e325cd0f7..15d7a0dd0 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs).await.map_err(|err| {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
@@ -429,7 +429,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
 }
 
 impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs).map_err(|err| {
             err.with_operation(WriteOperation::BlockingWrite)
                 .with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1f6d7456a..f0cea32e8 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,10 +1252,10 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         match self.inner.write(bs).await {
             Ok(n) => {
-                self.written += n;
+                self.written += n as u64;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1349,11 +1349,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 }
 
 impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         match self.inner.write(bs) {
             Ok(n) => {
-                self.written += n;
+                self.written += n as u64;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1361,7 +1360,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
                     WriteOperation::BlockingWrite,
                     self.path,
                     self.written,
-                    size
+                    n
                 );
                 Ok(n)
             }
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 432e4bcf4..b0b5cf179 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> crate::Result<usize> {
         #[cfg(madsim)]
         {
             let req = Request::Write(self.path.to_string(), bs);
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 36e4957aa..b16f87a40 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,12 +847,12 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner
             .write(bs)
             .await
             .map(|n| {
-                self.bytes += n;
+                self.bytes += n as u64;
                 n
             })
             .map_err(|err| {
@@ -877,11 +877,11 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner
             .write(bs)
             .map(|n| {
-                self.bytes += n;
+                self.bytes += n as u64;
                 n
             })
             .map_err(|err| {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 149410ac8..f54f40593 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner
             .write(bs)
             .in_span(Span::enter_with_parent(
@@ -369,7 +369,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 0722a11a7..cceca3a69 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,7 +313,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -327,7 +327,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 0d8f99c22..2e294c9e5 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner
             .write(bs)
             .await
@@ -695,7 +695,7 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner
             .write(bs)
             .map(|n| {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 3e207df9a..3746008ab 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -872,11 +872,11 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.write(bs.clone()).await {
+            match self.inner.write(bs).await {
                 Ok(v) => return Ok(v),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
@@ -952,8 +952,8 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for 
RetryWrapper<R, I> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        { || self.inner.write(bs.clone()) }
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        { || self.inner.write(bs) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
             .notify(|err, dur| {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 021d1cad9..aea7b6381 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -216,8 +216,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
 
         loop {
             match self.limiter.check_n(buf_length) {
@@ -251,8 +251,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
 
         loop {
             match self.limiter.check_n(buf_length) {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 68ac5a41d..c02a4d58a 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,8 +322,8 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let timeout = self.io_timeout(bs.len() as u64);
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let timeout = self.io_timeout(bs.remaining() as u64);
 
         tokio::time::timeout(timeout, self.inner.write(bs))
             .await
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index c5d006980..b829f8376 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -350,7 +350,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index c0823d59e..513bda67e 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use super::Adapter;
 use crate::raw::*;
@@ -390,11 +389,14 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf = Some(bs.into());
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.chunk().len();
 
-        Ok(size as u64)
+        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+        self.buf = Some(buf);
+
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -414,11 +416,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf = Some(bs.into());
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 948c2b5ad..1e0864de7 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -22,7 +22,6 @@ use bytes::Bytes;
 
 use super::Adapter;
 use super::Value;
-use crate::raw::oio::VectorCursor;
 use crate::raw::*;
 use crate::*;
 
@@ -363,7 +362,7 @@ pub struct KvWriter<S> {
     path: String,
 
     op: OpWrite,
-    buf: VectorCursor,
+    buf: Option<Vec<u8>>,
 }
 
 impl<S> KvWriter<S> {
@@ -372,11 +371,13 @@ impl<S> KvWriter<S> {
             kv,
             path,
             op,
-            buf: VectorCursor::new(),
+            buf: None,
         }
     }
 
-    fn build(&self) -> Value {
+    fn build(&mut self) -> Value {
+        let value = self.buf.take().map(Bytes::from).unwrap_or_default();
+
         let mut metadata = Metadata::new(EntryMode::FILE);
         if let Some(v) = self.op.cache_control() {
             metadata.set_cache_control(v);
@@ -390,49 +391,58 @@ impl<S> KvWriter<S> {
         if let Some(v) = self.op.content_length() {
             metadata.set_content_length(v);
         } else {
-            metadata.set_content_length(self.buf.len() as u64);
+            metadata.set_content_length(value.len() as u64);
         }
 
-        Value {
-            metadata,
-            value: self.buf.peak_all(),
-        }
+        Value { metadata, value }
     }
 }
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf.push(bs);
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buf.clear();
-
+        self.buf = None;
         Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
-        self.kv.set(&self.path, self.build()).await?;
+        let kv = self.kv.clone();
+        let value = self.build();
+
+        kv.set(&self.path, value).await?;
         Ok(())
     }
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf.push(bs);
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     fn close(&mut self) -> Result<()> {
-        self.kv.blocking_set(&self.path, self.build())?;
+        let kv = self.kv.clone();
+        let value = self.build();
 
+        kv.blocking_set(&self.path, value)?;
         Ok(())
     }
 }
diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf.rs
new file mode 100644
index 000000000..36d67a2e3
--- /dev/null
+++ b/core/src/raw/oio/buf.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::{cmp, ptr};
+
+/// WriteBuf is used in [`oio::Write`] to provide a trait similar to 
[`bytes::Buf`].
+///
+/// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes` 
only needs `&self`
+/// instead of `&mut self`.
+pub trait WriteBuf: Send + Sync {
+    /// Returns the number of bytes between the current position and the end 
of the buffer.
+    ///
+    /// This value is greater than or equal to the length of the slice 
returned by chunk().
+    ///
+    /// # Notes
+    ///
+    /// Implementations of remaining should ensure that the return value does 
not change unless a
+    /// call is made to advance or any other function that is documented to 
change the Buf's
+    /// current position.
+    fn remaining(&self) -> usize;
+
+    /// Returns a slice starting at the current position and of length between 
0 and
+    /// Buf::remaining(). Note that this can return shorter slice (this allows 
non-continuous
+    /// internal representation).
+    ///
+    /// # Notes
+    ///
+    /// This function should never panic. Once the end of the buffer is 
reached, i.e.,
+    /// Buf::remaining returns 0, calls to chunk() should return an empty 
slice.
+    fn chunk(&self) -> &[u8];
+
+    /// Advance the internal cursor of the Buf
+    ///
+    /// The next call to chunk() will return a slice starting cnt bytes 
further into the underlying buffer.
+    ///
+    /// Panics
+    /// This function may panic if cnt > self.remaining().
+    fn advance(&mut self, cnt: usize);
+
+    /// Copies current chunk into dst.
+    ///
+    /// Returns the number of bytes copied.
+    ///
+    /// # Notes
+    ///
+    /// Users should not assume the returned bytes is the same as the 
Buf::remaining().
+    fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
+        let src = self.chunk();
+        let size = cmp::min(src.len(), dst.len());
+
+        // # Safety
+        //
+        // `src` and `dst` are guaranteed have enough space for `size` bytes.
+        unsafe {
+            ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), size);
+        }
+
+        size
+    }
+
+    /// Copies current chunk into a bytes.
+    ///
+    /// This function may be optimized by the underlying type to avoid actual 
copies.
+    /// For example, Bytes implementation will do a shallow copy (ref-count 
increment).
+    ///
+    /// # Notes
+    ///
+    /// Users should not assume the returned bytes is the same as the 
Buf::remaining().
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
+        let src = self.chunk();
+        let size = cmp::min(src.len(), len);
+
+        let mut ret = BytesMut::with_capacity(size);
+        ret.extend_from_slice(&src[..size]);
+        ret.freeze()
+    }
+}
+
+macro_rules! deref_forward_buf {
+    () => {
+        fn remaining(&self) -> usize {
+            (**self).remaining()
+        }
+
+        fn chunk(&self) -> &[u8] {
+            (**self).chunk()
+        }
+
+        fn advance(&mut self, cnt: usize) {
+            (**self).advance(cnt)
+        }
+
+        fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
+            (**self).copy_to_slice(dst)
+        }
+
+        fn copy_to_bytes(&self, len: usize) -> Bytes {
+            (**self).copy_to_bytes(len)
+        }
+    };
+}
+
+impl<T: WriteBuf + ?Sized> WriteBuf for &mut T {
+    deref_forward_buf!();
+}
+
+impl<T: WriteBuf + ?Sized> WriteBuf for Box<T> {
+    deref_forward_buf!();
+}
+
+impl WriteBuf for &[u8] {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.len()
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        self
+    }
+
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        *self = &self[cnt..];
+    }
+}
+
+impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> {
+    fn remaining(&self) -> usize {
+        let len = self.get_ref().as_ref().len();
+        let pos = self.position();
+
+        if pos >= len as u64 {
+            return 0;
+        }
+
+        len - pos as usize
+    }
+
+    fn chunk(&self) -> &[u8] {
+        let len = self.get_ref().as_ref().len();
+        let pos = self.position();
+
+        if pos >= len as u64 {
+            return &[];
+        }
+
+        &self.get_ref().as_ref()[pos as usize..]
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        let pos = (self.position() as usize)
+            .checked_add(cnt)
+            .expect("overflow");
+
+        assert!(pos <= self.get_ref().as_ref().len());
+        self.set_position(pos as u64);
+    }
+}
+
+impl WriteBuf for Bytes {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.len()
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        self
+    }
+
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        bytes::Buf::advance(self, cnt)
+    }
+
+    #[inline]
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
+        let size = cmp::min(self.len(), len);
+        self.slice(..size)
+    }
+}
+
+impl WriteBuf for BytesMut {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.len()
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        self
+    }
+
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        bytes::Buf::advance(self, cnt)
+    }
+
+    #[inline]
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
+        let size = cmp::min(self.len(), len);
+        Bytes::copy_from_slice(&self[..size])
+    }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 1b24bec9c..8c353ca50 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -41,3 +41,6 @@ pub use cursor::VectorCursor;
 
 mod entry;
 pub use entry::Entry;
+
+mod buf;
+pub use buf::WriteBuf;
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index eaf3ba0de..622303ff4 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -19,8 +19,8 @@ use std::fmt::Display;
 use std::fmt::Formatter;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
+use crate::raw::*;
 use crate::*;
 
 /// WriteOperation is the name for APIs of Writer.
@@ -81,7 +81,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
     /// repeatedly until all bytes has been written.
-    async fn write(&mut self, bs: Bytes) -> Result<u64>;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -92,7 +92,7 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for oio::Write")
@@ -118,7 +118,7 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         (**self).write(bs).await
     }
 
@@ -137,14 +137,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>;
 /// BlockingWrite is the trait that OpenDAL returns to callers.
 pub trait BlockingWrite: Send + Sync + 'static {
     /// Write whole content at once.
-    fn write(&mut self, bs: Bytes) -> Result<u64>;
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>;
 
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
 
 impl BlockingWrite for () {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
@@ -162,7 +162,7 @@ impl BlockingWrite for () {
 ///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         (**self).write(bs)
     }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index d6bb819df..2a5e30609 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -78,15 +77,19 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let offset = self.offset().await?;
 
-        let size = bs.len() as u64;
+        let size = bs.remaining();
 
         self.inner
-            .append(offset, size, AsyncBody::Bytes(bs))
+            .append(
+                offset,
+                size as u64,
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            )
             .await
-            .map(|_| self.offset = Some(offset + size))?;
+            .map(|_| self.offset = Some(offset + size as u64))?;
 
         Ok(size)
     }
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index dac833fcf..e2c6638af 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -39,7 +39,6 @@
 //! type_alias_impl_trait has been stabilized.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -56,7 +55,7 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
@@ -94,7 +93,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, 
THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 7ba788e3d..d4a868472 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,8 +18,9 @@
 use std::cmp::min;
 
 use async_trait::async_trait;
-use bytes::{Buf, Bytes, BytesMut};
+use bytes::{Bytes, BytesMut};
 
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -61,21 +62,20 @@ enum Buffer {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
         loop {
             match &mut self.buffer {
                 Buffer::Empty => {
-                    if bs.len() >= self.buffer_size {
-                        bs.truncate(self.buffer_size);
-                        self.buffer = Buffer::Consuming(bs);
-                        return Ok(self.buffer_size as u64);
+                    if bs.remaining() >= self.buffer_size {
+                        self.buffer = 
Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
+                        return Ok(self.buffer_size);
                     }
 
-                    let size = bs.len() as u64;
-                    let mut fill = BytesMut::with_capacity(bs.len());
-                    fill.extend_from_slice(&bs);
+                    let chunk = bs.chunk();
+                    let mut fill = BytesMut::with_capacity(chunk.len());
+                    fill.extend_from_slice(chunk);
                     self.buffer = Buffer::Filling(fill);
-                    return Ok(size);
+                    return Ok(chunk.len());
                 }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
@@ -83,18 +83,17 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         continue;
                     }
 
-                    let size = min(self.buffer_size - fill.len(), bs.len());
-                    fill.extend_from_slice(&bs[..size]);
-                    bs.advance(size);
-                    return Ok(size as u64);
+                    let size = min(self.buffer_size - fill.len(), 
bs.chunk().len());
+                    fill.extend_from_slice(&bs.chunk()[..size]);
+                    return Ok(size);
                 }
                 Buffer::Consuming(consume) => {
                     // Make sure filled buffer has been flushed.
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(consume.clone()).await?;
-                        consume.advance(n as usize);
+                        let n = self.inner.write(consume).await?;
+                        consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
                 }
@@ -120,8 +119,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(consume.clone()).await?;
-                        consume.advance(n as usize);
+                        let n = self.inner.write(&consume).await?;
+                        consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
                     break;
@@ -152,11 +151,14 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Bytes) -> Result<u64> {
-            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
-
-            self.buf.extend_from_slice(&bs);
-            Ok(bs.len() as u64)
+        async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
+            debug!(
+                "test_fuzz_exact_buf_writer: flush size: {}",
+                bs.chunk().len()
+            );
+
+            self.buf.extend_from_slice(bs.chunk());
+            Ok(bs.chunk().len())
         }
 
         async fn abort(&mut self) -> Result<()> {
@@ -184,8 +186,8 @@ mod tests {
 
         let mut bs = Bytes::from(expected.clone());
         while !bs.is_empty() {
-            let n = w.write(bs.clone()).await?;
-            bs.advance(n as usize);
+            let n = w.write(&bs).await?;
+            bs.advance(n);
         }
 
         w.close().await?;
@@ -223,7 +225,7 @@ mod tests {
 
             let mut bs = Bytes::from(content.clone());
             while !bs.is_empty() {
-                let n = writer.write(bs.clone()).await?;
+                let n = writer.write(&bs).await?;
                 bs.advance(n as usize);
             }
         }
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 13d1b4aa2..39660a2f7 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -120,22 +119,22 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let upload_id = self.upload_id().await?;
 
-        let size = bs.len();
+        let size = bs.remaining();
 
         self.inner
             .write_part(
                 &upload_id,
                 self.parts.len(),
                 size as u64,
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await
             .map(|v| self.parts.push(v))?;
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index 3681242ce..a02c64445 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use crate::raw::*;
 use crate::*;
@@ -32,7 +31,7 @@ pub trait OneShotWrite: Send + Sync + Unpin {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at 
once.
-    async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+    async fn write_once(&self, body: &dyn oio::WriteBuf) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
@@ -49,12 +48,9 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let cursor = oio::Cursor::from(bs);
-
-        let size = cursor.len() as u64;
-        self.inner.write_once(size, Box::new(cursor)).await?;
-
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+        self.inner.write_once(bs).await?;
         Ok(size)
     }
 
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index b088aa57a..9745b1387 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzblobCore;
@@ -161,11 +160,12 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Bytes(bs)).await?;
+            self.append_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                .await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -174,7 +174,8 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+            self.write_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index e56f9eca9..f9bec7346 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzdfsCore;
@@ -41,9 +40,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -68,9 +65,13 @@ impl oio::Write for AzdfsWriter {
             }
         }
 
-        let mut req =
-            self.core
-                .azdfs_update_request(&self.path, Some(bs.len()), 
AsyncBody::Bytes(bs))?;
+        let size = bs.remaining();
+
+        let mut req = self.core.azdfs_update_request(
+            &self.path,
+            Some(size),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
+        )?;
 
         self.core.sign(&mut req).await?;
 
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index af2a6ebd0..e0cc8be73 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -52,14 +50,15 @@ impl CosWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for CosWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, buf: &dyn oio::WriteBuf) -> Result<()> {
+        let size = buf.remaining();
         let mut req = self.core.cos_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(buf.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -98,7 +97,8 @@ impl oio::MultipartUploadWrite for CosWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/dropbox/writer.rs 
b/core/src/services/dropbox/writer.rs
index 00998c77d..c39e95eaf 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::DropboxCore;
@@ -40,8 +39,8 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .core
@@ -49,14 +48,14 @@ impl oio::Write for DropboxWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
         let status = resp.status();
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4a9a0471d..13e03ae2c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -15,14 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::Seek;
-use std::io::SeekFrom;
 use std::io::Write;
 use std::path::PathBuf;
 
 use async_trait::async_trait;
-use bytes::Bytes;
-use tokio::io::AsyncSeekExt;
 use tokio::io::AsyncWriteExt;
 
 use super::error::parse_io_error;
@@ -33,7 +29,6 @@ pub struct FsWriter<F> {
     target_path: PathBuf,
     tmp_path: Option<PathBuf>,
     f: F,
-    pos: u64,
 }
 
 impl<F> FsWriter<F> {
@@ -42,28 +37,14 @@ impl<F> FsWriter<F> {
             target_path,
             tmp_path,
             f,
-            pos: 0,
         }
     }
 }
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-        self.pos += size;
-
-        Ok(size)
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.f.write(bs.chunk()).await.map_err(parse_io_error)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -87,18 +68,8 @@ impl oio::Write for FsWriter<tokio::fs::File> {
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
-
-        Ok(bs.len() as u64)
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.f.write(bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3488255f9..2b0673d67 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::AsyncWriteExt;
 
 use super::backend::FtpBackend;
@@ -41,8 +40,9 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+        let bs = bs.copy_to_bytes(size);
 
         let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
         let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
@@ -52,7 +52,7 @@ impl oio::Write for FtpWriter {
 
         ftp_stream.finalize_put_stream(data_stream).await?;
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 68e13b27f..d8dbe7597 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,16 +118,16 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         let location = match &self.location {
             Some(location) => location,
             None => {
-                if self.op.content_length().unwrap_or_default() == bs.len() as 
u64
-                    && self.written == 0
+                if self.op.content_length().unwrap_or_default() == size as u64 
&& self.written == 0
                 {
-                    self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+                    self.write_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                        .await?;
 
                     return Ok(size);
                 } else {
@@ -138,7 +138,7 @@ impl oio::Write for GcsWriter {
             }
         };
 
-        self.buffer.push(bs);
+        self.buffer.push(bs.copy_to_bytes(size));
         // Return directly if the buffer is not full
         if self.buffer.len() <= self.write_fixed_size {
             return Ok(size);
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index 8d69bf8e2..c77a8989d 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,12 +95,14 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
         if self.file_id.is_none() {
-            self.write_create(size, bs).await?;
+            self.write_create(size as u64, bs.copy_to_bytes(size))
+                .await?;
         } else {
-            self.write_overwrite(size, bs).await?;
+            self.write_overwrite(size as u64, bs.copy_to_bytes(size))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a7db6acab..13b27ccf4 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
 use super::backend::GhacBackend;
 use super::error::parse_error;
@@ -42,18 +41,23 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+
         let req = self
             .backend
-            .ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs))
+            .ghac_upload(
+                self.cache_id,
+                size as u64,
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            )
             .await?;
 
         let resp = self.backend.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
-            self.size += size;
+            self.size += size as u64;
             Ok(size)
         } else {
             Err(parse_error(resp)
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 90499e47c..47cdb53fe 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -18,7 +18,6 @@
 use std::io::Write;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::AsyncWriteExt;
 
 use super::error::parse_io_error;
@@ -27,35 +26,18 @@ use crate::*;
 
 pub struct HdfsWriter<F> {
     f: F,
-    /// The position of current written bytes in the buffer.
-    ///
-    /// We will maintain the posstion in pos to make sure the buffer is 
written correctly.
-    pos: usize,
 }
 
 impl<F> HdfsWriter<F> {
     pub fn new(f: F) -> Self {
-        Self { f, pos: 0 }
+        Self { f }
     }
 }
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-
-        while self.pos < bs.len() {
-            let n = self
-                .f
-                .write(&bs[self.pos..])
-                .await
-                .map_err(parse_io_error)?;
-            self.pos += n;
-        }
-        // Reset pos to 0 for next write.
-        self.pos = 0;
-
-        Ok(size as u64)
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.f.write(bs.chunk()).await.map_err(parse_io_error)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -73,17 +55,8 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-
-        while self.pos < bs.len() {
-            let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
-            self.pos += n;
-        }
-        // Reset pos to 0 for next write.
-        self.pos = 0;
-
-        Ok(size as u64)
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.f.write(bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index d3f98e77b..6b6ab6a4d 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::IpmfsBackend;
@@ -38,9 +37,12 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+        let resp = self
+            .backend
+            .ipmfs_write(&self.path, bs.copy_to_bytes(size))
+            .await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index d3b1e119f..38dc660db 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -18,13 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
 use crate::raw::oio::MultipartUploadPart;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -53,13 +51,14 @@ impl ObsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for ObsWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let size = bs.remaining();
         let mut req = self.core.obs_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -93,7 +92,8 @@ impl oio::MultipartUploadWrite for ObsWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index abb846871..d9eabb8eb 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Buf;
 use bytes::Bytes;
 use http::StatusCode;
 
@@ -46,8 +45,9 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+        let bs = bs.copy_to_bytes(size);
 
         if size <= Self::MAX_SIMPLE_SIZE {
             self.write_simple(bs).await?;
@@ -55,7 +55,7 @@ impl oio::Write for OneDriveWriter {
             self.write_chunked(bs).await?;
         }
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -167,7 +167,7 @@ impl OneDriveWriter {
             StatusCode::OK => {
                 let bs = resp.into_body().bytes().await?;
                 let result: OneDriveUploadSessionCreationResponseBody =
-                    
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
                 Ok(result)
             }
             _ => Err(parse_error(resp).await?),
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 27aa09011..8fd7ac656 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -52,14 +50,15 @@ impl OssWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OssWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let size = bs.remaining();
         let mut req = self.core.oss_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
             false,
         )?;
 
@@ -100,7 +99,8 @@ impl oio::MultipartUploadWrite for OssWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index a27341d71..7c5ddf924 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -49,14 +47,16 @@ impl S3Writer {
 
 #[async_trait]
 impl oio::OneShotWrite for S3Writer {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let size = bs.remaining();
+
         let mut req = self.core.s3_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -95,7 +95,8 @@ impl oio::MultipartUploadWrite for S3Writer {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 2df725ab2..90e605152 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -16,13 +16,10 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use openssh_sftp_client::file::File;
 
 use crate::raw::oio;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
+use crate::*;
 
 pub struct SftpWriter {
     file: File,
@@ -36,9 +33,8 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-        self.file.write_all(&bs).await?;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = self.file.write(bs.chunk()).await?;
 
         Ok(size)
     }
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index 5c57d7a95..95a2ee92f 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,14 +67,10 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        if bs.is_empty() {
-            return Ok(9);
-        }
-
-        let size = bs.len();
-        self.upload(bs).await?;
-        Ok(size as u64)
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
+        self.upload(bs.copy_to_bytes(size)).await?;
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index efb223df6..36e62b734 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::VercelArtifactsBackend;
@@ -39,15 +38,15 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .backend
             .vercel_artifacts_put(
                 self.path.as_str(),
                 self.op.content_length().unwrap(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
@@ -56,7 +55,7 @@ impl oio::Write for VercelArtifactsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index 55d898ccc..d0509ebbc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -41,8 +40,8 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .core
@@ -52,14 +51,14 @@ impl oio::Write for WasabiWriter {
                 self.op.content_type(),
                 self.op.content_disposition(),
                 self.op.cache_control(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
         match resp.status() {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index 30b9827f2..413fe891a 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebdavBackend;
@@ -62,10 +61,11 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
-        self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+        self.write_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+            .await?;
 
         Ok(size)
     }
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index 8dcbc9dc3..34e6b20fa 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
@@ -39,8 +38,8 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        let size = bs.remaining();
 
         let req = self
             .backend
@@ -48,7 +47,7 @@ impl oio::Write for WebhdfsWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
@@ -58,7 +57,7 @@ impl oio::Write for WebhdfsWriter {
         match status {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index c1d34f87d..3468f8b6b 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -20,6 +20,7 @@ use std::io::Read;
 use bytes::Bytes;
 
 use super::operator_functions::*;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -550,7 +551,7 @@ impl BlockingOperator {
             self.inner().clone(),
             path,
             (OpWrite::default().with_content_length(bs.len() as u64), bs),
-            |inner, path, (args, bs)| {
+            |inner, path, (args, mut bs)| {
                 if !validate_path(&path, EntryMode::FILE) {
                     return Err(
                         Error::new(ErrorKind::IsADirectory, "write path is a 
directory")
@@ -561,7 +562,10 @@ impl BlockingOperator {
                 }
 
                 let (_, mut w) = inner.blocking_write(&path, args)?;
-                w.write(bs)?;
+                while bs.remaining() > 0 {
+                    let n = w.write(&bs)?;
+                    bs.advance(n);
+                }
                 w.close()?;
 
                 Ok(())
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index ab97a2df2..b51c2bc60 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -17,7 +17,7 @@
 
 use std::time::Duration;
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::stream;
 use futures::AsyncReadExt;
 use futures::Stream;
@@ -716,7 +716,7 @@ impl Operator {
             self.inner().clone(),
             path,
             (OpWrite::default().with_content_length(bs.len() as u64), bs),
-            |inner, path, (args, bs)| {
+            |inner, path, (args, mut bs)| {
                 let fut = async move {
                     if !validate_path(&path, EntryMode::FILE) {
                         return Err(Error::new(
@@ -729,7 +729,11 @@ impl Operator {
                     }
 
                     let (_, mut w) = inner.write(&path, args).await?;
-                    w.write(bs).await?;
+                    while bs.remaining() > 0 {
+                        let n = w.write(&bs).await?;
+                        bs.advance(n);
+                    }
+
                     w.close().await?;
 
                     Ok(())
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 3ae7f6f4c..0474d6ea4 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -22,13 +22,14 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::AsyncWrite;
 use futures::FutureExt;
 use futures::TryStreamExt;
 
 use crate::raw::oio::Write;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -91,10 +92,9 @@ impl Writer {
         };
 
         let mut bs = bs.into();
-
-        while !bs.is_empty() {
-            let n = w.write(bs.clone()).await?;
-            bs.advance(n as usize);
+        while bs.remaining() > 0 {
+            let n = w.write(&bs).await?;
+            bs.advance(n);
         }
 
         Ok(())
@@ -149,10 +149,10 @@ impl Writer {
         let mut written = 0;
         while let Some(bs) = sink_from.try_next().await? {
             let mut bs = bs.into();
-            while bs.has_remaining() {
-                let n = w.write(bs.clone()).await?;
-                bs.advance(n as usize);
-                written += n;
+            while bs.remaining() > 0 {
+                let n = w.write(&bs).await?;
+                bs.advance(n);
+                written += n as u64;
             }
         }
         Ok(written)
@@ -262,10 +262,11 @@ impl AsyncWrite for Writer {
                     let mut w = w
                         .take()
                         .expect("invalid state of writer: Idle state with 
empty write");
-                    let bs = Bytes::from(buf.to_vec());
+                    // FIXME: This will the buf everytime, we should avoid 
this.
+                    let bs = Bytes::copy_from_slice(buf);
                     let fut = async move {
-                        let n = w.write(bs).await?;
-                        Ok((n as usize, w))
+                        let n = w.write(&bs).await?;
+                        Ok((n, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -334,10 +335,11 @@ impl tokio::io::AsyncWrite for Writer {
                     let mut w = w
                         .take()
                         .expect("invalid state of writer: Idle state with 
empty write");
-                    let bs = Bytes::from(buf.to_vec());
+                    // FIXME: This will the buf everytime, we should avoid 
this.
+                    let bs = Bytes::copy_from_slice(buf);
                     let fut = async move {
-                        let n = w.write(bs).await?;
-                        Ok((n as usize, w))
+                        let n = w.write(&bs).await?;
+                        Ok((n, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -416,10 +418,9 @@ impl BlockingWriter {
     /// Write into inner writer.
     pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         let mut bs = bs.into();
-
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone())?;
-            bs.advance(n as usize);
+        while bs.remaining() > 0 {
+            let n = self.inner.write(&bs)?;
+            bs.advance(n);
         }
 
         Ok(())
@@ -434,8 +435,7 @@ impl BlockingWriter {
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         self.inner
-            .write(Bytes::from(buf.to_vec()))
-            .map(|n| n as usize)
+            .write(&buf)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 

Reply via email to