This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 098ef52ef refactor: Remove oio::Write::copy_from (#3018)
098ef52ef is described below

commit 098ef52ef6fb94cbe100a2da582c7541880e1e92
Author: Xuanwo <[email protected]>
AuthorDate: Thu Sep 7 13:27:08 2023 +0800

    refactor: Remove oio::Write::copy_from (#3018)
    
    * refactor: Remove oio::Write::copy_from
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove write can sink
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  4 --
 core/src/layers/complete.rs                      | 21 --------
 core/src/layers/concurrent_limit.rs              |  4 --
 core/src/layers/error_context.rs                 |  8 ---
 core/src/layers/logging.rs                       | 33 -------------
 core/src/layers/madsim.rs                        |  7 ---
 core/src/layers/metrics.rs                       | 14 ------
 core/src/layers/minitrace.rs                     | 10 ----
 core/src/layers/oteltrace.rs                     |  4 --
 core/src/layers/prometheus.rs                    | 17 -------
 core/src/layers/retry.rs                         | 61 -----------------------
 core/src/layers/throttle.rs                      |  4 --
 core/src/layers/timeout.rs                       | 13 -----
 core/src/layers/tracing.rs                       |  8 ---
 core/src/raw/adapters/kv/backend.rs              |  7 ---
 core/src/raw/adapters/typed_kv/backend.rs        |  7 ---
 core/src/raw/oio/write/api.rs                    | 26 ----------
 core/src/raw/oio/write/append_object_write.rs    | 11 -----
 core/src/raw/oio/write/compose_write.rs          | 15 ------
 core/src/raw/oio/write/exact_buf_write.rs        | 56 +--------------------
 core/src/raw/oio/write/multipart_upload_write.rs | 16 ------
 core/src/raw/oio/write/one_shot_write.rs         |  5 --
 core/src/services/azblob/backend.rs              |  1 -
 core/src/services/azblob/writer.rs               | 19 -------
 core/src/services/azdfs/writer.rs                |  7 ---
 core/src/services/cos/backend.rs                 |  1 -
 core/src/services/dropbox/writer.rs              |  7 ---
 core/src/services/fs/backend.rs                  |  1 -
 core/src/services/fs/writer.rs                   | 15 ------
 core/src/services/ftp/writer.rs                  |  7 ---
 core/src/services/gcs/backend.rs                 |  1 -
 core/src/services/gcs/writer.rs                  |  6 ---
 core/src/services/gdrive/writer.rs               |  4 --
 core/src/services/ghac/writer.rs                 |  7 ---
 core/src/services/hdfs/writer.rs                 |  7 ---
 core/src/services/ipmfs/writer.rs                |  7 ---
 core/src/services/obs/backend.rs                 |  1 -
 core/src/services/onedrive/writer.rs             |  7 ---
 core/src/services/oss/backend.rs                 |  1 -
 core/src/services/s3/backend.rs                  |  1 -
 core/src/services/sftp/writer.rs                 |  7 ---
 core/src/services/supabase/writer.rs             |  7 ---
 core/src/services/vercel_artifacts/writer.rs     |  7 ---
 core/src/services/wasabi/writer.rs               |  7 ---
 core/src/services/webdav/backend.rs              |  1 -
 core/src/services/webdav/writer.rs               |  7 ---
 core/src/services/webhdfs/writer.rs              |  7 ---
 core/src/types/capability.rs                     |  2 -
 core/src/types/writer.rs                         | 63 +++++++++++++++---------
 core/tests/behavior/write.rs                     | 19 +++----
 50 files changed, 46 insertions(+), 532 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 0171b9abf..bde28481e 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -30,10 +30,6 @@ impl oio::Write for BlackHoleWriter {
         Ok(bs.len() as u64)
     }
 
-    async fn copy_from(&mut self, size: u64, _: oio::Reader) -> opendal::Result<u64> {
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> opendal::Result<()> {
         Ok(())
     }
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 63ff3aa0d..83bf12d6a 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,27 +734,6 @@ where
         Ok(n)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        if let Some(total_size) = self.size {
-            if self.written + size > total_size {
-                return Err(Error::new(
-                    ErrorKind::ContentTruncated,
-                    &format!(
-                        "writer got too much data, expect: {size}, actual: {}",
-                        self.written + size
-                    ),
-                ));
-            }
-        }
-
-        let w = self.inner.as_mut().ok_or_else(|| {
-            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
-        })?;
-        let n = w.copy_from(size, s).await?;
-        self.written += n;
-        Ok(n)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 22f5d4c52..99fa4e413 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,10 +293,6 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.copy_from(size, s).await
-    }
-
     async fn close(&mut self) -> Result<()> {
         self.inner.close().await
     }
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 925f1d191..e325cd0f7 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,14 +419,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.copy_from(size, s).await.map_err(|err| {
-            err.with_operation(WriteOperation::CopyFrom)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     async fn close(&mut self) -> Result<()> {
         self.inner.close().await.map_err(|err| {
             err.with_operation(WriteOperation::Close)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1b0a6d6c0..1f6d7456a 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,39 +1285,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        match self.inner.copy_from(size, s).await {
-            Ok(n) => {
-                self.written += n;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data sink {}B",
-                    self.ctx.scheme,
-                    WriteOperation::CopyFrom,
-                    self.path,
-                    self.written,
-                    n
-                );
-                Ok(n)
-            }
-            Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data sink failed: {}",
-                        self.ctx.scheme,
-                        WriteOperation::CopyFrom,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         match self.inner.abort().await {
             Ok(_) => {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 92f1925a8..432e4bcf4 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,13 +318,6 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "will be supported in the future",
-        ))
-    }
-
     async fn abort(&mut self) -> crate::Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index d4d65f839..36e4957aa 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,20 +861,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner
-            .copy_from(size, s)
-            .await
-            .map(|n| {
-                self.bytes += n;
-                n
-            })
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 4daf0c056..149410ac8 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,16 +347,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner
-            .copy_from(size, s)
-            .in_span(Span::enter_with_parent(
-                WriteOperation::CopyFrom.into_static(),
-                &self.span,
-            ))
-            .await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner
             .abort()
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index b574e7b98..0722a11a7 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,10 +317,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.copy_from(size, s).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 35ce5144c..0d8f99c22 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,23 +679,6 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner
-            .copy_from(size, s)
-            .await
-            .map(|n| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
-                    .observe(n as f64);
-                n
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 9d3408866..3e207df9a 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -898,67 +898,6 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
         }
     }
 
-    /// > Ooooooooooops, are you crazy!? Why we need to do `Arc<Mutex<S>>` here? Adding a lock has
-    /// a lot overhead!
-    ///
-    /// Yes, you are right. But we have no choice. This is the only safe way for us to add retry
-    /// support for stream.
-    ///
-    /// And the overhead is acceptable. Based on our benchmark, adding a lock
-    /// that has no conflicts will only cost 5ns.
-    ///
-    /// ```shell
-    /// stream/without_arc_mutex
-    ///                         time:   [10.715 ns 10.729 ns 10.744 ns]
-    ///                         thrpt:  [ 90896 GiB/s  91019 GiB/s  91139 GiB/s]
-    /// stream/with_arc_mutex   time:   [14.891 ns 14.905 ns 14.928 ns]
-    ///                         thrpt:  [ 65418 GiB/s  65517 GiB/s  65581 GiB/s]
-    /// ```
-    ///
-    /// The overhead is constant, which means the overhead will not increase with the size of
-    /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
-    /// which is acceptable.
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        let s = oio::into_cloneable_reader_within_tokio(s);
-
-        let mut backoff = self.builder.build();
-
-        loop {
-            match self.inner.copy_from(size, Box::new(s.clone())).await {
-                Ok(n) => return Ok(n),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
-                    Some(dur) => {
-                        {
-                            use oio::ReadExt;
-
-                            let s = s.clone().into_inner();
-                            let mut stream = s.lock().await;
-                            // Try to reset this reader.
-                            //
-                            // If error happened, we will return the pipe error directly and stop retry.
-                            if stream.seek(io::SeekFrom::Start(0)).await.is_err() {
-                                return Err(e);
-                            }
-                        }
-
-                        self.notify.intercept(
-                            &e,
-                            dur,
-                            &[
-                                ("operation", WriteOperation::CopyFrom.into_static()),
-                                ("path", &self.path),
-                            ],
-                        );
-                        tokio::time::sleep(dur).await;
-                        continue;
-                    }
-                },
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let mut backoff = self.builder.build();
 
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 4edec592e..021d1cad9 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -241,10 +241,6 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.copy_from(size, s).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index e15cfeac9..68ac5a41d 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,19 +335,6 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        let timeout = self.io_timeout(size);
-
-        tokio::time::timeout(timeout, self.inner.copy_from(size, s))
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(WriteOperation::CopyFrom)
-                    .with_context("timeout", timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
-    }
-
     async fn abort(&mut self) -> Result<()> {
         tokio::time::timeout(self.timeout, self.inner.abort())
             .await
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index ca277fbd6..c5d006980 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -328,14 +328,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         self.inner.write(bs).await
     }
 
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.copy_from(size, s).await
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index 38b7db998..c0823d59e 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,13 +397,6 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index 07b919f79..948c2b5ad 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,13 +410,6 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.buf.clear();
 
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 952c073ae..eaf3ba0de 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -21,7 +21,6 @@ use std::fmt::Formatter;
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::*;
 use crate::*;
 
 /// WriteOperation is the name for APIs of Writer.
@@ -30,8 +29,6 @@ use crate::*;
 pub enum WriteOperation {
     /// Operation for [`Write::write`]
     Write,
-    /// Operation for [`Write::copy_from`]
-    CopyFrom,
     /// Operation for [`Write::abort`]
     Abort,
     /// Operation for [`Write::close`]
@@ -61,7 +58,6 @@ impl From<WriteOperation> for &'static str {
 
         match v {
             Write => "Writer::write",
-            CopyFrom => "Writer::copy_from",
             Abort => "Writer::abort",
             Close => "Writer::close",
             BlockingWrite => "BlockingWriter::write",
@@ -87,17 +83,6 @@ pub trait Write: Unpin + Send + Sync {
     /// repeatedly until all bytes has been written.
     async fn write(&mut self, bs: Bytes) -> Result<u64>;
 
-    /// Copy from given reader into the writer.
-    ///
-    /// # Behavior
-    ///
-    /// - `Ok(n)` means `n` bytes has been written successfully.
-    /// - `Err(err)` means error happens and no bytes has been written.
-    ///
-    /// It's possible that `n < size`, caller should pass the remaining bytes
-    /// repeatedly until all bytes has been written.
-    async fn copy_from(&mut self, size: u64, src: oio::Reader) -> Result<u64>;
-
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
 
@@ -113,13 +98,6 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn copy_from(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support sink",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -144,10 +122,6 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn copy_from(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
-        (**self).copy_from(n, s).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         (**self).abort().await
     }
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index b7f1ff5c6..d6bb819df 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -91,17 +91,6 @@ where
         Ok(size)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        let offset = self.offset().await?;
-
-        self.inner
-            .append(offset, size, AsyncBody::Stream(Box::new(s)))
-            .await
-            .map(|_| self.offset = Some(offset + size))?;
-
-        Ok(size)
-    }
-
     async fn close(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index 1afb7735e..dac833fcf 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -63,13 +63,6 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        match self {
-            Self::One(one) => one.copy_from(size, s).await,
-            Self::Two(two) => two.copy_from(size, s).await,
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         match self {
             Self::One(one) => one.abort().await,
@@ -109,14 +102,6 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        match self {
-            Self::One(one) => one.copy_from(size, s).await,
-            Self::Two(two) => two.copy_from(size, s).await,
-            Self::Three(three) => three.copy_from(size, s).await,
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         match self {
             Self::One(one) => one.abort().await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index a944f3365..7ba788e3d 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,10 +18,8 @@
 use std::cmp::min;
 
 use async_trait::async_trait;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
-use tokio::io::ReadBuf;
+use bytes::{Buf, Bytes, BytesMut};
 
-use crate::raw::oio::ReadExt;
 use crate::raw::*;
 use crate::*;
 
@@ -104,50 +102,6 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
-        loop {
-            match &mut self.buffer {
-                Buffer::Empty => {
-                    self.buffer = Buffer::Filling(BytesMut::new());
-                }
-                Buffer::Filling(fill) => {
-                    if fill.len() >= self.buffer_size {
-                        self.buffer = Buffer::Consuming(fill.split().freeze());
-                        continue;
-                    }
-
-                    // Reserve to enough size.
-                    if size > fill.remaining_mut() as u64 {
-                        fill.reserve(self.buffer_size - fill.len());
-                    }
-                    let dst = fill.spare_capacity_mut();
-                    let dst_len = dst.len();
-                    let mut buf = ReadBuf::uninit(dst);
-
-                    // Safety: the input buffer is created with_capacity(length).
-                    unsafe { buf.assume_init(dst_len) };
-
-                    let n = s.read(buf.initialize_unfilled()).await?;
-
-                    // Safety: read makes sure this buffer has been filled.
-                    unsafe { fill.advance_mut(n) };
-
-                    return Ok(n as u64);
-                }
-                Buffer::Consuming(consume) => {
-                    // Make sure filled buffer has been flushed.
-                    //
-                    // TODO: maybe we can re-fill it after a successful write.
-                    while !consume.is_empty() {
-                        let n = self.inner.write(consume.clone()).await?;
-                        consume.advance(n as usize);
-                    }
-                    self.buffer = Buffer::Filling(BytesMut::new());
-                }
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.buffer = Buffer::Empty;
         self.inner.abort().await
@@ -181,7 +135,6 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
 #[cfg(test)]
 mod tests {
-    use futures::AsyncReadExt;
     use log::debug;
     use pretty_assertions::assert_eq;
     use rand::thread_rng;
@@ -206,13 +159,6 @@ mod tests {
             Ok(bs.len() as u64)
         }
 
-        async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
-            let mut bs = vec![];
-            s.read_to_end(&mut bs).await.unwrap();
-            assert_eq!(bs.len() as u64, size);
-            self.write(bs.into()).await
-        }
-
         async fn abort(&mut self) -> Result<()> {
             Ok(())
         }
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index e39d59ca5..13d1b4aa2 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,22 +138,6 @@ where
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        let upload_id = self.upload_id().await?;
-
-        self.inner
-            .write_part(
-                &upload_id,
-                self.parts.len(),
-                size,
-                AsyncBody::Stream(Box::new(s)),
-            )
-            .await
-            .map(|v| self.parts.push(v))?;
-
-        Ok(size)
-    }
-
     async fn close(&mut self) -> Result<()> {
         let upload_id = if let Some(upload_id) = &self.upload_id {
             upload_id
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 65ae8a113..3681242ce 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,11 +58,6 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.write_once(size, Box::new(s)).await?;
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index fbd480de6..4b04d5c7e 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -530,7 +530,6 @@ impl Accessor for AzblobBackend {
 
                 write: true,
                 write_can_append: true,
-                write_can_sink: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index fbe509de9..b088aa57a 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,25 +180,6 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        if self.op.append() {
-            self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
-                .await?;
-        } else {
-            if self.op.content_length().is_none() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "write without content length is not supported",
-                ));
-            }
-
-            self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
-                .await?;
-        }
-
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 11c822fb1..e56f9eca9 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,13 +88,6 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index ba162fae9..46e3b3064 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -272,7 +272,6 @@ impl Accessor for CosBackend {
 
                 write: true,
                 write_can_append: true,
-                write_can_sink: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 7cbf1a4d7..00998c77d 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index f94f59d96..950cfd76e 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -263,7 +263,6 @@ impl Accessor for FsBackend {
 
                 write: true,
                 write_can_append: true,
-                write_can_sink: true,
                 write_without_content_length: true,
                 create_dir: true,
                 delete: true,
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 78b1c99cb..4a9a0471d 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -22,7 +22,6 @@ use std::path::PathBuf;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::StreamExt;
 use tokio::io::AsyncSeekExt;
 use tokio::io::AsyncWriteExt;
 
@@ -67,20 +66,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
-        while let Some(bs) = s.next().await {
-            let bs = bs?;
-            self.f
-                .seek(SeekFrom::Start(self.pos))
-                .await
-                .map_err(parse_io_error)?;
-            self.f.write_all(&bs).await.map_err(parse_io_error)?;
-            self.pos += bs.len() as u64;
-        }
-
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ffc08fbf1..3488255f9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,13 +55,6 @@ impl oio::Write for FtpWriter {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index dc760cb15..77549908e 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -360,7 +360,6 @@ impl Accessor for GcsBackend {
                 read_with_if_none_match: true,
 
                 write: true,
-                write_can_sink: true,
                 write_with_content_type: true,
                 write_without_content_length: true,
                 delete: true,
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index e9152f66c..68e13b27f 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -161,12 +161,6 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
-            .await?;
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let location = if let Some(location) = &self.location {
             location
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index 174923c90..8d69bf8e2 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,10 +106,6 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 5738f5d58..a7db6acab 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 3353059e6..90499e47c 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,13 +58,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 98afb6cbb..d3f98e77b 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,13 +53,6 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 4c3176cce..cc14eedd0 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -279,7 +279,6 @@ impl Accessor for ObsBackend {
 
                 write: true,
                 write_can_append: true,
-                write_can_sink: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_without_content_length: true,
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 8a0c84493..abb846871 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,13 +58,6 @@ impl oio::Write for OneDriveWriter {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index d924e4cc9..212c42ce0 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -404,7 +404,6 @@ impl Accessor for OssBackend {
 
                 write: true,
                 write_can_append: true,
-                write_can_sink: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_with_content_disposition: true,
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 2c82cc03c..d2d8bb039 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -913,7 +913,6 @@ impl Accessor for S3Backend {
                 read_with_override_content_type: true,
 
                 write: true,
-                write_can_sink: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_without_content_length: true,
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 1c8fc9ba9..2df725ab2 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,13 +43,6 @@ impl oio::Write for SftpWriter {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 406b9e5cd..5c57d7a95 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,13 +77,6 @@ impl oio::Write for SupabaseWriter {
         Ok(size as u64)
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 68edbbfed..efb223df6 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 3b358a15f..55d898ccc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,13 +65,6 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs
index 0010cd2c5..509d4c612 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -238,7 +238,6 @@ impl Accessor for WebdavBackend {
                 read_with_range: true,
 
                 write: true,
-                write_can_sink: true,
 
                 create_dir: true,
                 delete: true,
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 084fc08a7..30b9827f2 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,13 +70,6 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
-            .await?;
-
-        Ok(size)
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 2116a30e8..8dcbc9dc3 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,13 +64,6 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index d8baab96c..6d3389c2b 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -78,8 +78,6 @@ pub struct Capability {
     pub write: bool,
     /// If operator supports write by append, it will be true.
     pub write_can_append: bool,
-    /// If operator supports write by sink a stream into, it will be true.
-    pub write_can_sink: bool,
     /// If operator supports write with without content length, it will
     /// be true.
     ///
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 9173e4611..3ae7f6f4c 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -127,25 +127,35 @@ impl Writer {
     ///         .content_length(2 * 4096)
     ///         .await?;
     ///     let stream = stream::iter(vec![vec![0; 4096], vec![1; 4096]]).map(Ok);
-    ///     w.sink(2 * 4096, stream).await?;
+    ///     w.sink(stream).await?;
     ///     w.close().await?;
     ///     Ok(())
     /// }
     /// ```
-    pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<u64>
+    pub async fn sink<S, T>(&mut self, mut sink_from: S) -> Result<u64>
     where
         S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
         T: Into<Bytes>,
     {
-        if let State::Idle(Some(w)) = &mut self.state {
-            let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| v.into())));
-            w.copy_from(size, r).await
+        let w = if let State::Idle(Some(w)) = &mut self.state {
+            w
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
                 self.state
             );
+        };
+
+        let mut written = 0;
+        while let Some(bs) = sink_from.try_next().await? {
+            let mut bs = bs.into();
+            while bs.has_remaining() {
+                let n = w.write(bs.clone()).await?;
+                bs.advance(n as usize);
+                written += n;
+            }
         }
+        Ok(written)
     }
 
     /// Copy into writer.
@@ -173,27 +183,20 @@ impl Writer {
     /// async fn copy_example(op: Operator) -> Result<()> {
     ///     let mut w = op.writer_with("path/to/file").content_length(4096).await?;
     ///     let reader = Cursor::new(vec![0; 4096]);
-    ///     w.copy(4096, reader).await?;
+    ///     w.copy(reader).await?;
     ///     w.close().await?;
     ///     Ok(())
     /// }
     /// ```
-    pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
+    pub async fn copy<R>(&mut self, read_from: R) -> Result<u64>
     where
-        R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static,
+        R: futures::AsyncRead + Send + Sync + Unpin + 'static,
     {
-        if let State::Idle(Some(w)) = &mut self.state {
-            let r = Box::new(oio::into_streamable_read(
-                oio::into_read_from_file(read_from, 0, size),
-                64 * 1024,
-            ));
-            w.copy_from(size, r).await
-        } else {
-            unreachable!(
-                "writer state invalid while copy, expect Idle, actual {}",
-                self.state
-            );
-        }
+        futures::io::copy(read_from, self).await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "copy into writer failed")
+                .with_operation("copy")
+                .set_source(err)
+        })
     }
 
     /// Abort the writer and clean up all written data.
@@ -271,7 +274,10 @@ impl AsyncWrite for Writer {
                         self.state = State::Idle(Some(w));
                         return Poll::Ready(Ok(size));
                     }
-                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                    Err(err) => {
+                        self.state = State::Idle(None);
+                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+                    }
                 },
                 State::Close(_) => {
                     unreachable!("invalid state of writer: poll_write with State::Close")
@@ -306,7 +312,10 @@ impl AsyncWrite for Writer {
                         self.state = State::Idle(Some(w));
                         return Poll::Ready(Ok(()));
                     }
-                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                    Err(err) => {
+                        self.state = State::Idle(None);
+                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+                    }
                 },
             }
         }
@@ -337,7 +346,10 @@ impl tokio::io::AsyncWrite for Writer {
                         self.state = State::Idle(Some(w));
                         return Poll::Ready(Ok(size));
                     }
-                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                    Err(err) => {
+                        self.state = State::Idle(None);
+                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+                    }
                 },
                 State::Close(_) => {
                     unreachable!("invalid state of writer: poll_write with State::Close")
@@ -371,7 +383,10 @@ impl tokio::io::AsyncWrite for Writer {
                         self.state = State::Idle(Some(w));
                         return Poll::Ready(Ok(()));
                     }
-                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                    Err(err) => {
+                        self.state = State::Idle(None);
+                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+                    }
                 },
             }
         }
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index fe4166d78..10c9e8c1a 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1147,7 +1147,7 @@ pub async fn test_writer_write(op: Operator) -> Result<()> {
 /// Streaming data into writer
 pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_can_sink) {
+    if !(cap.write && cap.write_without_content_length) {
         return Ok(());
     }
 
@@ -1157,11 +1157,8 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let content_b = gen_fixed_bytes(size);
     let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);
 
-    let mut w = op
-        .writer_with(&path)
-        .content_length(2 * size as u64)
-        .await?;
-    w.sink(2 * size as u64, stream).await?;
+    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+    w.sink(stream).await?;
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");
@@ -1187,7 +1184,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
 /// Reading data into writer
 pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
-    if !(cap.write && cap.write_can_sink) {
+    if !(cap.write && cap.write_without_content_length) {
         return Ok(());
     }
 
@@ -1196,18 +1193,14 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let content_a = gen_fixed_bytes(size);
     let content_b = gen_fixed_bytes(size);
 
-    let mut w = op
-        .writer_with(&path)
-        .content_length(2 * size as u64)
-        .await?;
+    let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
 
     let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat());
     while !content.is_empty() {
         let reader = Cursor::new(content.clone());
-        let n = w.copy(2 * size as u64, reader).await?;
+        let n = w.copy(reader).await?;
         content.advance(n as usize);
     }
-
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");

Reply via email to