This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 84539ecd2b2382d229e02372d441cf2aadc13850
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 29 20:32:01 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                          |  2 +-
 core/benches/oio/write.rs                          | 18 ++++++----------
 core/src/layers/blocking.rs                        |  6 ++----
 core/src/layers/complete.rs                        | 13 +++++------
 core/src/layers/concurrent_limit.rs                |  4 ++--
 core/src/layers/error_context.rs                   |  4 ++--
 core/src/layers/logging.rs                         |  5 +++--
 core/src/layers/madsim.rs                          |  2 +-
 core/src/layers/metrics.rs                         |  5 +++--
 core/src/layers/minitrace.rs                       |  4 ++--
 core/src/layers/oteltrace.rs                       |  4 ++--
 core/src/layers/prometheus.rs                      |  5 +++--
 core/src/layers/retry.rs                           |  4 ++--
 core/src/layers/throttle.rs                        |  4 ++--
 core/src/layers/timeout.rs                         |  6 +++---
 core/src/layers/tracing.rs                         |  4 ++--
 core/src/raw/adapters/kv/backend.rs                |  2 +-
 core/src/raw/adapters/typed_kv/backend.rs          |  2 +-
 core/src/raw/http_util/multipart.rs                |  6 +++---
 core/src/raw/oio/cursor.rs                         |  4 ++--
 core/src/raw/oio/stream/api.rs                     | 19 ++++++----------
 core/src/raw/oio/stream/into_stream.rs             |  2 +-
 core/src/raw/oio/stream/into_stream_from_reader.rs |  2 +-
 core/src/raw/oio/write/api.rs                      | 15 ++++---------
 core/src/raw/oio/write/append_object_write.rs      |  7 +++---
 core/src/raw/oio/write/at_least_buf_write.rs       | 16 ++++++--------
 core/src/raw/oio/write/compose_write.rs            | 14 ++++++------
 core/src/raw/oio/write/exact_buf_write.rs          | 25 ++++++++--------------
 core/src/raw/oio/write/multipart_upload_write.rs   |  7 +++---
 core/src/raw/oio/write/one_shot_write.rs           |  6 +++---
 core/src/services/azblob/writer.rs                 |  7 +++---
 core/src/services/azdfs/writer.rs                  |  2 +-
 core/src/services/cos/writer.rs                    | 17 +++++++++------
 core/src/services/dropbox/writer.rs                |  2 +-
 core/src/services/fs/writer.rs                     |  2 +-
 core/src/services/ftp/writer.rs                    |  2 +-
 core/src/services/gcs/writer.rs                    |  5 +++--
 core/src/services/gdrive/writer.rs                 |  2 +-
 core/src/services/ghac/writer.rs                   |  2 +-
 core/src/services/hdfs/writer.rs                   |  2 +-
 core/src/services/ipmfs/writer.rs                  |  2 +-
 core/src/services/obs/writer.rs                    | 17 +++++++++------
 core/src/services/onedrive/writer.rs               |  2 +-
 core/src/services/oss/writer.rs                    | 18 ++++++++++------
 core/src/services/s3/writer.rs                     | 19 +++++++++-------
 core/src/services/sftp/writer.rs                   |  2 +-
 core/src/services/supabase/writer.rs               |  2 +-
 core/src/services/vercel_artifacts/writer.rs       |  2 +-
 core/src/services/wasabi/writer.rs                 |  2 +-
 core/src/services/webdav/writer.rs                 |  5 +++--
 core/src/services/webhdfs/writer.rs                |  2 +-
 core/src/types/operator/operator.rs                |  3 +--
 core/src/types/writer.rs                           | 13 +++++------
 53 files changed, 171 insertions(+), 178 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index e90c439ca..efbf15330 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -27,7 +27,7 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn write(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
+    async fn write(&mut self, _: Streamer) -> opendal::Result<()> {
         Ok(())
     }
 
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 2fd5392f5..ac0ed15dc 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -47,12 +47,9 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
 
-                w.write(
-                    content.len() as u64,
-                    Box::new(oio::Cursor::from(content.clone())),
-                )
-                .await
-                .unwrap();
+                w.write(Box::new(oio::Cursor::from(content.clone())))
+                    .await
+                    .unwrap();
                 w.close().await.unwrap();
             })
         });
