This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 527fc8d34a8cf37bb24019df9710ea72cbafe394
Author: Xuanwo <[email protected]>
AuthorDate: Thu Sep 7 16:12:28 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/blocking.rs                      |  2 +-
 core/src/layers/complete.rs                      | 33 +++++++--------
 core/src/layers/concurrent_limit.rs              |  4 +-
 core/src/layers/error_context.rs                 |  4 +-
 core/src/layers/logging.rs                       | 11 +++--
 core/src/layers/madsim.rs                        |  2 +-
 core/src/layers/metrics.rs                       |  8 ++--
 core/src/layers/minitrace.rs                     |  4 +-
 core/src/layers/oteltrace.rs                     |  4 +-
 core/src/layers/prometheus.rs                    |  4 +-
 core/src/layers/retry.rs                         |  4 +-
 core/src/layers/throttle.rs                      |  8 ++--
 core/src/layers/timeout.rs                       |  4 +-
 core/src/layers/tracing.rs                       |  4 +-
 core/src/raw/adapters/kv/backend.rs              | 24 +++++++----
 core/src/raw/adapters/typed_kv/backend.rs        | 40 ++++++++++--------
 core/src/raw/oio/mod.rs                          |  3 --
 core/src/raw/oio/write/api.rs                    | 13 +++---
 core/src/raw/oio/write/append_object_write.rs    | 12 ++++--
 core/src/raw/oio/write/compose_write.rs          |  4 +-
 core/src/raw/oio/write/exact_buf_write.rs        | 53 ++++++++++++------------
 core/src/raw/oio/write/multipart_upload_write.rs |  8 ++--
 core/src/raw/oio/write/one_shot_write.rs         | 11 ++---
 core/src/services/azblob/writer.rs               | 10 +++--
 core/src/services/azdfs/writer.rs                | 14 ++++---
 core/src/services/cos/writer.rs                  | 12 +++---
 core/src/services/dropbox/writer.rs              |  8 ++--
 core/src/services/fs/writer.rs                   | 31 ++------------
 core/src/services/ftp/writer.rs                  |  7 ++--
 core/src/services/gcs/writer.rs                  | 12 +++---
 core/src/services/gdrive/writer.rs               | 10 +++--
 core/src/services/ghac/writer.rs                 | 13 ++++--
 core/src/services/hdfs/writer.rs                 | 30 ++------------
 core/src/services/ipmfs/writer.rs                |  9 ++--
 core/src/services/obs/writer.rs                  | 12 +++---
 core/src/services/onedrive/writer.rs             | 10 ++---
 core/src/services/oss/writer.rs                  | 12 +++---
 core/src/services/s3/writer.rs                   | 13 +++---
 core/src/services/sftp/writer.rs                 |  9 ++--
 core/src/services/supabase/writer.rs             | 12 ++----
 core/src/services/vercel_artifacts/writer.rs     |  8 ++--
 core/src/services/wasabi/writer.rs               |  8 ++--
 core/src/services/webdav/writer.rs               |  7 ++--
 core/src/services/webhdfs/writer.rs              |  8 ++--
 core/src/{raw/oio => types}/buf.rs               | 25 ++++++-----
 core/src/types/mod.rs                            |  3 ++
 core/src/types/operator/blocking_operator.rs     |  7 +++-
 core/src/types/operator/operator.rs              | 10 +++--
 core/src/types/reader.rs                         |  4 +-
 core/src/types/writer.rs                         | 41 ++++++++----------
 50 files changed, 294 insertions(+), 315 deletions(-)

diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 6e530023c..504111199 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.handle.block_on(self.inner.write(bs))
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 83bf12d6a..f1e76d5a5 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,11 +711,15 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let n = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
+        })?;
+        let n = w.write(bs).await?;
+        self.written += n as u64;
 
         if let Some(size) = self.size {
-            if self.written + n as u64 > size {
+            if self.written > size {
                 return Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
@@ -726,11 +730,6 @@ where
             }
         }
 
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
-        })?;
-        let n = w.write(bs).await?;
-        self.written += n;
         Ok(n)
     }
 
