This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 43d0327f4 refactor: Refactor oio::Write by accepting oio::Reader 
instead (#3008)
43d0327f4 is described below

commit 43d0327f488f26169454cd124fe34253eeb2dfe2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 11:30:25 2023 +0800

    refactor: Refactor oio::Write by accepting oio::Reader instead (#3008)
    
    * Rename to pipe
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove at least writer
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix gcs test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/main.rs                         |   6 +-
 core/benches/oio/utils.rs                        |   3 +-
 core/benches/oio/write.rs                        |  35 +---
 core/src/layers/complete.rs                      |  10 +-
 core/src/layers/concurrent_limit.rs              |   4 +-
 core/src/layers/error_context.rs                 |   6 +-
 core/src/layers/logging.rs                       |   8 +-
 core/src/layers/madsim.rs                        |   2 +-
 core/src/layers/metrics.rs                       |   4 +-
 core/src/layers/minitrace.rs                     |   6 +-
 core/src/layers/oteltrace.rs                     |   4 +-
 core/src/layers/prometheus.rs                    |   4 +-
 core/src/layers/retry.rs                         |  18 +-
 core/src/layers/throttle.rs                      |   5 +-
 core/src/layers/timeout.rs                       |   6 +-
 core/src/layers/tracing.rs                       |   4 +-
 core/src/raw/adapters/kv/backend.rs              |   2 +-
 core/src/raw/adapters/typed_kv/backend.rs        |   2 +-
 core/src/raw/oio/cursor.rs                       |   2 +-
 core/src/raw/oio/read/cloneable_read.rs          | 137 ++++++++++++++
 core/src/raw/oio/read/into_read_from_file.rs     |   4 +-
 core/src/raw/oio/read/into_read_from_stream.rs   |  83 +++++++++
 core/src/raw/oio/read/mod.rs                     |  10 ++
 core/src/raw/oio/stream/api.rs                   |  12 ++
 core/src/raw/oio/write/api.rs                    |  14 +-
 core/src/raw/oio/write/append_object_write.rs    |   5 +-
 core/src/raw/oio/write/at_least_buf_write.rs     | 140 ---------------
 core/src/raw/oio/write/compose_write.rs          |  15 +-
 core/src/raw/oio/write/exact_buf_write.rs        | 218 +++++++++++------------
 core/src/raw/oio/write/mod.rs                    |   3 -
 core/src/raw/oio/write/multipart_upload_write.rs |   9 +-
 core/src/raw/oio/write/one_shot_write.rs         |   4 +-
 core/src/services/azblob/writer.rs               |   8 +-
 core/src/services/azdfs/writer.rs                |   2 +-
 core/src/services/cos/backend.rs                 |   5 +-
 core/src/services/dropbox/writer.rs              |   2 +-
 core/src/services/fs/writer.rs                   |   2 +-
 core/src/services/ftp/writer.rs                  |   2 +-
 core/src/services/gcs/writer.rs                  |  11 +-
 core/src/services/gdrive/writer.rs               |   2 +-
 core/src/services/ghac/writer.rs                 |   2 +-
 core/src/services/hdfs/writer.rs                 |   2 +-
 core/src/services/ipmfs/writer.rs                |   2 +-
 core/src/services/obs/backend.rs                 |   5 +-
 core/src/services/onedrive/writer.rs             |   2 +-
 core/src/services/oss/backend.rs                 |   5 +-
 core/src/services/s3/backend.rs                  |  17 +-
 core/src/services/sftp/writer.rs                 |   2 +-
 core/src/services/supabase/writer.rs             |   2 +-
 core/src/services/vercel_artifacts/writer.rs     |   2 +-
 core/src/services/wasabi/writer.rs               |   2 +-
 core/src/services/webdav/writer.rs               |   5 +-
 core/src/services/webhdfs/writer.rs              |   2 +-
 core/src/types/writer.rs                         |  26 +--
 core/tests/behavior/write.rs                     |  11 +-
 55 files changed, 478 insertions(+), 428 deletions(-)

diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 982d29dfb..85ca2ebbe 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,9 +21,5 @@ mod write;
 use criterion::criterion_group;
 use criterion::criterion_main;
 
-criterion_group!(
-    benches,
-    write::bench_at_least_buf_write,
-    write::bench_exact_buf_write,
-);
+criterion_group!(benches, write::bench_exact_buf_write,);
 criterion_main!(benches);
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 9a14442c2..53aa665ca 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -18,7 +18,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use opendal::raw::oio;
-use opendal::raw::oio::Streamer;
 use rand::prelude::ThreadRng;
 use rand::RngCore;
 
@@ -31,7 +30,7 @@ impl oio::Write for BlackHoleWriter {
         Ok(bs.len() as u64)
     }
 
-    async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+    async fn pipe(&mut self, size: u64, _: oio::Reader) -> 
opendal::Result<u64> {
         Ok(size)
     }
 
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 6e26ce7e0..a227d4901 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Buf;
 use criterion::Criterion;
 use once_cell::sync::Lazy;
-use opendal::raw::oio::AtLeastBufWriter;
 use opendal::raw::oio::ExactBufWriter;
 use opendal::raw::oio::Write;
 use rand::thread_rng;
@@ -28,32 +28,6 @@ use super::utils::*;
 pub static TOKIO: Lazy<tokio::runtime::Runtime> =
     Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
 
-pub fn bench_at_least_buf_write(c: &mut Criterion) {
-    let mut group = c.benchmark_group("at_least_buf_write");
-
-    let mut rng = thread_rng();
-
-    for size in [
-        Size::from_kibibytes(4),
-        Size::from_kibibytes(256),
-        Size::from_mebibytes(4),
-        Size::from_mebibytes(16),
-    ] {
-        let content = gen_bytes(&mut rng, size.bytes() as usize);
-
-        group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
-        group.bench_with_input(size.to_string(), &content, |b, content| {
-            b.to_async(&*TOKIO).iter(|| async {
-                let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
-                w.close().await.unwrap();
-            })
-        });
-    }
-
-    group.finish()
-}
-
 pub fn bench_exact_buf_write(c: &mut Criterion) {
     let mut group = c.benchmark_group("exact_buf_write");
 
@@ -71,7 +45,12 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
+
+                let mut bs = content.clone();
+                while !bs.is_empty() {
+                    let n = w.write(bs.clone()).await.unwrap();
+                    bs.advance(n as usize);
+                }
                 w.close().await.unwrap();
             })
         });
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index e986d1a47..db470334c 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -729,12 +729,12 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        w.write(bs).await?;
-        self.written += n as u64;
-        Ok(n as u64)
+        let n = w.write(bs).await?;
+        self.written += n;
+        Ok(n)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
@@ -750,7 +750,7 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        let n = w.sink(size, s).await?;
+        let n = w.pipe(size, s).await?;
         self.written += n;
         Ok(n)
     }
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 96a682d61..c1504e6a2 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 2acd6dd7d..33cebe92c 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await.map_err(|err| {
-            err.with_operation(WriteOperation::Sink)
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.pipe(size, s).await.map_err(|err| {
+            err.with_operation(WriteOperation::Pipe)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
         })
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 6c63f466f..5ed80a28b 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        match self.inner.sink(size, s).await {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        match self.inner.pipe(size, s).await {
             Ok(n) => {
                 self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data sink 
{}B",
                     self.ctx.scheme,
-                    WriteOperation::Sink,
+                    WriteOperation::Pipe,
                     self.path,
                     self.written,
                     n
@@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         lvl,
                         "service={} operation={} path={} written={} -> data 
sink failed: {}",
                         self.ctx.scheme,
-                        WriteOperation::Sink,
+                        WriteOperation::Pipe,
                         self.path,
                         self.written,
                         self.ctx.error_print(&err),
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 17835e5ba..55741389d 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> 
crate::Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 181ebb3c0..a96e83f8a 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .await
             .map(|n| {
                 self.bytes += n;
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 75c852c3a..f6487aa98 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .in_span(Span::enter_with_parent(
-                WriteOperation::Sink.into_static(),
+                WriteOperation::Pipe.into_static(),
                 &self.span,
             ))
             .await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index fde87e9ba..9bd464be5 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 644532bf6..70aae731e 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .await
             .map(|n| {
                 self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 07b92e24c..d5c8a0738 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -34,7 +34,6 @@ use backon::Retryable;
 use bytes::Bytes;
 use futures::FutureExt;
 use log::warn;
-use tokio::sync::Mutex;
 
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
@@ -919,26 +918,27 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase 
with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead 
will only take 0.005%
     /// which is acceptable.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        let s = Arc::new(Mutex::new(s));
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        let s = oio::into_cloneable_reader_within_tokio(s);
 
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.sink(size, Box::new(s.clone())).await {
+            match self.inner.pipe(size, Box::new(s.clone())).await {
                 Ok(n) => return Ok(n),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
                     None => return Err(e),
                     Some(dur) => {
                         {
-                            use oio::StreamExt;
+                            use oio::ReadExt;
 
+                            let s = s.clone().into_inner();
                             let mut stream = s.lock().await;
-                            // Try to reset this stream.
+                            // Try to reset this reader.
                             //
-                            // If error happened, we will return the sink 
error directly and stop retry.
-                            if stream.reset().await.is_err() {
+                            // If error happened, we will return the pipe 
error directly and stop retry.
+                            if 
stream.seek(io::SeekFrom::Start(0)).await.is_err() {
                                 return Err(e);
                             }
                         }
@@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
                             &e,
                             dur,
                             &[
-                                ("operation", 
WriteOperation::Sink.into_static()),
+                                ("operation", 
WriteOperation::Pipe.into_static()),
                                 ("path", &self.path),
                             ],
                         );
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index a88d1c701..d821aacbf 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -33,7 +33,6 @@ use governor::NegativeMultiDecision;
 use governor::Quota;
 use governor::RateLimiter;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -242,8 +241,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index be2289d04..642b8a49b 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let timeout = self.io_timeout(size);
 
-        tokio::time::timeout(timeout, self.inner.sink(size, s))
+        tokio::time::timeout(timeout, self.inner.pipe(size, s))
             .await
             .map_err(|_| {
                 Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(WriteOperation::Sink)
+                    .with_operation(WriteOperation::Pipe)
                     .with_context("timeout", timeout.as_secs_f64().to_string())
                     .set_temporary()
             })?
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 33dcbdebc..467d0a153 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     #[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index be4913ff9..ce9ad3005 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 48232c1fc..494118e84 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c9b670ead..796b2a349 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
 /// ChunkedCursor is used represents a non-contiguous bytes in memory.
 ///
 /// This is useful when we buffer users' random writes without copy. 
ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly.
 ///
 /// # TODO
 ///
diff --git a/core/src/raw/oio/read/cloneable_read.rs 
b/core/src/raw/oio/read/cloneable_read.rs
new file mode 100644
index 000000000..2ccd50042
--- /dev/null
+++ b/core/src/raw/oio/read/cloneable_read.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use std::io::SeekFrom;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// Convert given reader into a wrapper with `std::sync::Mutex` for `Send + 
Sync + Clone`.
+pub fn into_cloneable_reader_within_std<R>(reader: R) -> 
CloneableReaderWithinStd<R> {
+    CloneableReaderWithinStd(Arc::new(std::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinStd is a Send + Sync + Clone with `std::sync::Mutex` 
wrapper of input
+/// reader.
+///
+/// Caller can clone this reader but only one thread can calling `oio::Read` 
API at the
+/// same time, otherwise, we will return error if lock block happened.
+pub struct CloneableReaderWithinStd<R>(Arc<std::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinStd<R> {
+    /// Consume self to get inner reader.
+    pub fn into_inner(self) -> Arc<std::sync::Mutex<R>> {
+        self.0
+    }
+}
+
+impl<R> Clone for CloneableReaderWithinStd<R> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinStd<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_read(cx, buf),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            ))),
+        }
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_seek(cx, pos),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            ))),
+        }
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_next(cx),
+            Err(_) => Poll::Ready(Some(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            )))),
+        }
+    }
+}
+
+/// Convert given reader into a wrapper with `tokio::sync::Mutex` for `Send + 
Sync + Clone`.
+pub fn into_cloneable_reader_within_tokio<R>(reader: R) -> 
CloneableReaderWithinTokio<R> {
+    CloneableReaderWithinTokio(Arc::new(tokio::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinTokio is a Send + Sync + Clone with 
`tokio::sync::Mutex` wrapper of input
+/// reader.
+///
+/// Caller can clone this reader but only one thread can calling `oio::Read` 
API at the
+/// same time, otherwise, we will return error if lock block happened.
+pub struct CloneableReaderWithinTokio<R>(Arc<tokio::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinTokio<R> {
+    /// Consume self to get inner reader.
+    pub fn into_inner(self) -> Arc<tokio::sync::Mutex<R>> {
+        self.0
+    }
+}
+
+impl<R> Clone for CloneableReaderWithinTokio<R> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinTokio<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_read(cx, buf),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            ))),
+        }
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_seek(cx, pos),
+            Err(_) => Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            ))),
+        }
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match self.0.try_lock() {
+            Ok(mut this) => this.poll_next(cx),
+            Err(_) => Poll::Ready(Some(Err(Error::new(
+                ErrorKind::Unexpected,
+                "the cloneable reader is expected to have only one owner, but 
it's not",
+            )))),
+        }
+    }
+}
diff --git a/core/src/raw/oio/read/into_read_from_file.rs 
b/core/src/raw/oio/read/into_read_from_file.rs
index 96e80f515..f005ac737 100644
--- a/core/src/raw/oio/read/into_read_from_file.rs
+++ b/core/src/raw/oio/read/into_read_from_file.rs
@@ -41,7 +41,7 @@ pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) -> 
FromFileReader<R>
     }
 }
 
-/// FdReader is a wrapper of input fd to implement [`oio::Read`].
+/// FromFileReader is a wrapper of input fd to implement [`oio::Read`].
 pub struct FromFileReader<R> {
     inner: R,
 
@@ -126,7 +126,7 @@ where
 
         Poll::Ready(Some(Err(Error::new(
             ErrorKind::Unsupported,
-            "output reader doesn't support seeking",
+            "output reader doesn't support next",
         ))))
     }
 }
diff --git a/core/src/raw/oio/read/into_read_from_stream.rs 
b/core/src/raw/oio/read/into_read_from_stream.rs
new file mode 100644
index 000000000..0a231de2b
--- /dev/null
+++ b/core/src/raw/oio/read/into_read_from_stream.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::{Buf, Bytes};
+use futures::StreamExt;
+use std::io::SeekFrom;
+use std::task::{Context, Poll};
+
+/// Convert given stream `futures::Stream<Item = Result<Bytes>>` into 
[`oio::Reader`].
+pub fn into_read_from_stream<S>(stream: S) -> FromStreamReader<S> {
+    FromStreamReader {
+        inner: stream,
+        buf: Bytes::new(),
+    }
+}
+
+/// FromStreamReader will convert a `futures::Stream<Item = Result<Bytes>>` 
into `oio::Read`
+pub struct FromStreamReader<S> {
+    inner: S,
+    buf: Bytes,
+}
+
+impl<S, T> oio::Read for FromStreamReader<S>
+where
+    S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
+    T: Into<Bytes>,
+{
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        if !self.buf.is_empty() {
+            let len = std::cmp::min(buf.len(), self.buf.len());
+            buf[..len].copy_from_slice(&self.buf[..len]);
+            self.buf.advance(len);
+            return Poll::Ready(Ok(len));
+        }
+
+        match futures::ready!(self.inner.poll_next_unpin(cx)) {
+            Some(Ok(bytes)) => {
+                let bytes = bytes.into();
+                let len = std::cmp::min(buf.len(), bytes.len());
+                buf[..len].copy_from_slice(&bytes[..len]);
+                self.buf = bytes.slice(len..);
+                Poll::Ready(Ok(len))
+            }
+            Some(Err(err)) => Poll::Ready(Err(err)),
+            None => Poll::Ready(Ok(0)),
+        }
+    }
+
+    fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> 
Poll<Result<u64>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "FromStreamReader can't support operation",
+        )))
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        if !self.buf.is_empty() {
+            return Poll::Ready(Some(Ok(std::mem::take(&mut self.buf))));
+        }
+
+        match futures::ready!(self.inner.poll_next_unpin(cx)) {
+            Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))),
+            Some(Err(err)) => Poll::Ready(Some(Err(err))),
+            None => Poll::Ready(None),
+        }
+    }
+}
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 64466bbdd..841dfdd1a 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -34,3 +34,13 @@ pub use into_seekable_read_by_range::ByRangeSeekableReader;
 mod into_read_from_file;
 pub use into_read_from_file::into_read_from_file;
 pub use into_read_from_file::FromFileReader;
+
+mod into_read_from_stream;
+pub use into_read_from_stream::into_read_from_stream;
+pub use into_read_from_stream::FromStreamReader;
+
+mod cloneable_read;
+pub use cloneable_read::into_cloneable_reader_within_std;
+pub use cloneable_read::into_cloneable_reader_within_tokio;
+pub use cloneable_read::CloneableReaderWithinStd;
+pub use cloneable_read::CloneableReaderWithinTokio;
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 132fb78fc..9edfceb6a 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -69,6 +69,18 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
     }
 }
 
+impl Stream for dyn raw::oio::Read {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        raw::oio::Read::poll_next(self, cx)
+    }
+
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        let _ = raw::oio::Read::poll_seek(self, cx, 
std::io::SeekFrom::Start(0))?;
+
+        Poll::Ready(Ok(()))
+    }
+}
+
 impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         match self.try_lock() {
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 8ced843da..ba5a2b675 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,8 +30,8 @@ use crate::*;
 pub enum WriteOperation {
     /// Operation for [`Write::write`]
     Write,
-    /// Operation for [`Write::sink`]
-    Sink,
+    /// Operation for [`Write::pipe`]
+    Pipe,
     /// Operation for [`Write::abort`]
     Abort,
     /// Operation for [`Write::close`]
@@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str {
 
         match v {
             Write => "Writer::write",
-            Sink => "Writer::sink",
+            Pipe => "Writer::pipe",
             Abort => "Writer::abort",
             Close => "Writer::close",
             BlockingWrite => "BlockingWriter::write",
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < size`, caller should pass the remaining bytes
     /// repeatedly until all bytes has been written.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
-        (**self).sink(n, s).await
+    async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
+        (**self).pipe(n, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index b047ef43d..0e5fd9ed3 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -18,7 +18,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -92,11 +91,11 @@ where
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let offset = self.offset().await?;
 
         self.inner
-            .append(offset, size, AsyncBody::Stream(s))
+            .append(offset, size, AsyncBody::Stream(Box::new(s)))
             .await
             .map(|_| self.offset = Some(offset + size))?;
 
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs 
b/core/src/raw/oio/write/at_least_buf_write.rs
deleted file mode 100644
index 51f5d2645..000000000
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least 
buffer strategy: flush
-/// the underlying storage when the buffered size is larger.
-///
-/// AtLeastBufWriter makes sure that the size of the data written to the 
underlying storage is at
-/// least `buffer_size` bytes. It's useful when the underlying storage has a 
minimum size limit.
-///
-/// For example, S3 requires at least 5MiB for multipart uploads.
-pub struct AtLeastBufWriter<W: oio::Write> {
-    inner: W,
-
-    /// The total size of the data.
-    ///
-    /// If the total size is known, we will write to underlying storage 
directly without buffer it
-    /// when possible.
-    total_size: Option<u64>,
-
-    /// The size for buffer, we will flush the underlying storage if the 
buffer is full.
-    buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-}
-
-impl<W: oio::Write> AtLeastBufWriter<W> {
-    /// Create a new at least buf writer.
-    pub fn new(inner: W, buffer_size: usize) -> Self {
-        Self {
-            inner,
-            total_size: None,
-            buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-        }
-    }
-
-    /// Configure the total size for writer.
-    pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
-        self.total_size = total_size;
-        self
-    }
-}
-
-#[async_trait]
-impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        // If total size is known and equals to given bytes, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == bs.len() as u64 {
-                return self.inner.write(bs).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() + bs.len() < self.buffer_size {
-            let size = bs.len();
-            self.buffer.push(bs);
-            return Ok(size as u64);
-        }
-
-        let mut buf = self.buffer.clone();
-        buf.push(bs);
-
-        self.inner
-            .sink(buf.len() as u64, Box::new(buf))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|v| {
-                self.buffer.clear();
-                v
-            })
-    }
-
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
-        // If total size is known and equals to given stream, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == size {
-                return self.inner.sink(size, s).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            let bs = s.collect().await?;
-            let size = bs.len() as u64;
-            self.buffer.push(bs);
-            return Ok(size);
-        }
-
-        let buf = self.buffer.clone();
-        let buffer_size = buf.len() as u64;
-        let stream = buf.chain(s);
-
-        self.inner
-            .sink(buffer_size + size, Box::new(stream))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|v| {
-                self.buffer.clear();
-                v
-            })
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.inner.abort().await
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        if !self.buffer.is_empty() {
-            self.inner
-                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
-                .await?;
-            self.buffer.clear();
-        }
-
-        self.inner.close().await
-    }
-}
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index 79ddfc5ed..05dfdd775 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -41,7 +41,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -64,10 +63,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for 
TwoWaysWriter<ONE, TWO> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
+            Self::One(one) => one.pipe(size, s).await,
+            Self::Two(two) => two.pipe(size, s).await,
         }
     }
 
@@ -110,11 +109,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> 
oio::Write
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
-            Self::Three(three) => three.sink(size, s).await,
+            Self::One(one) => one.pipe(size, s).await,
+            Self::Two(two) => two.pipe(size, s).await,
+            Self::Three(three) => three.pipe(size, s).await,
         }
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 8e2d8a922..b2209ed73 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,10 +18,10 @@
 use std::cmp::min;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use tokio::io::ReadBuf;
 
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::ReadExt;
 use crate::raw::*;
 use crate::*;
 
@@ -41,9 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {
 
     /// The size for buffer, we will flush the underlying storage at the size 
of this buffer.
     buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-
-    buffer_stream: Option<Streamer>,
+    buffer: Buffer,
 }
 
 impl<W: oio::Write> ExactBufWriter<W> {
@@ -52,133 +50,108 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-            buffer_stream: None,
-        }
-    }
-
-    /// Next bytes is used to fetch bytes from buffer or input streamer.
-    ///
-    /// We need this function because we need to make sure our write is 
reentrant.
-    /// We can't mutate state unless we are sure that the write is successful.
-    async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> {
-        match self.buffer_stream.as_mut() {
-            None => s.next().await,
-            Some(bs) => match bs.next().await {
-                None => {
-                    self.buffer_stream = None;
-                    s.next().await
-                }
-                Some(v) => Some(v),
-            },
+            buffer: Buffer::Filling(BytesMut::new()),
         }
     }
+}
 
-    fn chain_stream(&mut self, s: Streamer) {
-        self.buffer_stream = match self.buffer_stream.take() {
-            Some(stream) => Some(Box::new(stream.chain(s))),
-            None => Some(s),
-        }
-    }
+enum Buffer {
+    Filling(BytesMut),
+    Consuming(Bytes),
 }
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-            .await
+    async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    if fill.len() >= self.buffer_size {
+                        self.buffer = Buffer::Consuming(fill.split().freeze());
+                        continue;
+                    }
+
+                    let size = min(self.buffer_size - fill.len(), bs.len());
+                    fill.extend_from_slice(&bs[..size]);
+                    bs.advance(size);
+                    return Ok(size as u64);
+                }
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
+            }
+        }
     }
 