@@ -78,12 +75,9 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(
-                    content.len() as u64,
-                    Box::new(oio::Cursor::from(content.clone())),
-                )
-                .await
-                .unwrap();
+                w.write(Box::new(oio::Cursor::from(content.clone())))
+                    .await
+                    .unwrap();
                 w.close().await.unwrap();
             })
         });
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index f5f264395..41c3e75d2 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -197,10 +197,8 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.handle.block_on(
-            self.inner
-                .write(bs.len() as u64, Box::new(oio::Cursor::from(bs))),
-        )
+        self.handle
+            .block_on(self.inner.write(Box::new(oio::Cursor::from(bs))))
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index d0d39784c..65ada747f 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -25,13 +25,13 @@ use std::task::Poll;
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::into_flat_page;
 use crate::raw::oio::into_hierarchy_page;
 use crate::raw::oio::ByRangeSeekableReader;
 use crate::raw::oio::Entry;
 use crate::raw::oio::FlatPager;
 use crate::raw::oio::HierarchyPager;
 use crate::raw::oio::StreamableReader;
+use crate::raw::oio::{into_flat_page, Stream};
 use crate::raw::*;
 use crate::*;
 
@@ -711,14 +711,14 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         if let Some(total_size) = self.size {
-            if self.written + size > total_size {
+            if self.written + s.size() > total_size {
                 return Err(Error::new(
                     ErrorKind::ContentTruncated,
                     &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + size
+                        "writer got too much data, expect: {total_size}, actual: {}",
+                        self.written + s.size()
                     ),
                 ));
             }
@@ -727,7 +727,8 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
-        w.write(size, s).await?;
+        let size = s.size();
+        w.write(s).await?;
         self.written += size;
         Ok(())
     }
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index e6521cfbb..d08bcd6b3 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,8 +285,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write(size, s).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.inner.write(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 0e588fddc..adc768a2b 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -411,8 +411,8 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write(size, s).await.map_err(|err| {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.inner.write(s).await.map_err(|err| {
             err.with_operation(WriteOperation::Sink)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 316a2c073..b1fc1ade4 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,8 +1252,9 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        match self.inner.write(size, s).await {
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
+        let size = s.size();
+        match self.inner.write(s).await {
             Ok(_) => {
                 self.written += size;
                 trace!(
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 9ca4105ff..eee47e10e 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> crate::Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 580e42f6b..0b7ceb872 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,9 +847,10 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        let size = s.size();
         self.inner
-            .write(size, s)
+            .write(s)
             .await
             .map(|_| self.bytes += size)
             .map_err(|err| {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index b40d567c9..a5355460b 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,9 +337,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         self.inner
-            .write(size, s)
+            .write(s)
             .in_span(Span::enter_with_parent(
                 WriteOperation::Sink.into_static(),
                 &self.span,
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 903ab098d..f0a8240fd 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,8 +313,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write(size, s).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.inner.write(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 82f249f03..68194a67b 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,9 +662,10 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        let size = s.size();
         self.inner
-            .write(size, s)
+            .write(s)
             .await
             .map(|_| {
                 self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 38511f527..85cf1b2a0 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -893,13 +893,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
     /// which is acceptable.
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         let s = Arc::new(Mutex::new(s));
 
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.write(size, Box::new(s.clone())).await {
+            match self.inner.write(Box::new(s.clone())).await {
                 Ok(_) => return Ok(()),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3429c14a4..28a5f8351 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -217,8 +217,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
-        self.inner.write(size, s).await
+    async fn write(&mut self, s: Streamer) -> Result<()> {
+        self.inner.write(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 0dfec0212..802940ecf 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,10 +322,10 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        let timeout = self.io_timeout(size);
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
+        let timeout = self.io_timeout(s.size());
 
-        tokio::time::timeout(timeout, self.inner.write(size, s))
+        tokio::time::timeout(timeout, self.inner.write(s))
             .await
             .map_err(|_| {
                 Error::new(ErrorKind::Unexpected, "operation timeout")
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index c11574dba..b1c248974 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,8 +324,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write(size, s).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.inner.write(s).await
     }
 
     #[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index da402e86b..8ed445db6 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -389,7 +389,7 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index 6500a2f1a..fad9373a7 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -402,7 +402,7 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index 6ec4e9c09..77b0709f0 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -182,7 +182,7 @@ pub struct MultipartStream<T: Part> {
 }
 
 impl<T: Part> Stream for MultipartStream<T> {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.size
     }
 
@@ -338,7 +338,7 @@ pub struct FormDataPartStream {
 
 #[async_trait]
 impl Stream for FormDataPartStream {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.size
     }
 
@@ -701,7 +701,7 @@ pub struct MixedPartStream {
 }
 
 impl Stream for MixedPartStream {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.size
     }
 
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 74f97ef4d..5e7a665f0 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -161,7 +161,7 @@ impl oio::BlockingRead for Cursor {
 }
 
 impl oio::Stream for Cursor {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.inner.len() as u64 - self.pos
     }
 
@@ -331,7 +331,7 @@ impl ChunkedCursor {
 }
 
 impl oio::Stream for ChunkedCursor {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.len() as u64
     }
 
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index fcf7b8b5c..79946d847 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -37,12 +37,7 @@ pub type Streamer = Box<dyn Stream>;
 /// `Unpin` + `Send` + `Sync`. And the item is `Result<Bytes>`.
 pub trait Stream: Unpin + Send + Sync {
     /// Fetch remaining size of this stream.
-    ///
-    /// # NOTES
-    ///
-    /// It's by design that we take `&mut self` here to make sure we don't have other
-    /// threads reading the same stream at the same time.
-    fn size(&mut self) -> u64;
+    fn size(&self) -> u64;
 
     /// Fetch next item `Result<Bytes>` from the stream.
     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;
@@ -52,7 +47,7 @@ pub trait Stream: Unpin + Send + Sync {
 }
 
 impl Stream for () {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         unimplemented!("size is required to be implemented for oio::Stream")
     }
 
@@ -72,7 +67,7 @@ impl Stream for () {
 /// `Box<dyn Stream>` won't implement `Stream` automatically.
 /// To make Streamer work as expected, we must add this impl.
 impl<T: Stream + ?Sized> Stream for Box<T> {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         (**self).size()
     }
 
@@ -86,7 +81,7 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
 }
 
 impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         match self.try_lock() {
             Ok(mut this) => this.size(),
             Err(_) => panic!("the stream is expected to have only one consumer, but it's not"),
@@ -115,7 +110,7 @@ impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
 }
 
 impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         match self.try_lock() {
             Ok(mut this) => this.size(),
             Err(_) => panic!("the stream is expected to have only one consumer, but it's not"),
@@ -238,8 +233,8 @@ pub struct Chain<S1: Stream, S2: Stream> {
 }
 
 impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
-    fn size(&mut self) -> u64 {
-        self.first.as_mut().map(|v| v.size()).unwrap_or_default() + self.second.size()
+    fn size(&self) -> u64 {
+        self.first.as_ref().map(|v| v.size()).unwrap_or_default() + self.second.size()
     }
 
     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
diff --git a/core/src/raw/oio/stream/into_stream.rs b/core/src/raw/oio/stream/into_stream.rs
index 81e2301ae..6b073ffd1 100644
--- a/core/src/raw/oio/stream/into_stream.rs
+++ b/core/src/raw/oio/stream/into_stream.rs
@@ -44,7 +44,7 @@ impl<S> oio::Stream for IntoStream<S>
 where
     S: futures::Stream<Item = Result<Bytes>> + Send + Sync + Unpin,
 {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.size
     }
 
diff --git a/core/src/raw/oio/stream/into_stream_from_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs
index a038695ef..958d4b91c 100644
--- a/core/src/raw/oio/stream/into_stream_from_reader.rs
+++ b/core/src/raw/oio/stream/into_stream_from_reader.rs
@@ -54,7 +54,7 @@ impl<S> oio::Stream for FromReaderStream<S>
 where
     S: AsyncRead + Send + Sync + Unpin,
 {
-    fn size(&mut self) -> u64 {
+    fn size(&self) -> u64 {
         self.size
     }
 
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index ba4705679..12d6ea3c4 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -88,14 +88,7 @@ pub type Writer = Box<dyn Write>;
 #[async_trait]
 pub trait Write: Unpin + Send + Sync {
     /// Sink given stream into writer.
-    ///
-    /// # Notes
-    ///
-    /// It's possible that the given bs length is less than the total
-    /// content length. And users will call write multiple times.
-    ///
-    /// Please make sure `write` is safe to re-enter.
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
+    async fn write(&mut self, s: oio::Streamer) -> Result<()>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -106,7 +99,7 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn write(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -133,8 +126,8 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn write(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
-        (**self).write(n, s).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        (**self).write(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index d1380c584..f4fffc704 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -17,7 +17,7 @@
 
 use async_trait::async_trait;
 
-use crate::raw::oio::Streamer;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
@@ -78,11 +78,12 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, s: Streamer) -> Result<()> {
         let offset = self.offset().await?;
 
+        let size = s.size();
         self.inner
-            .append(offset, size, AsyncBody::Stream(s))
+            .append(offset, s.size(), AsyncBody::Stream(s))
             .await
             .map(|_| self.offset = Some(offset + size))
     }
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 2c269cc25..87c5a523f 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -17,8 +17,8 @@
 
 use async_trait::async_trait;
 
-use crate::raw::oio::StreamExt;
 use crate::raw::oio::Streamer;
+use crate::raw::oio::{Stream, StreamExt};
 use crate::raw::*;
 use crate::*;
 
@@ -63,16 +63,16 @@ impl<W: oio::Write> AtLeastBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, s: Streamer) -> Result<()> {
         // If total size is known and equals to given stream, we can write it directly.
         if let Some(total_size) = self.total_size {
-            if total_size == size {
-                return self.inner.write(size, s).await;
+            if total_size == s.size() {
+                return self.inner.write(s).await;
             }
         }
 
         // Push the bytes into the buffer if the buffer is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
+        if self.buffer.len() as u64 + s.size() < self.buffer_size as u64 {
             self.buffer.push(s.collect().await?);
             return Ok(());
         }
@@ -82,7 +82,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
         let stream = buf.chain(s);
 
         self.inner
-            .write(buffer_size + size, Box::new(stream))
+            .write(Box::new(stream))
             .await
             // Clear buffer if the write is successful.
             .map(|_| self.buffer.clear())
@@ -95,9 +95,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
 
     async fn close(&mut self) -> Result<()> {
         if !self.buffer.is_empty() {
-            self.inner
-                .write(self.buffer.len() as u64, Box::new(self.buffer.clone()))
-                .await?;
+            self.inner.write(Box::new(self.buffer.clone())).await?;
             self.buffer.clear();
         }
 
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index 1cd7c6f72..bd6cc9b77 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -56,10 +56,10 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, s: Streamer) -> Result<()> {
         match self {
-            Self::One(one) => one.write(size, s).await,
-            Self::Two(two) => two.write(size, s).await,
+            Self::One(one) => one.write(s).await,
+            Self::Two(two) => two.write(s).await,
         }
     }
 
@@ -94,11 +94,11 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, s: Streamer) -> Result<()> {
         match self {
-            Self::One(one) => one.write(size, s).await,
-            Self::Two(two) => two.write(size, s).await,
-            Self::Three(three) => three.write(size, s).await,
+            Self::One(one) => one.write(s).await,
+            Self::Two(two) => two.write(s).await,
+            Self::Three(three) => three.write(s).await,
         }
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 1d25751a6..0fa358c91 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -87,13 +87,13 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without chain them every time.
-    async fn write(&mut self, _: u64, mut s: Streamer) -> Result<()> {
+    async fn write(&mut self, mut s: Streamer) -> Result<()> {
         if self.buffer.len() >= self.buffer_size {
             let mut buf = self.buffer.clone();
             let to_write = buf.split_to(self.buffer_size);
             return self
                 .inner
-                .write(to_write.len() as u64, Box::new(to_write))
+                .write(Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|_| {
@@ -121,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
         let to_write = buf.split_to(self.buffer_size);
         self.inner
-            .write(to_write.len() as u64, Box::new(to_write))
+            .write(Box::new(to_write))
             .await
             // Replace buffer with remaining if the write is successful.
             .map(|_| {
@@ -153,7 +153,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                 let mut buf = self.buffer.clone();
                 let to_write = buf.split_to(self.buffer_size);
                 self.inner
-                    .write(to_write.len() as u64, Box::new(to_write))
+                    .write(Box::new(to_write))
                     .await
                     // Replace buffer with remaining if the write is successful.
                     .map(|_| {
@@ -167,7 +167,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             let to_write = buf.split_to(min(self.buffer_size, buf.len()));
 
             self.inner
-                .write(to_write.len() as u64, Box::new(to_write))
+                .write(Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|_| self.buffer = buf)?;
@@ -197,9 +197,8 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+        async fn write(&mut self, s: Streamer) -> Result<()> {
             let bs = s.collect().await?;
-            assert_eq!(bs.len() as u64, size);
             self.buf.extend_from_slice(&bs);
 
             Ok(())
@@ -228,11 +227,8 @@ mod tests {
 
         let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
 
-        w.write(
-            expected.len() as u64,
-            Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
-        )
-        .await?;
+        w.write(Box::new(oio::Cursor::from(Bytes::from(expected.clone()))))
+            .await?;
         w.close().await?;
 
         assert_eq!(w.inner.buf.len(), expected.len());
@@ -266,10 +262,7 @@ mod tests {
 
             expected.extend_from_slice(&content);
             writer
-                .write(
-                    expected.len() as u64,
-                    Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
-                )
+                .write(Box::new(oio::Cursor::from(Bytes::from(expected.clone()))))
                 .await?;
         }
         writer.close().await?;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index da8f01695..a0e3b3017 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -51,8 +51,7 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin {
         &self,
         upload_id: &str,
         part_number: usize,
-        size: u64,
-        body: AsyncBody,
+        content: oio::Streamer,
     ) -> Result<MultipartUploadPart>;
 
     /// complete_part will complete the multipart upload to build the final
@@ -119,11 +118,11 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         let upload_id = self.upload_id().await?;
 
         self.inner
-            .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s))
+            .write_part(&upload_id, self.parts.len(), s)
             .await
             .map(|v| self.parts.push(v))
     }
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 971c59878..b9caae866 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -31,7 +31,7 @@ pub trait OneShotWrite: Send + Sync + Unpin {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at once.
-    async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+    async fn write_once(&self, stream: oio::Streamer) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
@@ -48,8 +48,8 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.write_once(size, s).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.inner.write_once(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 0b243072f..c2519294a 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -22,6 +22,7 @@ use http::StatusCode;
 
 use super::core::AzblobCore;
 use super::error::parse_error;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
 
@@ -160,9 +161,9 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Stream(s)).await
+            self.append_oneshot(s.size(), AsyncBody::Stream(s)).await
         } else {
             if self.op.content_length().is_none() {
                 return Err(Error::new(
@@ -171,7 +172,7 @@ impl oio::Write for AzblobWriter {
                 ));
             }
 
-            self.write_oneshot(size, AsyncBody::Stream(s)).await
+            self.write_oneshot(s.size(), AsyncBody::Stream(s)).await
         }
     }
 
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index cf1c68cf4..e6ae3f84a 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -86,7 +86,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index af2a6ebd0..29f63f78f 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -23,7 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
@@ -52,10 +52,10 @@ impl CosWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for CosWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.cos_put_object_request(
             &self.path,
-            Some(size),
+            Some(stream.size()),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
@@ -110,15 +110,20 @@ impl oio::MultipartUploadWrite for CosWriter {
         &self,
         upload_id: &str,
         part_number: usize,
-        size: u64,
-        body: AsyncBody,
+        s: Streamer,
     ) -> Result<oio::MultipartUploadPart> {
         // COS requires part number must between [1..=10000]
         let part_number = part_number + 1;
 
         let resp = self
             .core
-            .cos_upload_part_request(&self.path, upload_id, part_number, size, body)
+            .cos_upload_part_request(
+                &self.path,
+                upload_id,
+                part_number,
+                s.size(),
+                AsyncBody::Stream(s),
+            )
             .await?;
 
         let status = resp.status();
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index cea9cb199..72309ff0d 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -60,7 +60,7 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 567e63bd9..395261c63 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -50,7 +50,7 @@ impl<F> FsWriter<F> {
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    async fn write(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 8c56f8e5d..6ac3f38c8 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,7 +53,7 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index e7e68f320..9191d70fe 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::GcsCore;
 use super::error::parse_error;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
 
@@ -164,8 +165,8 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.write_oneshot(s.size(), AsyncBody::Stream(s)).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index f4bc19b12..12dd1cc43 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -103,7 +103,7 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 2bdd44507..83bde0bb3 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6c9c679dc..a3928ed13 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl HdfsWriter<hdrs::AsyncFile> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index a876aac3a..2e2d4c18e 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -52,7 +52,7 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index d3b1e119f..3b0e02e7c 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -23,8 +23,8 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::MultipartUploadPart;
 use crate::raw::oio::Streamer;
+use crate::raw::oio::{MultipartUploadPart, Stream};
 use crate::raw::*;
 use crate::*;
 
@@ -53,10 +53,10 @@ impl ObsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for ObsWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.obs_put_object_request(
             &self.path,
-            Some(size),
+            Some(stream.size()),
             self.op.content_type(),
             self.op.cache_control(),
             AsyncBody::Stream(stream),
@@ -105,15 +105,20 @@ impl oio::MultipartUploadWrite for ObsWriter {
         &self,
         upload_id: &str,
         part_number: usize,
-        size: u64,
-        body: AsyncBody,
+        stream: Streamer,
     ) -> Result<MultipartUploadPart> {
         // Obs service requires part number must between [1..=10000]
         let part_number = part_number + 1;
 
         let resp = self
             .core
-            .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body)
+            .obs_upload_part_request(
+                &self.path,
+                upload_id,
+                part_number,
+                Some(stream.size()),
+                AsyncBody::Stream(stream),
+            )
             .await?;
 
         let status = resp.status();
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index a320e3b87..219855dd3 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -56,7 +56,7 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 27aa09011..2d965a7cf 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -23,7 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
@@ -52,10 +52,10 @@ impl OssWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OssWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
-            Some(size),
+            Some(stream.size()),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
@@ -112,15 +112,21 @@ impl oio::MultipartUploadWrite for OssWriter {
         &self,
         upload_id: &str,
         part_number: usize,
-        size: u64,
-        body: AsyncBody,
+        stream: Streamer,
     ) -> Result<oio::MultipartUploadPart> {
         // OSS requires part number must between [1..=10000]
         let part_number = part_number + 1;
 
         let resp = self
             .core
-            .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
+            .oss_upload_part_request(
+                &self.path,
+                upload_id,
+                part_number,
+                false,
+                stream.size(),
+                AsyncBody::Stream(stream),
+            )
             .await?;
 
         let status = resp.status();
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index a27341d71..4f96fece7 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -23,7 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
@@ -49,10 +49,10 @@ impl S3Writer {
 
 #[async_trait]
 impl oio::OneShotWrite for S3Writer {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.s3_put_object_request(
             &self.path,
-            Some(size),
+            Some(stream.size()),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
@@ -107,15 +107,18 @@ impl oio::MultipartUploadWrite for S3Writer {
         &self,
         upload_id: &str,
         part_number: usize,
-        size: u64,
-        body: AsyncBody,
+        stream: Streamer,
     ) -> Result<oio::MultipartUploadPart> {
         // AWS S3 requires part number must between [1..=10000]
         let part_number = part_number + 1;
 
-        let mut req =
-            self.core
-                .s3_upload_part_request(&self.path, upload_id, part_number, size, body)?;
+        let mut req = self.core.s3_upload_part_request(
+            &self.path,
+            upload_id,
+            part_number,
+            stream.size(),
+            AsyncBody::Stream(stream),
+        )?;
 
         self.core.sign(&mut req).await?;
 
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index c36b02c3b..ad1447d10 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -42,7 +42,7 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 5c364ab8d..9eb736c73 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -75,7 +75,7 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 6cb65dca6..71f3e2606 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -60,7 +60,7 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 1cf7b740f..04f9bcb80 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -63,7 +63,7 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 2deb0bfec..5050b70cf 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -21,6 +21,7 @@ use http::StatusCode;
 
 use super::backend::WebdavBackend;
 use super::error::parse_error;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
 
@@ -67,8 +68,8 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.write_oneshot(s.size(), AsyncBody::Stream(s)).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 03cbfdf87..d27a84480 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -62,7 +62,7 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 442bc2188..fb7735b9f 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -730,8 +730,7 @@ impl Operator {
 
                     let (_, mut w) = inner.write(&path, args).await?;
                     // FIXME: we should bench here to measure the perf.
-                    w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-                        .await?;
+                    w.write(Box::new(oio::Cursor::from(bs))).await?;
                     w.close().await?;
 
                     Ok(())
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index f3011e69e..3ce085dd8 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -83,8 +83,7 @@ impl Writer {
     pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             let bs = bs.into();
-            w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-                .await
+            w.write(Box::new(oio::Cursor::from(bs))).await
         } else {
             unreachable!(
                 "writer state invalid while write, expect Idle, actual {}",
@@ -132,7 +131,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream(size, sink_from.map_ok(|v| v.into())));
-            w.write(size, s).await
+            w.write(s).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -177,7 +176,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream_from_reader(size, read_from));
-            w.write(size, s).await
+            w.write(s).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",
@@ -253,8 +252,7 @@ impl AsyncWrite for Writer {
                     let size = bs.len();
                     let fut = async move {
                         // FIXME: we should bench here to measure the perf.
-                        w.write(size as u64, Box::new(oio::Cursor::from(bs)))
-                            .await?;
+                        w.write(Box::new(oio::Cursor::from(bs))).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -322,8 +320,7 @@ impl tokio::io::AsyncWrite for Writer {
                     let size = bs.len();
                     let fut = async move {
                         // FIXME: we should bench here to measure the perf.
-                        w.write(size as u64, Box::new(oio::Cursor::from(bs)))
-                            .await?;
+                        w.write(Box::new(oio::Cursor::from(bs))).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));

Reply via email to