@@ -773,11 +772,15 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
 where
     W: oio::BlockingWrite,
 {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let n = bs.len();
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
+        })?;
+        let n = w.write(bs)?;
+        self.written += n as u64;
 
         if let Some(size) = self.size {
-            if self.written + n as u64 > size {
+            if self.written > size {
                 return Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
@@ -788,13 +791,7 @@ where
             }
         }
 
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
-        })?;
-
-        w.write(bs)?;
-        self.written += n as u64;
-        Ok(n as u64)
+        Ok(n)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 99fa4e413..ec965a95a 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -299,7 +299,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index e325cd0f7..1e01f68d3 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs).await.map_err(|err| {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
@@ -429,7 +429,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
 }
 
 impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs).map_err(|err| {
             err.with_operation(WriteOperation::BlockingWrite)
                 .with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1f6d7456a..c69372123 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,10 +1252,10 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         match self.inner.write(bs).await {
             Ok(n) => {
-                self.written += n;
+                self.written += n as u64;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1349,11 +1349,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 }
 
 impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         match self.inner.write(bs) {
             Ok(n) => {
-                self.written += n;
+                self.written += n as u64;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data write 
{}B",
@@ -1361,7 +1360,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
                     WriteOperation::BlockingWrite,
                     self.path,
                     self.written,
-                    size
+                    n
                 );
                 Ok(n)
             }
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 432e4bcf4..e6047bb42 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> crate::Result<usize> {
         #[cfg(madsim)]
         {
             let req = Request::Write(self.path.to_string(), bs);
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 36e4957aa..dcba90bc2 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,12 +847,12 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner
             .write(bs)
             .await
             .map(|n| {
-                self.bytes += n;
+                self.bytes += n as u64;
                 n
             })
             .map_err(|err| {
@@ -877,11 +877,11 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner
             .write(bs)
             .map(|n| {
-                self.bytes += n;
+                self.bytes += n as u64;
                 n
             })
             .map_err(|err| {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 149410ac8..9d1f397be 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner
             .write(bs)
             .in_span(Span::enter_with_parent(
@@ -369,7 +369,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 0722a11a7..a0e5a1fe1 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,7 +313,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -327,7 +327,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 0d8f99c22..2c7d7d7cf 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner
             .write(bs)
             .await
@@ -695,7 +695,7 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner
             .write(bs)
             .map(|n| {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 3e207df9a..04495df8d 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -872,7 +872,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
 
 #[async_trait]
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let mut backoff = self.builder.build();
 
         loop {
@@ -952,7 +952,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for 
RetryWrapper<R, I> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         { || self.inner.write(bs.clone()) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 021d1cad9..96bd9d00b 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -216,8 +216,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
 
         loop {
             match self.limiter.check_n(buf_length) {
@@ -251,8 +251,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
 
         loop {
             match self.limiter.check_n(buf_length) {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 68ac5a41d..a6179010a 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,8 +322,8 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let timeout = self.io_timeout(bs.len() as u64);
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let timeout = self.io_timeout(bs.remaining() as u64);
 
         tokio::time::timeout(timeout, self.inner.write(bs))
             .await
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index c5d006980..a512f74ba 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs).await
     }
 
@@ -350,7 +350,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index c0823d59e..9b8740f10 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -390,11 +390,15 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf = Some(bs.into());
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.chunk().len();
 
-        Ok(size as u64)
+        let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
+
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -414,11 +418,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf = Some(bs.into());
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 948c2b5ad..336b04a50 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -363,7 +363,7 @@ pub struct KvWriter<S> {
     path: String,
 
     op: OpWrite,
-    buf: VectorCursor,
+    buf: Option<Vec<u8>>,
 }
 
 impl<S> KvWriter<S> {
@@ -372,11 +372,13 @@ impl<S> KvWriter<S> {
             kv,
             path,
             op,
-            buf: VectorCursor::new(),
+            buf: None,
         }
     }
 
     fn build(&self) -> Value {
+        let value = self.buf.map(Bytes::from).unwrap_or_default();
+
         let mut metadata = Metadata::new(EntryMode::FILE);
         if let Some(v) = self.op.cache_control() {
             metadata.set_cache_control(v);
@@ -390,29 +392,29 @@ impl<S> KvWriter<S> {
         if let Some(v) = self.op.content_length() {
             metadata.set_content_length(v);
         } else {
-            metadata.set_content_length(self.buf.len() as u64);
+            metadata.set_content_length(value.len() as u64);
         }
 
-        Value {
-            metadata,
-            value: self.buf.peak_all(),
-        }
+        Value { metadata, value }
     }
 }
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     // TODO: we need to support append in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf.push(bs);
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buf.clear();
-
+        self.buf = None;
         Ok(())
     }
 
@@ -423,11 +425,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-        self.buf.push(bs);
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.chunk().len();
+
+        let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size));
+        buf.extend_from_slice(bs.chunk());
+
+        self.buf = Some(buf);
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index b08a0d222..1b24bec9c 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -41,6 +41,3 @@ pub use cursor::VectorCursor;
 
 mod entry;
 pub use entry::Entry;
-
-mod buf;
-pub use buf::Buf;
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index eaf3ba0de..606f7e9bc 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -21,6 +21,7 @@ use std::fmt::Formatter;
 use async_trait::async_trait;
 use bytes::Bytes;
 
+use crate::raw::oio;
 use crate::*;
 
 /// WriteOperation is the name for APIs of Writer.
@@ -81,7 +82,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < bs.len()`, caller should pass the remaining 
bytes
     /// repeatedly until all bytes has been written.
-    async fn write(&mut self, bs: Bytes) -> Result<u64>;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -92,7 +93,7 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for oio::Write")
@@ -118,7 +119,7 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         (**self).write(bs).await
     }
 
@@ -137,14 +138,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>;
 /// BlockingWrite is the trait that OpenDAL returns to callers.
 pub trait BlockingWrite: Send + Sync + 'static {
     /// Write whole content at once.
-    fn write(&mut self, bs: Bytes) -> Result<u64>;
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize>;
 
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
 
 impl BlockingWrite for () {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let _ = bs;
 
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
@@ -162,7 +163,7 @@ impl BlockingWrite for () {
 ///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         (**self).write(bs)
     }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index d6bb819df..7f9e8c043 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -78,15 +78,19 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let offset = self.offset().await?;
 
-        let size = bs.len() as u64;
+        let size = bs.remaining();
 
         self.inner
-            .append(offset, size, AsyncBody::Bytes(bs))
+            .append(
+                offset,
+                size as u64,
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            )
             .await
-            .map(|_| self.offset = Some(offset + size))?;
+            .map(|_| self.offset = Some(offset + size as u64))?;
 
         Ok(size)
     }
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index dac833fcf..44c2289bf 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -56,7 +56,7 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
@@ -94,7 +94,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, 
THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         match self {
             Self::One(one) => one.write(bs).await,
             Self::Two(two) => two.write(bs).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 7ba788e3d..4b3fe6eee 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,7 +18,7 @@
 use std::cmp::min;
 
 use async_trait::async_trait;
-use bytes::{Buf, Bytes, BytesMut};
+use bytes::{Bytes, BytesMut};
 
 use crate::raw::*;
 use crate::*;
@@ -61,21 +61,20 @@ enum Buffer {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         loop {
             match &mut self.buffer {
                 Buffer::Empty => {
-                    if bs.len() >= self.buffer_size {
-                        bs.truncate(self.buffer_size);
-                        self.buffer = Buffer::Consuming(bs);
-                        return Ok(self.buffer_size as u64);
+                    if bs.remaining() >= self.buffer_size {
+                        self.buffer = 
Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
+                        return Ok(self.buffer_size);
                     }
 
-                    let size = bs.len() as u64;
-                    let mut fill = BytesMut::with_capacity(bs.len());
-                    fill.extend_from_slice(&bs);
+                    let chunk = bs.chunk();
+                    let mut fill = BytesMut::with_capacity(chunk.len());
+                    fill.extend_from_slice(chunk);
                     self.buffer = Buffer::Filling(fill);
-                    return Ok(size);
+                    return Ok(chunk.len());
                 }
                 Buffer::Filling(fill) => {
                     if fill.len() >= self.buffer_size {
@@ -83,18 +82,17 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         continue;
                     }
 
-                    let size = min(self.buffer_size - fill.len(), bs.len());
-                    fill.extend_from_slice(&bs[..size]);
-                    bs.advance(size);
-                    return Ok(size as u64);
+                    let size = min(self.buffer_size - fill.len(), 
bs.chunk().len());
+                    fill.extend_from_slice(&bs.chunk()[..size]);
+                    return Ok(size);
                 }
                 Buffer::Consuming(consume) => {
                     // Make sure filled buffer has been flushed.
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(consume.clone()).await?;
-                        consume.advance(n as usize);
+                        let n = self.inner.write(consume).await?;
+                        consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
                 }
@@ -120,8 +118,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     //
                     // TODO: maybe we can re-fill it after a successful write.
                     while !consume.is_empty() {
-                        let n = self.inner.write(consume.clone()).await?;
-                        consume.advance(n as usize);
+                        let n = self.inner.write(&consume).await?;
+                        consume.advance(n);
                     }
                     self.buffer = Buffer::Empty;
                     break;
@@ -152,11 +150,14 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Bytes) -> Result<u64> {
-            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
-
-            self.buf.extend_from_slice(&bs);
-            Ok(bs.len() as u64)
+        async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+            debug!(
+                "test_fuzz_exact_buf_writer: flush size: {}",
+                bs.chunk().len()
+            );
+
+            self.buf.extend_from_slice(bs.chunk());
+            Ok(bs.chunk().len())
         }
 
         async fn abort(&mut self) -> Result<()> {
@@ -184,8 +185,8 @@ mod tests {
 
         let mut bs = Bytes::from(expected.clone());
         while !bs.is_empty() {
-            let n = w.write(bs.clone()).await?;
-            bs.advance(n as usize);
+            let n = w.write(&bs).await?;
+            bs.advance(n);
         }
 
         w.close().await?;
@@ -223,7 +224,7 @@ mod tests {
 
             let mut bs = Bytes::from(content.clone());
             while !bs.is_empty() {
-                let n = writer.write(bs.clone()).await?;
+                let n = writer.write(&bs).await?;
                 bs.advance(n as usize);
             }
         }
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 13d1b4aa2..e5a9aedba 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -120,22 +120,22 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let upload_id = self.upload_id().await?;
 
-        let size = bs.len();
+        let size = bs.remaining();
 
         self.inner
             .write_part(
                 &upload_id,
                 self.parts.len(),
                 size as u64,
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await
             .map(|v| self.parts.push(v))?;
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 3681242ce..5845784d1 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -32,7 +32,7 @@ pub trait OneShotWrite: Send + Sync + Unpin {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at once.
-    async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+    async fn write_once(&self, body: &dyn Buf) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
@@ -49,12 +49,9 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let cursor = oio::Cursor::from(bs);
-
-        let size = cursor.len() as u64;
-        self.inner.write_once(size, Box::new(cursor)).await?;
-
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+        self.inner.write_once(bs).await?;
         Ok(size)
     }
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index b088aa57a..423449014 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -161,11 +161,12 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Bytes(bs)).await?;
+            self.append_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                .await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -174,7 +175,8 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+            self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index e56f9eca9..7d1e22c65 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -41,9 +41,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -68,9 +66,13 @@ impl oio::Write for AzdfsWriter {
             }
         }
 
-        let mut req =
-            self.core
-                .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;
+        let size = bs.remaining();
+
+        let mut req = self.core.azdfs_update_request(
+            &self.path,
+            Some(size),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
+        )?;
 
         self.core.sign(&mut req).await?;
 
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index af2a6ebd0..2c187fc31 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -52,14 +50,15 @@ impl CosWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for CosWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, buf: &dyn Buf) -> Result<()> {
+        let size = buf.remaining();
         let mut req = self.core.cos_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(buf.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -98,7 +97,8 @@ impl oio::MultipartUploadWrite for CosWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 00998c77d..2c51efc41 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -40,8 +40,8 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .core
@@ -49,14 +49,14 @@ impl oio::Write for DropboxWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
         let status = resp.status();
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4a9a0471d..b317bf6ab 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -49,21 +49,8 @@ impl<F> FsWriter<F> {
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-        self.pos += size;
-
-        Ok(size)
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        self.f.write(&bs.chunk()).await.map_err(parse_io_error)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -87,18 +74,8 @@ impl oio::Write for FsWriter<tokio::fs::File> {
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
-
-        Ok(bs.len() as u64)
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        self.f.write(&bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3488255f9..d77703c57 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -41,8 +41,9 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+        let bs = bs.copy_to_bytes(size);
 
         let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
         let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
@@ -52,7 +53,7 @@ impl oio::Write for FtpWriter {
 
         ftp_stream.finalize_put_stream(data_stream).await?;
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 68e13b27f..317c6b1a0 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,16 +118,16 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         let location = match &self.location {
             Some(location) => location,
             None => {
-                if self.op.content_length().unwrap_or_default() == bs.len() as u64
-                    && self.written == 0
+                if self.op.content_length().unwrap_or_default() == size as u64 && self.written == 0
                 {
-                    self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+                    self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
+                        .await?;
 
                     return Ok(size);
                 } else {
@@ -138,7 +138,7 @@ impl oio::Write for GcsWriter {
             }
         };
 
-        self.buffer.push(bs);
+        self.buffer.push(bs.copy_to_bytes(size));
         // Return directly if the buffer is not full
         if self.buffer.len() <= self.write_fixed_size {
             return Ok(size);
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index 8d69bf8e2..606b3bf10 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,12 +95,14 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
         if self.file_id.is_none() {
-            self.write_create(size, bs).await?;
+            self.write_create(size as u64, bs.copy_to_bytes(size))
+                .await?;
         } else {
-            self.write_overwrite(size, bs).await?;
+            self.write_overwrite(size as u64, bs.copy_to_bytes(size))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a7db6acab..faad830c2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -42,18 +42,23 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+
         let req = self
             .backend
-            .ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs))
+            .ghac_upload(
+                self.cache_id,
+                size as u64,
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+            )
             .await?;
 
         let resp = self.backend.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
-            self.size += size;
+            self.size += size as u64;
             Ok(size)
         } else {
             Err(parse_error(resp)
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 90499e47c..12e4eb557 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -41,21 +41,8 @@ impl<F> HdfsWriter<F> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-
-        while self.pos < bs.len() {
-            let n = self
-                .f
-                .write(&bs[self.pos..])
-                .await
-                .map_err(parse_io_error)?;
-            self.pos += n;
-        }
-        // Reset pos to 0 for next write.
-        self.pos = 0;
-
-        Ok(size as u64)
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        self.f.write(bs.chunk()).await.map_err(parse_io_error)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -73,17 +60,8 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
-
-        while self.pos < bs.len() {
-            let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
-            self.pos += n;
-        }
-        // Reset pos to 0 for next write.
-        self.pos = 0;
-
-        Ok(size as u64)
+    fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        self.f.write(bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index d3f98e77b..7e5199b2a 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -38,9 +38,12 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+        let resp = self
+            .backend
+            .ipmfs_write(&self.path, bs.copy_to_bytes(size))
+            .await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index d3b1e119f..e37c51a45 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -18,13 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
 use crate::raw::oio::MultipartUploadPart;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -53,13 +51,14 @@ impl ObsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for ObsWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn Buf) -> Result<()> {
+        let size = bs.remaining();
         let mut req = self.core.obs_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -93,7 +92,8 @@ impl oio::MultipartUploadWrite for ObsWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index abb846871..49337718a 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Buf;
 use bytes::Bytes;
 use http::StatusCode;
 
@@ -46,8 +45,9 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+        let bs = bs.copy_to_bytes(size);
 
         if size <= Self::MAX_SIMPLE_SIZE {
             self.write_simple(bs).await?;
@@ -55,7 +55,7 @@ impl oio::Write for OneDriveWriter {
             self.write_chunked(bs).await?;
         }
 
-        Ok(size as u64)
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -167,7 +167,7 @@ impl OneDriveWriter {
             StatusCode::OK => {
                 let bs = resp.into_body().bytes().await?;
                 let result: OneDriveUploadSessionCreationResponseBody =
-                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+                    serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
                 Ok(result)
             }
             _ => Err(parse_error(resp).await?),
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 27aa09011..95935ba96 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -52,14 +50,15 @@ impl OssWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OssWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn Buf) -> Result<()> {
+        let size = bs.remaining();
         let mut req = self.core.oss_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
             false,
         )?;
 
@@ -100,7 +99,8 @@ impl oio::MultipartUploadWrite for OssWriter {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index a27341d71..4b8df2246 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,12 +18,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -49,14 +47,16 @@ impl S3Writer {
 
 #[async_trait]
 impl oio::OneShotWrite for S3Writer {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, bs: &dyn Buf) -> Result<()> {
+        let size = bs.remaining();
+
         let mut req = self.core.s3_put_object_request(
             &self.path,
-            Some(size),
+            Some(size as u64),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            AsyncBody::Bytes(bs.copy_to_bytes(size)),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -95,7 +95,8 @@ impl oio::MultipartUploadWrite for S3Writer {
                 let bs = resp.into_body().bytes().await?;
 
                 let result: InitiateMultipartUploadResult =
-                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
+                        .map_err(new_xml_deserialize_error)?;
 
                 Ok(result.upload_id)
             }
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 2df725ab2..5ddbd5ea0 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -20,9 +20,7 @@ use bytes::Bytes;
 use openssh_sftp_client::file::File;
 
 use crate::raw::oio;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
+use crate::*;
 
 pub struct SftpWriter {
     file: File,
@@ -36,9 +34,8 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
-        self.file.write_all(&bs).await?;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = self.file.write(bs.chunk()).await?;
 
         Ok(size)
     }
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 5c57d7a95..6d592512f 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,14 +67,10 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        if bs.is_empty() {
-            return Ok(9);
-        }
-
-        let size = bs.len();
-        self.upload(bs).await?;
-        Ok(size as u64)
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
+        self.upload(bs.copy_to_bytes(size)).await?;
+        Ok(size)
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index efb223df6..a6d9b24b9 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -39,15 +39,15 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .backend
             .vercel_artifacts_put(
                 self.path.as_str(),
                 self.op.content_length().unwrap(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
@@ -56,7 +56,7 @@ impl oio::Write for VercelArtifactsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 55d898ccc..b2825291b 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -41,8 +41,8 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         let resp = self
             .core
@@ -52,14 +52,14 @@ impl oio::Write for WasabiWriter {
                 self.op.content_type(),
                 self.op.content_disposition(),
                 self.op.cache_control(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
         match resp.status() {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 30b9827f2..ad9744ea0 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -62,10 +62,11 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len() as u64;
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
-        self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+        self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
+            .await?;
 
         Ok(size)
     }
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 8dcbc9dc3..6357a9e3a 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -39,8 +39,8 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        let size = bs.len();
+    async fn write(&mut self, bs: &dyn Buf) -> Result<usize> {
+        let size = bs.remaining();
 
         let req = self
             .backend
@@ -48,7 +48,7 @@ impl oio::Write for WebhdfsWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Bytes(bs.copy_to_bytes(size)),
             )
             .await?;
 
@@ -58,7 +58,7 @@ impl oio::Write for WebhdfsWriter {
         match status {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size as u64)
+                Ok(size)
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/raw/oio/buf.rs b/core/src/types/buf.rs
similarity index 89%
rename from core/src/raw/oio/buf.rs
rename to core/src/types/buf.rs
index 9b7aa7135..18024641a 100644
--- a/core/src/raw/oio/buf.rs
+++ b/core/src/types/buf.rs
@@ -22,7 +22,7 @@ use std::{cmp, ptr};
 ///
 /// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes` only needs `&self`
 /// instead of `&mut self`.
-pub trait Buf {
+pub trait Buf: Send + Sync {
     /// Returns the number of bytes between the current position and the end of the buffer.
     ///
     /// This value is greater than or equal to the length of the slice returned by chunk().
@@ -81,11 +81,12 @@ pub trait Buf {
     /// # Notes
     ///
     /// Users should not assume the returned bytes is the same as the Buf::remaining().
-    fn copy_to_bytes(&self) -> Bytes {
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
         let src = self.chunk();
+        let size = cmp::min(src.len(), len);
 
-        let mut ret = BytesMut::with_capacity(src.len());
-        ret.extend_from_slice(src);
+        let mut ret = BytesMut::with_capacity(size);
+        ret.extend_from_slice(&src[..size]);
         ret.freeze()
     }
 }
@@ -108,8 +109,8 @@ macro_rules! deref_forward_buf {
             (**self).copy_to_slice(dst)
         }
 
-        fn copy_to_bytes(&self) -> Bytes {
-            (**self).copy_to_bytes()
+        fn copy_to_bytes(&self, len: usize) -> Bytes {
+            (**self).copy_to_bytes(len)
         }
     };
 }
@@ -139,7 +140,7 @@ impl Buf for &[u8] {
     }
 }
 
-impl<T: AsRef<[u8]>> Buf for std::io::Cursor<T> {
+impl<T: AsRef<[u8]> + Send + Sync> Buf for std::io::Cursor<T> {
     fn remaining(&self) -> usize {
         let len = self.get_ref().as_ref().len();
         let pos = self.position();
@@ -189,8 +190,9 @@ impl Buf for Bytes {
     }
 
     #[inline]
-    fn copy_to_bytes(&self) -> Bytes {
-        self.clone()
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
+        let size = cmp::min(self.len(), len);
+        self.slice(..size)
     }
 }
 
@@ -211,7 +213,8 @@ impl Buf for BytesMut {
     }
 
     #[inline]
-    fn copy_to_bytes(&self) -> Bytes {
-        self.clone().freeze()
+    fn copy_to_bytes(&self, len: usize) -> Bytes {
+        let size = cmp::min(self.len(), len);
+        Bytes::from(&self[..size])
     }
 }
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index 0f75fa50c..dff3ea2cb 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -58,3 +58,6 @@ pub use scheme::Scheme;
 
 mod capability;
 pub use capability::Capability;
+
+mod buf;
+pub use buf::Buf;
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index c1d34f87d..c17054b67 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -550,7 +550,7 @@ impl BlockingOperator {
             self.inner().clone(),
             path,
             (OpWrite::default().with_content_length(bs.len() as u64), bs),
-            |inner, path, (args, bs)| {
+            |inner, path, (args, mut bs)| {
                 if !validate_path(&path, EntryMode::FILE) {
                     return Err(
                         Error::new(ErrorKind::IsADirectory, "write path is a directory")
@@ -561,7 +561,10 @@ impl BlockingOperator {
                 }
 
                 let (_, mut w) = inner.blocking_write(&path, args)?;
-                w.write(bs)?;
+                while bs.remaining() > 0 {
+                    let n = w.write(&bs)?;
+                    bs.advance(n);
+                }
                 w.close()?;
 
                 Ok(())
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index ab97a2df2..b51c2bc60 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -17,7 +17,7 @@
 
 use std::time::Duration;
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::stream;
 use futures::AsyncReadExt;
 use futures::Stream;
@@ -716,7 +716,7 @@ impl Operator {
             self.inner().clone(),
             path,
             (OpWrite::default().with_content_length(bs.len() as u64), bs),
-            |inner, path, (args, bs)| {
+            |inner, path, (args, mut bs)| {
                 let fut = async move {
                     if !validate_path(&path, EntryMode::FILE) {
                         return Err(Error::new(
@@ -729,7 +729,11 @@ impl Operator {
                     }
 
                     let (_, mut w) = inner.write(&path, args).await?;
-                    w.write(bs).await?;
+                    while bs.remaining() > 0 {
+                        let n = w.write(&bs).await?;
+                        bs.advance(n);
+                    }
+
                     w.close().await?;
 
                     Ok(())
diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs
index 6efc8cdad..1688700d5 100644
--- a/core/src/types/reader.rs
+++ b/core/src/types/reader.rs
@@ -266,9 +266,7 @@ mod tests {
         let path = "test_file";
 
         let content = gen_random_bytes();
-        op.write(path, content.clone())
-            .await
-            .expect("write must succeed");
+        op.write(path, content).await.expect("write must succeed");
 
         let mut reader = op.reader(path).await.unwrap();
         let mut buf = Vec::new();
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 3ae7f6f4c..e78c83212 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -22,7 +22,7 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::AsyncWrite;
 use futures::FutureExt;
@@ -80,7 +80,7 @@ impl Writer {
     }
 
     /// Write into inner writer.
-    pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+    pub async fn write(&mut self, mut bs: impl Buf) -> Result<()> {
         let w = if let State::Idle(Some(w)) = &mut self.state {
             w
         } else {
@@ -90,11 +90,9 @@ impl Writer {
             );
         };
 
-        let mut bs = bs.into();
-
-        while !bs.is_empty() {
-            let n = w.write(bs.clone()).await?;
-            bs.advance(n as usize);
+        while bs.remaining() > 0 {
+            let n = w.write(&bs).await?;
+            bs.advance(n);
         }
 
         Ok(())
@@ -149,10 +147,10 @@ impl Writer {
         let mut written = 0;
         while let Some(bs) = sink_from.try_next().await? {
             let mut bs = bs.into();
-            while bs.has_remaining() {
-                let n = w.write(bs.clone()).await?;
-                bs.advance(n as usize);
-                written += n;
+            while bs.remaining() > 0 {
+                let n = w.write(&bs).await?;
+                bs.advance(n);
+                written += n as u64;
             }
         }
         Ok(written)
@@ -262,10 +260,9 @@ impl AsyncWrite for Writer {
                     let mut w = w
                         .take()
                         .expect("invalid state of writer: Idle state with empty write");
-                    let bs = Bytes::from(buf.to_vec());
                     let fut = async move {
-                        let n = w.write(bs).await?;
-                        Ok((n as usize, w))
+                        let n = w.write(&buf).await?;
+                        Ok((n, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -334,9 +331,8 @@ impl tokio::io::AsyncWrite for Writer {
                     let mut w = w
                         .take()
                         .expect("invalid state of writer: Idle state with empty write");
-                    let bs = Bytes::from(buf.to_vec());
                     let fut = async move {
-                        let n = w.write(bs).await?;
+                        let n = w.write(&buf).await?;
                         Ok((n as usize, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -414,12 +410,10 @@ impl BlockingWriter {
     }
 
     /// Write into inner writer.
-    pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        let mut bs = bs.into();
-
-        while !bs.is_empty() {
-            let n = self.inner.write(bs.clone())?;
-            bs.advance(n as usize);
+    pub fn write(&mut self, mut bs: impl Buf) -> Result<()> {
+        while bs.remaining() > 0 {
+            let n = self.inner.write(&bs)?;
+            bs.advance(n);
         }
 
         Ok(())
@@ -434,8 +428,7 @@ impl BlockingWriter {
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         self.inner
-            .write(Bytes::from(buf.to_vec()))
-            .map(|n| n as usize)
+            .write(&buf)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 

Reply via email to