This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ff42dfae99cdea8218970a3db85e4a6f3cfe9189
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 15:55:16 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |   3 +-
 core/src/layers/complete.rs                      |   2 +-
 core/src/layers/concurrent_limit.rs              |   2 +-
 core/src/layers/error_context.rs                 |   2 +-
 core/src/layers/logging.rs                       |   2 +-
 core/src/layers/madsim.rs                        |   2 +-
 core/src/layers/metrics.rs                       |   2 +-
 core/src/layers/minitrace.rs                     |   2 +-
 core/src/layers/oteltrace.rs                     |   2 +-
 core/src/layers/prometheus.rs                    |   2 +-
 core/src/layers/retry.rs                         |  14 +--
 core/src/layers/throttle.rs                      |   3 +-
 core/src/layers/timeout.rs                       |   2 +-
 core/src/layers/tracing.rs                       |   2 +-
 core/src/raw/adapters/kv/backend.rs              |   2 +-
 core/src/raw/adapters/typed_kv/backend.rs        |   2 +-
 core/src/raw/oio/cursor.rs                       |  14 +++
 core/src/raw/oio/read/cloneable_read.rs          | 137 +++++++++++++++++++++++
 core/src/raw/oio/read/into_read_from_file.rs     |   2 +-
 core/src/raw/oio/read/into_read_from_stream.rs   |  83 ++++++++++++++
 core/src/raw/oio/read/mod.rs                     |  10 ++
 core/src/raw/oio/stream/api.rs                   |  12 ++
 core/src/raw/oio/write/api.rs                    |   6 +-
 core/src/raw/oio/write/append_object_write.rs    |   5 +-
 core/src/raw/oio/write/at_least_buf_write.rs     |  32 +-----
 core/src/raw/oio/write/compose_write.rs          |   5 +-
 core/src/raw/oio/write/exact_buf_write.rs        |  55 ++-------
 core/src/raw/oio/write/multipart_upload_write.rs |   9 +-
 core/src/raw/oio/write/one_shot_write.rs         |   4 +-
 core/src/services/azblob/writer.rs               |   8 +-
 core/src/services/azdfs/writer.rs                |   2 +-
 core/src/services/dropbox/writer.rs              |   2 +-
 core/src/services/fs/writer.rs                   |   2 +-
 core/src/services/ftp/writer.rs                  |   2 +-
 core/src/services/gcs/writer.rs                  |   5 +-
 core/src/services/gdrive/writer.rs               |   2 +-
 core/src/services/ghac/writer.rs                 |   2 +-
 core/src/services/hdfs/writer.rs                 |   2 +-
 core/src/services/ipmfs/writer.rs                |   2 +-
 core/src/services/onedrive/writer.rs             |   2 +-
 core/src/services/sftp/writer.rs                 |   2 +-
 core/src/services/supabase/writer.rs             |   2 +-
 core/src/services/vercel_artifacts/writer.rs     |   2 +-
 core/src/services/wasabi/writer.rs               |   2 +-
 core/src/services/webdav/writer.rs               |   5 +-
 core/src/services/webhdfs/writer.rs              |   2 +-
 core/src/types/writer.rs                         |  10 +-
 47 files changed, 334 insertions(+), 142 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 88e764c87..53aa665ca 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -18,7 +18,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use opendal::raw::oio;
-use opendal::raw::oio::Streamer;
 use rand::prelude::ThreadRng;
 use rand::RngCore;
 
@@ -31,7 +30,7 @@ impl oio::Write for BlackHoleWriter {
         Ok(bs.len() as u64)
     }
 