-    /// # TODO
-    ///
-    /// We know every stream size, we can collect them into a buffer without 
chain them every time.
-    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
-        if self.buffer.len() >= self.buffer_size {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(self.buffer_size);
-            return self
-                .inner
-                .sink(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|v| {
-                    self.buffer = buf;
-                    self.chain_stream(s);
-                    v
-                });
-        }
+    async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    if fill.len() >= self.buffer_size {
+                        self.buffer = Buffer::Consuming(fill.split().freeze());
+                        continue;
+                    }
 
-        let mut buf = self.buffer.clone();
-        while buf.len() < self.buffer_size {
-            let bs = self.next_bytes(&mut s).await.transpose()?;
-            match bs {
-                None => break,
-                Some(bs) => buf.push(bs),
-            }
-        }
+                    // Reserve to buffer size.
+                    fill.reserve(self.buffer_size - fill.len());
+                    let dst = fill.spare_capacity_mut();
+                    let dst_len = dst.len();
+                    let mut buf = ReadBuf::uninit(dst);
 
-        // Return directly if the buffer is not full.
-        //
-        // We don't need to chain stream here because it must be consumed.
-        if buf.len() < self.buffer_size {
-            let size = buf.len() as u64;
-            self.buffer = buf;
-            return Ok(size);
-        }
+                    // Safety: the input buffer is created 
with_capacity(length).
+                    unsafe { buf.assume_init(dst_len) };
 