-    async fn pipe(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+    async fn pipe(&mut self, size: u64, _: oio::Reader) -> 
opendal::Result<u64> {
         Ok(size)
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 21efc47f8..6ebd5c23c 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,7 +734,7 @@ where
         Ok(n as u64)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 7384547f6..c1504e6a2 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,7 +293,7 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.pipe(size, s).await
     }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 536b7c956..33cebe92c 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.pipe(size, s).await.map_err(|err| {
             err.with_operation(WriteOperation::Pipe)
                 .with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 66708ec57..5ed80a28b 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,7 +1285,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self.inner.pipe(size, s).await {
             Ok(n) => {
                 self.written += n;
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 7dc193cb7..55741389d 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> 
crate::Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 2439f0221..a96e83f8a 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,7 +861,7 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
             .pipe(size, s)
             .await
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 65f73ecd0..f6487aa98 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
             .pipe(size, s)
             .in_span(Span::enter_with_parent(
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index a439d79b3..9bd464be5 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,7 +317,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.pipe(size, s).await
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 4fec394de..70aae731e 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,7 +679,7 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
             .pipe(size, s)
             .await
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 536860cde..d5c8a0738 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -34,7 +34,6 @@ use backon::Retryable;
 use bytes::Bytes;
 use futures::FutureExt;
 use log::warn;
-use tokio::sync::Mutex;
 
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
@@ -919,8 +918,8 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase 
with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead 
will only take 0.005%
     /// which is acceptable.
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        let s = Arc::new(Mutex::new(s));
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        let s = oio::into_cloneable_reader_within_tokio(s);
 
         let mut backoff = self.builder.build();
 
@@ -932,13 +931,14 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
                     None => return Err(e),
                     Some(dur) => {
                         {
-                            use oio::StreamExt;
+                            use oio::ReadExt;
 
+                            let s = s.clone().into_inner();
                             let mut stream = s.lock().await;
-                            // Try to reset this stream.
+                            // Try to reset this reader.
                             //
-                            // If error happened, we will return the sink 
error directly and stop retry.
-                            if stream.reset().await.is_err() {
+                            // If error happened, we will return the pipe 
error directly and stop retry.
+                            if 
stream.seek(io::SeekFrom::Start(0)).await.is_err() {
                                 return Err(e);
                             }
                         }
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index aea598c3a..d821aacbf 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -33,7 +33,6 @@ use governor::NegativeMultiDecision;
 use governor::Quota;
 use governor::RateLimiter;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -242,7 +241,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.pipe(size, s).await
     }
 
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 421202023..642b8a49b 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let timeout = self.io_timeout(size);
 
         tokio::time::timeout(timeout, self.inner.pipe(size, s))
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 002fe314f..467d0a153 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.pipe(size, s).await
     }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index f3406b8ae..ce9ad3005 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index aeaab9864..494118e84 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 796b2a349..88a713068 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -343,6 +343,20 @@ impl oio::Stream for ChunkedCursor {
     }
 }
 
+impl oio::Read for ChunkedCursor {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        todo!()
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
+        todo!()
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        todo!()
+    }
+}
+
 /// VectorCursor is the cursor for [`Vec<Bytes>`] that implements 
[`oio::Stream`]
 pub struct VectorCursor {
     inner: VecDeque<Bytes>,
diff --git a/core/src/raw/oio/read/cloneable_read.rs 
b/core/src/raw/oio/read/cloneable_read.rs
new file mode 100644
index 000000000..2ccd50042
--- /dev/null
+++ b/core/src/raw/oio/read/cloneable_read.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use std::io::SeekFrom;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// Convert given reader into a wrapper with `std::sync::Mutex` for `Send + 
Sync + Clone`.
+pub fn into_cloneable_reader_within_std<R>(reader: R) -> 
CloneableReaderWithinStd<R> {
+    CloneableReaderWithinStd(Arc::new(std::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinStd is a `Send + Sync + Clone` wrapper of the input reader, backed by
+/// `std::sync::Mutex`.
+///
+/// Callers can clone this reader, but only one clone may call the `oio::Read` API at a time;
+/// if the lock cannot be taken (e.g. another clone holds it), an error is returned instead of blocking.
+pub struct CloneableReaderWithinStd<R>(Arc<std::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinStd<R> {
+    /// Consume this wrapper and return the shared, mutex-guarded inner reader.
+    pub fn into_inner(self) -> Arc<std::sync::Mutex<R>> {
+        let Self(shared) = self;
+        shared
+    }
+}
+
+impl<R> Clone for CloneableReaderWithinStd<R> {
+    /// Clone the handle only; every clone points at the same underlying reader.
+    fn clone(&self) -> Self {
+        let shared = Arc::clone(&self.0);
+        Self(shared)
+    }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinStd<R> {
+    /// Read through the shared reader; fails immediately if the lock cannot be taken.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                )));
+            }
+        };
+        guard.poll_read(cx, buf)
+    }
+
+    /// Seek the shared reader; fails immediately if the lock cannot be taken.
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                )));
+            }
+        };
+        guard.poll_seek(cx, pos)
+    }
+
+    /// Fetch the next chunk from the shared reader; yields an error item if the lock
+    /// cannot be taken.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Some(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                ))));
+            }
+        };
+        guard.poll_next(cx)
+    }
+}
+
+/// Convert given reader into a wrapper with `tokio::sync::Mutex` for `Send + 
Sync + Clone`.
+pub fn into_cloneable_reader_within_tokio<R>(reader: R) -> 
CloneableReaderWithinTokio<R> {
+    CloneableReaderWithinTokio(Arc::new(tokio::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinTokio is a `Send + Sync + Clone` wrapper of the input reader, backed by
+/// `tokio::sync::Mutex`.
+///
+/// Callers can clone this reader, but only one clone may call the `oio::Read` API at a time;
+/// if the lock is already held by another clone, an error is returned instead of waiting.
+pub struct CloneableReaderWithinTokio<R>(Arc<tokio::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinTokio<R> {
+    /// Consume this wrapper and return the shared, mutex-guarded inner reader.
+    pub fn into_inner(self) -> Arc<tokio::sync::Mutex<R>> {
+        let Self(shared) = self;
+        shared
+    }
+}
+
+impl<R> Clone for CloneableReaderWithinTokio<R> {
+    /// Clone the handle only; every clone points at the same underlying reader.
+    fn clone(&self) -> Self {
+        let shared = Arc::clone(&self.0);
+        Self(shared)
+    }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinTokio<R> {
+    /// Read through the shared reader; fails immediately if the lock is already held.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                )));
+            }
+        };
+        guard.poll_read(cx, buf)
+    }
+
+    /// Seek the shared reader; fails immediately if the lock is already held.
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                )));
+            }
+        };
+        guard.poll_seek(cx, pos)
+    }
+
+    /// Fetch the next chunk from the shared reader; yields an error item if the lock is
+    /// already held.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let mut guard = match self.0.try_lock() {
+            Ok(guard) => guard,
+            Err(_) => {
+                return Poll::Ready(Some(Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "the cloneable reader is expected to have only one owner, but it's not",
+                ))));
+            }
+        };
+        guard.poll_next(cx)
+    }
+}
diff --git a/core/src/raw/oio/read/into_read_from_file.rs 
b/core/src/raw/oio/read/into_read_from_file.rs
index 96e80f515..6cc28f2d8 100644
--- a/core/src/raw/oio/read/into_read_from_file.rs
+++ b/core/src/raw/oio/read/into_read_from_file.rs
@@ -41,7 +41,7 @@ pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) -> 
FromFileReader<R>
     }
 }
 
-/// FdReader is a wrapper of input fd to implement [`oio::Read`].
+/// FromFileReader is a wrapper of input fd to implement [`oio::Read`].
 pub struct FromFileReader<R> {
     inner: R,
 
diff --git a/core/src/raw/oio/read/into_read_from_stream.rs 
b/core/src/raw/oio/read/into_read_from_stream.rs
new file mode 100644
index 000000000..0a231de2b
--- /dev/null
+++ b/core/src/raw/oio/read/into_read_from_stream.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::{Buf, Bytes};
+use futures::StreamExt;
+use std::io::SeekFrom;
+use std::task::{Context, Poll};
+
+/// Convert given stream `futures::Stream<Item = Result<Bytes>>` into 
[`oio::Reader`].
+pub fn into_read_from_stream<S>(stream: S) -> FromStreamReader<S> {
+    FromStreamReader {
+        inner: stream,
+        buf: Bytes::new(),
+    }
+}
+
+/// FromStreamReader will convert a `futures::Stream<Item = Result<Bytes>>` 
into `oio::Read`
+pub struct FromStreamReader<S> {
+    inner: S,
+    buf: Bytes,
+}
+
+impl<S, T> oio::Read for FromStreamReader<S>
+where
+    S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
+    T: Into<Bytes>,
+{
+    /// Copy bytes into `buf`, serving leftovers from the previous chunk first and pulling
+    /// the next chunk from the stream when nothing is buffered.
+    ///
+    /// Returns `Ok(0)` only when the stream is exhausted or `buf` itself is empty.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        // Refill until we hold some bytes. Empty chunks are skipped: surfacing them as
+        // `Ok(0)` would make callers believe the stream has ended and truncate the data.
+        while self.buf.is_empty() {
+            match futures::ready!(self.inner.poll_next_unpin(cx)) {
+                Some(Ok(bytes)) => self.buf = bytes.into(),
+                Some(Err(err)) => return Poll::Ready(Err(err)),
+                None => return Poll::Ready(Ok(0)),
+            }
+        }
+
+        // Hand out as much as fits and keep the remainder for the next call.
+        let len = std::cmp::min(buf.len(), self.buf.len());
+        buf[..len].copy_from_slice(&self.buf[..len]);
+        self.buf.advance(len);
+        Poll::Ready(Ok(len))
+    }
+
+    /// Seeking is not possible on a plain stream; always returns `ErrorKind::Unsupported`.
+    fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> Poll<Result<u64>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "FromStreamReader can't support operation",
+        )))
+    }
+
+    /// Yield the next chunk: leftover bytes from a partial `poll_read` first, then
+    /// whatever the underlying stream produces.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if !self.buf.is_empty() {
+            // `take` leaves an empty buffer behind so the leftover is handed out only once.
+            return Poll::Ready(Some(Ok(std::mem::take(&mut self.buf))));
+        }
+
+        match futures::ready!(self.inner.poll_next_unpin(cx)) {
+            Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))),
+            Some(Err(err)) => Poll::Ready(Some(Err(err))),
+            None => Poll::Ready(None),
+        }
+    }
+}
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 64466bbdd..841dfdd1a 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -34,3 +34,13 @@ pub use into_seekable_read_by_range::ByRangeSeekableReader;
 mod into_read_from_file;
 pub use into_read_from_file::into_read_from_file;
 pub use into_read_from_file::FromFileReader;
+
+mod into_read_from_stream;
+pub use into_read_from_stream::into_read_from_stream;
+pub use into_read_from_stream::FromStreamReader;
+
+mod cloneable_read;
+pub use cloneable_read::into_cloneable_reader_within_std;
+pub use cloneable_read::into_cloneable_reader_within_tokio;
+pub use cloneable_read::CloneableReaderWithinStd;
+pub use cloneable_read::CloneableReaderWithinTokio;
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 132fb78fc..9edfceb6a 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -69,6 +69,18 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
     }
 }
 
+impl Stream for dyn raw::oio::Read {
+    /// Forward to [`raw::oio::Read::poll_next`].
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        raw::oio::Read::poll_next(self, cx)
+    }
+
+    /// Reset the stream by seeking the reader back to the start.
+    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        // `?` on a `Poll<Result<_>>` only propagates `Ready(Err(_))`, so using it here
+        // would report a still-pending seek as a finished reset. `map_ok` keeps `Pending`
+        // and errors intact and only discards the new position on success.
+        raw::oio::Read::poll_seek(self, cx, std::io::SeekFrom::Start(0)).map_ok(|_| ())
+    }
+}
+
 impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         match self.try_lock() {
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 2bdcd426b..ba5a2b675 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < size`, caller should pass the remaining bytes
     /// repeatedly until all bytes has been written.
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn pipe(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -144,7 +144,7 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn pipe(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
         (**self).pipe(n, s).await
     }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 473d4feac..0e5fd9ed3 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -18,7 +18,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -92,11 +91,11 @@ where
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let offset = self.offset().await?;
 
         self.inner
-            .append(offset, size, AsyncBody::Stream(s))
+            .append(offset, size, AsyncBody::Stream(Box::new(s)))
             .await
             .map(|_| self.offset = Some(offset + size))?;
 
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs 
b/core/src/raw/oio/write/at_least_buf_write.rs
index 62159ea28..99c20ee46 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -18,8 +18,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -92,34 +90,8 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
-        // If total size is known and equals to given stream, we can write it 
directly.
-        if let Some(total_size) = self.total_size {
-            if total_size == size {
-                return self.inner.pipe(size, s).await;
-            }
-        }
-
-        // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            let bs = s.collect().await?;
-            let size = bs.len() as u64;
-            self.buffer.push(bs);
-            return Ok(size);
-        }
-
-        let buf = self.buffer.clone();
-        let buffer_size = buf.len() as u64;
-        let stream = buf.chain(s);
-
-        self.inner
-            .pipe(buffer_size + size, Box::new(stream))
-            .await
-            // Clear buffer if the write is successful.
-            .map(|v| {
-                self.buffer.clear();
-                v
-            })
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        todo!()
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index 287db9bad..05dfdd775 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -41,7 +41,6 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -64,7 +63,7 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for 
TwoWaysWriter<ONE, TWO> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
             Self::One(one) => one.pipe(size, s).await,
             Self::Two(two) => two.pipe(size, s).await,
@@ -110,7 +109,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> 
oio::Write
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
             Self::One(one) => one.pipe(size, s).await,
             Self::Two(two) => two.pipe(size, s).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index 7c597e7d1..e77330e0b 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -92,50 +92,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without 
chain them every time.
-    async fn pipe(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
-        if self.buffer.len() >= self.buffer_size {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(self.buffer_size);
-            return self
-                .inner
-                .pipe(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|v| {
-                    self.buffer = buf;
-                    self.chain_stream(s);
-                    v
-                });
-        }
-
-        let mut buf = self.buffer.clone();
-        while buf.len() < self.buffer_size {
-            let bs = self.next_bytes(&mut s).await.transpose()?;
-            match bs {
-                None => break,
-                Some(bs) => buf.push(bs),
-            }
-        }
-
-        // Return directly if the buffer is not full.
-        //
-        // We don't need to chain stream here because it must be consumed.
-        if buf.len() < self.buffer_size {
-            let size = buf.len() as u64;
-            self.buffer = buf;
-            return Ok(size);
-        }
-
-        let to_write = buf.split_to(self.buffer_size);
-        self.inner
-            .pipe(to_write.len() as u64, Box::new(to_write))
-            .await
-            // Replace buffer with remaining if the write is successful.
-            .map(|v| {
-                self.buffer = buf;
-                self.chain_stream(s);
-                v
-            })
+    async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+        todo!()
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -187,6 +145,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
 #[cfg(test)]
 mod tests {
+    use futures::AsyncReadExt;
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -196,7 +155,6 @@ mod tests {
     use sha2::Sha256;
 
     use super::*;
-    use crate::raw::oio::StreamExt;
     use crate::raw::oio::Write;
 
     struct MockWriter {
@@ -212,10 +170,11 @@ mod tests {
             Ok(bs.len() as u64)
         }
 
-        async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
-            let bs = s.collect().await?;
+        async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
+            let mut bs = vec![];
+            s.read_to_end(&mut bs).await?;
             assert_eq!(bs.len() as u64, size);
-            self.write(bs).await
+            self.write(bs.into()).await
         }
 
         async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 232550607..0b314c163 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,11 +138,16 @@ where
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         self.inner
-            .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s))
+            .write_part(
+                &upload_id,
+                self.parts.len(),
+                size,
+                AsyncBody::Stream(Box::new(s)),
+            )
             .await
             .map(|v| self.parts.push(v))?;
 
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 11dbb501e..a85feecf3 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,8 +58,8 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.write_once(size, s).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.write_once(size, Box::new(s)).await?;
         Ok(size)
     }
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index dfd4e378a..11776755f 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,9 +180,10 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Stream(s)).await?;
+            self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
+                .await?;
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -191,7 +192,8 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+            self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+                .await?;
         }
 
         Ok(size)
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index a03cec2a1..4b9b90f9f 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 1405fe224..37abab6a9 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 80637f068..ab27f2b65 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3ce495795..3bb11582d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 305624ed8..e85605fa4 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -167,8 +167,9 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+            .await?;
         Ok(size)
     }
 
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index cfe027b6c..df974d940 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 59084c70e..bf9116ec2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 4b05c08de..3b60a4df3 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 108f67e39..1434e980a 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index edf55c127..75a5e023a 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 9f9e1f3dc..5ee5d84ac 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index ee3ee8251..36a572bfe 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index b2cf603de..936cddf01 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index c0ddf30b1..a39c835dc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 1b6e7cfa5..bbe79eb31 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,8 +70,9 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+            .await?;
 
         Ok(size)
     }
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index dbb4b409d..ae3396dc3 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index eef54f164..212aba528 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -138,8 +138,8 @@ impl Writer {
         T: Into<Bytes>,
     {
         if let State::Idle(Some(w)) = &mut self.state {
-            let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into())));
-            w.pipe(size, s).await
+            let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| v.into())));
+            w.pipe(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -180,11 +180,11 @@ impl Writer {
     /// ```
     pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
     where
-        R: futures::AsyncRead + Send + Sync + Unpin + 'static,
+        R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static,
     {
         if let State::Idle(Some(w)) = &mut self.state {
-            let s = Box::new(oio::into_stream_from_reader(read_from));
-            w.pipe(size, s).await
+            let r = Box::new(oio::into_read_from_file(read_from, 0, size));
+            w.pipe(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",


Reply via email to