-        let to_write = buf.split_to(self.buffer_size);
-        self.inner
-            .sink(to_write.len() as u64, Box::new(to_write))
-            .await
-            // Replace buffer with remaining if the write is successful.
-            .map(|v| {
-                self.buffer = buf;
-                self.chain_stream(s);
-                v
-            })
+                    let n = s.read(buf.initialize_unfilled()).await?;
+
+                    // Safety: read makes sure this buffer has been filled.
+                    unsafe { fill.advance_mut(n) };
+
+                    return Ok(n as u64);
+                }
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
+            }
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.buffer_stream = None;
-
+        self.buffer = Buffer::Filling(BytesMut::new());
         self.inner.abort().await
     }
 
     async fn close(&mut self) -> Result<()> {
-        while let Some(stream) = self.buffer_stream.as_mut() {
-            let bs = stream.next().await.transpose()?;
-            match bs {
-                None => {
-                    self.buffer_stream = None;
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    self.buffer = Buffer::Consuming(fill.split().freeze());
+                    continue;
                 }
-                Some(bs) => {
-                    self.buffer.push(bs);
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    break;
                 }
             }
-
-            if self.buffer.len() >= self.buffer_size {
-                let mut buf = self.buffer.clone();
-                let to_write = buf.split_to(self.buffer_size);
-                self.inner
-                    .sink(to_write.len() as u64, Box::new(to_write))
-                    .await
-                    // Replace buffer with remaining if the write is 
successful.
-                    .map(|_| {
-                        self.buffer = buf;
-                    })?;
-            }
-        }
-
-        while !self.buffer.is_empty() {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(min(self.buffer_size, buf.len()));
-
-            self.inner
-                .sink(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|_| self.buffer = buf)?;
         }
 
         self.inner.close().await
@@ -187,6 +160,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
 #[cfg(test)]
 mod tests {
+    use futures::AsyncReadExt;
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -196,7 +170,6 @@ mod tests {
     use sha2::Sha256;
 
     use super::*;
-    use crate::raw::oio::StreamExt;
     use crate::raw::oio::Write;
 
     struct MockWriter {
@@ -212,10 +185,11 @@ mod tests {
             Ok(bs.len() as u64)
         }
 
-        async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
-            let bs = s.collect().await?;
+        async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> 
{
+            let mut bs = vec![];
+            s.read_to_end(&mut bs).await.unwrap();
             assert_eq!(bs.len() as u64, size);
-            self.write(bs).await
+            self.write(bs.into()).await
         }
 
         async fn abort(&mut self) -> Result<()> {
@@ -241,7 +215,12 @@ mod tests {
 
         let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
 
-        w.write(Bytes::from(expected.clone())).await?;
+        let mut bs = Bytes::from(expected.clone());
+        while !bs.is_empty() {
+            let n = w.write(bs.clone()).await?;
+            bs.advance(n as usize);
+        }
+
         w.close().await?;
 
         assert_eq!(w.inner.buf.len(), expected.len());
@@ -274,7 +253,12 @@ mod tests {
             rng.fill_bytes(&mut content);
 
             expected.extend_from_slice(&content);
-            writer.write(Bytes::from(content)).await?;
+
+            let mut bs = Bytes::from(content.clone());
+            while !bs.is_empty() {
+                let n = writer.write(bs.clone()).await?;
+                bs.advance(n as usize);
+            }
         }
         writer.close().await?;
 
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index d06bacb2c..dfaf6c4ee 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -39,8 +39,5 @@ mod one_shot_write;
 pub use one_shot_write::OneShotWrite;
 pub use one_shot_write::OneShotWriter;
 
-mod at_least_buf_write;
-pub use at_least_buf_write::AtLeastBufWriter;
-
 mod exact_buf_write;
 pub use exact_buf_write::ExactBufWriter;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 7bfd0342c..0b314c163 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,11 +138,16 @@ where
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         self.inner
-            .write_part(&upload_id, self.parts.len(), size, 
AsyncBody::Stream(s))
+            .write_part(
+                &upload_id,
+                self.parts.len(),
+                size,
+                AsyncBody::Stream(Box::new(s)),
+            )
             .await
             .map(|v| self.parts.push(v))?;
 
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index e6fe47616..a85feecf3 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,8 +58,8 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.write_once(size, s).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.write_once(size, Box::new(s)).await?;
         Ok(size)
     }
 
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index a3b8abe30..11776755f 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,9 +180,10 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Stream(s)).await?;
+            self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
+                .await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -191,7 +192,8 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+            self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index ff1125bfa..4b9b90f9f 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 3360d9dd5..ba162fae9 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -249,7 +249,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<CosWriters, 
oio::AtLeastBufWriter<CosWriters>>;
+    type Writer = oio::TwoWaysWriter<CosWriters, 
oio::ExactBufWriter<CosWriters>>;
     type BlockingWriter = ();
     type Pager = CosPager;
     type BlockingPager = ();
@@ -348,8 +348,7 @@ impl Accessor for CosBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/dropbox/writer.rs 
b/core/src/services/dropbox/writer.rs
index 1b5b6b17d..37abab6a9 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 9ca571077..ab27f2b65 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 18dd6fed9..3bb11582d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 8c4431302..2e95f0b79 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -138,11 +138,6 @@ impl oio::Write for GcsWriter {
             }
         };
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(0);
-        }
-
         self.buffer.push(bs);
         // Return directly if the buffer is not full
         if self.buffer.len() <= self.write_fixed_size {
@@ -150,7 +145,6 @@ impl oio::Write for GcsWriter {
         }
 
         let bs = self.buffer.peak_exact(self.write_fixed_size);
-        let size = bs.len() as u64;
 
         match self.write_part(location, bs).await {
             Ok(_) => {
@@ -167,8 +161,9 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+            .await?;
         Ok(size)
     }
 
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index b33137137..df974d940 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 6bd4bf057..bf9116ec2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 011c8352e..3b60a4df3 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index 43a46e500..1434e980a 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6600ddd9b..4c3176cce 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -256,7 +256,7 @@ pub struct ObsBackend {
 impl Accessor for ObsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<ObsWriters, 
oio::AtLeastBufWriter<ObsWriters>>;
+    type Writer = oio::TwoWaysWriter<ObsWriters, 
oio::ExactBufWriter<ObsWriters>>;
     type BlockingWriter = ();
     type Pager = ObsPager;
     type BlockingPager = ();
@@ -386,8 +386,7 @@ impl Accessor for ObsBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 1086f3fde..75a5e023a 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index ee1131ee1..d924e4cc9 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -381,7 +381,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::TwoWaysWriter<OssWriters, 
oio::AtLeastBufWriter<OssWriters>>;
+    type Writer = oio::TwoWaysWriter<OssWriters, 
oio::ExactBufWriter<OssWriters>>;
     type BlockingWriter = ();
     type Pager = OssPager;
     type BlockingPager = ();
@@ -484,8 +484,7 @@ impl Accessor for OssBackend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            let w =
-                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length());
+            let w = oio::ExactBufWriter::new(w, buffer_size);
 
             oio::TwoWaysWriter::Two(w)
         } else {
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 600fac3f3..2c82cc03c 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -888,11 +888,7 @@ pub struct S3Backend {
 impl Accessor for S3Backend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = oio::ThreeWaysWriter<
-        S3Writers,
-        oio::AtLeastBufWriter<S3Writers>,
-        oio::ExactBufWriter<S3Writers>,
-    >;
+    type Writer = oio::TwoWaysWriter<S3Writers, 
oio::ExactBufWriter<S3Writers>>;
     type BlockingWriter = ();
     type Pager = S3Pager;
     type BlockingPager = ();
@@ -991,16 +987,9 @@ impl Accessor for S3Backend {
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
 
-            if self.core.enable_exact_buf_write {
-                oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w, 
buffer_size))
-            } else {
-                oio::ThreeWaysWriter::Two(
-                    oio::AtLeastBufWriter::new(w, buffer_size)
-                        .with_total_size(args.content_length()),
-                )
-            }
+            oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
         } else {
-            oio::ThreeWaysWriter::One(w)
+            oio::TwoWaysWriter::One(w)
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 71ac41d7c..5ee5d84ac 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index b786896c2..36a572bfe 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index 1db2d230f..936cddf01 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index 130e8e911..a39c835dc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index 8dc093e65..bbe79eb31 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,8 +70,9 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+            .await?;
 
         Ok(size)
     }
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index 1b055f122..ae3396dc3 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index f4b93dfa6..9479b7c73 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -138,8 +138,8 @@ impl Writer {
         T: Into<Bytes>,
     {
         if let State::Idle(Some(w)) = &mut self.state {
-            let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into())));
-            w.sink(size, s).await
+            let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| 
v.into())));
+            w.pipe(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -180,11 +180,14 @@ impl Writer {
     /// ```
     pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
     where
-        R: futures::AsyncRead + Send + Sync + Unpin + 'static,
+        R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 
'static,
     {
         if let State::Idle(Some(w)) = &mut self.state {
-            let s = Box::new(oio::into_stream_from_reader(read_from));
-            w.sink(size, s).await
+            let r = Box::new(oio::into_streamable_read(
+                oio::into_read_from_file(read_from, 0, size),
+                64 * 1024,
+            ));
+            w.pipe(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",
@@ -257,10 +260,9 @@ impl AsyncWrite for Writer {
                         .take()
                         .expect("invalid state of writer: Idle state with 
empty write");
                     let bs = Bytes::from(buf.to_vec());
-                    let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
-                        Ok((size, w))
+                        let n = w.write(bs).await?;
+                        Ok((n as usize, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -324,10 +326,9 @@ impl tokio::io::AsyncWrite for Writer {
                         .take()
                         .expect("invalid state of writer: Idle state with 
empty write");
                     let bs = Bytes::from(buf.to_vec());
-                    let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
-                        Ok((size, w))
+                        let n = w.write(bs).await?;
+                        Ok((n as usize, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -417,10 +418,9 @@ impl BlockingWriter {
 
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let size = buf.len();
         self.inner
             .write(Bytes::from(buf.to_vec()))
-            .map(|_| size)
+            .map(|n| n as usize)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 0ad042295..fe4166d78 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -19,6 +19,7 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use anyhow::Result;
+use bytes::{Buf, Bytes};
 use futures::io::BufReader;
 use futures::io::Cursor;
 use futures::stream;
@@ -1194,13 +1195,19 @@ pub async fn test_writer_copy(op: Operator) -> 
Result<()> {
     let size = 5 * 1024 * 1024; // write file with 5 MiB
     let content_a = gen_fixed_bytes(size);
     let content_b = gen_fixed_bytes(size);
-    let reader = Cursor::new([content_a.clone(), content_b.clone()].concat());
 
     let mut w = op
         .writer_with(&path)
         .content_length(2 * size as u64)
         .await?;
-    w.copy(2 * size as u64, reader).await?;
+
+    let mut content = Bytes::from([content_a.clone(), 
content_b.clone()].concat());
+    while !content.is_empty() {
+        let reader = Cursor::new(content.clone());
+        let n = w.copy(2 * size as u64, reader).await?;
+        content.advance(n as usize);
+    }
+
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");

Reply via email to