This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ac6d29de refactor(core): Rename confusing pipe into copy_from (#3015)
0ac6d29de is described below

commit 0ac6d29deaa7cddc7e774bed64cb4571ec9c5041
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 14:22:36 2023 +0800

    refactor(core): Rename confusing pipe into copy_from (#3015)
---
 core/benches/oio/utils.rs                        |  2 +-
 core/src/layers/complete.rs                      |  4 ++--
 core/src/layers/concurrent_limit.rs              |  4 ++--
 core/src/layers/error_context.rs                 |  6 +++---
 core/src/layers/logging.rs                       |  8 ++++----
 core/src/layers/madsim.rs                        |  2 +-
 core/src/layers/metrics.rs                       |  4 ++--
 core/src/layers/minitrace.rs                     |  6 +++---
 core/src/layers/oteltrace.rs                     |  4 ++--
 core/src/layers/prometheus.rs                    |  4 ++--
 core/src/layers/retry.rs                         |  6 +++---
 core/src/layers/throttle.rs                      |  4 ++--
 core/src/layers/timeout.rs                       |  6 +++---
 core/src/layers/tracing.rs                       |  4 ++--
 core/src/raw/adapters/kv/backend.rs              |  2 +-
 core/src/raw/adapters/typed_kv/backend.rs        |  2 +-
 core/src/raw/oio/cursor.rs                       |  2 +-
 core/src/raw/oio/write/api.rs                    | 16 ++++++++--------
 core/src/raw/oio/write/append_object_write.rs    |  2 +-
 core/src/raw/oio/write/compose_write.rs          | 14 +++++++-------
 core/src/raw/oio/write/exact_buf_write.rs        |  4 ++--
 core/src/raw/oio/write/multipart_upload_write.rs |  2 +-
 core/src/raw/oio/write/one_shot_write.rs         |  2 +-
 core/src/services/azblob/writer.rs               |  2 +-
 core/src/services/azdfs/writer.rs                |  2 +-
 core/src/services/dropbox/writer.rs              |  2 +-
 core/src/services/fs/writer.rs                   |  2 +-
 core/src/services/ftp/writer.rs                  |  2 +-
 core/src/services/gcs/writer.rs                  |  2 +-
 core/src/services/gdrive/writer.rs               |  2 +-
 core/src/services/ghac/writer.rs                 |  2 +-
 core/src/services/hdfs/writer.rs                 |  2 +-
 core/src/services/ipmfs/writer.rs                |  2 +-
 core/src/services/onedrive/writer.rs             |  2 +-
 core/src/services/sftp/writer.rs                 |  2 +-
 core/src/services/supabase/writer.rs             |  2 +-
 core/src/services/vercel_artifacts/writer.rs     |  2 +-
 core/src/services/wasabi/writer.rs               |  2 +-
 core/src/services/webdav/writer.rs               |  2 +-
 core/src/services/webhdfs/writer.rs              |  2 +-
 core/src/types/writer.rs                         |  4 ++--
 41 files changed, 74 insertions(+), 74 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 53aa665ca..0171b9abf 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -30,7 +30,7 @@ impl oio::Write for BlackHoleWriter {
         Ok(bs.len() as u64)
     }
 
-    async fn pipe(&mut self, size: u64, _: oio::Reader) -> 
opendal::Result<u64> {
+    async fn copy_from(&mut self, size: u64, _: oio::Reader) -> 
opendal::Result<u64> {
         Ok(size)
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index db470334c..63ff3aa0d 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,7 +734,7 @@ where
         Ok(n)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
@@ -750,7 +750,7 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
-        let n = w.pipe(size, s).await?;
+        let n = w.copy_from(size, s).await?;
         self.written += n;
         Ok(n)
     }
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index c1504e6a2..22f5d4c52 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.pipe(size, s).await
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.copy_from(size, s).await
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 33cebe92c..925f1d191 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.pipe(size, s).await.map_err(|err| {
-            err.with_operation(WriteOperation::Pipe)
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.copy_from(size, s).await.map_err(|err| {
+            err.with_operation(WriteOperation::CopyFrom)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
         })
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 5ed80a28b..1b0a6d6c0 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        match self.inner.pipe(size, s).await {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        match self.inner.copy_from(size, s).await {
             Ok(n) => {
                 self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data sink 
{}B",
                     self.ctx.scheme,
-                    WriteOperation::Pipe,
+                    WriteOperation::CopyFrom,
                     self.path,
                     self.written,
                     n
@@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         lvl,
                         "service={} operation={} path={} written={} -> data 
sink failed: {}",
                         self.ctx.scheme,
-                        WriteOperation::Pipe,
+                        WriteOperation::CopyFrom,
                         self.path,
                         self.written,
                         self.ctx.error_print(&err),
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 55741389d..92f1925a8 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> 
crate::Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index a96e83f8a..d4d65f839 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .pipe(size, s)
+            .copy_from(size, s)
             .await
             .map(|n| {
                 self.bytes += n;
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index f6487aa98..4daf0c056 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .pipe(size, s)
+            .copy_from(size, s)
             .in_span(Span::enter_with_parent(
-                WriteOperation::Pipe.into_static(),
+                WriteOperation::CopyFrom.into_static(),
                 &self.span,
             ))
             .await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 9bd464be5..b574e7b98 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.pipe(size, s).await
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.copy_from(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 70aae731e..35ce5144c 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner
-            .pipe(size, s)
+            .copy_from(size, s)
             .await
             .map(|n| {
                 self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index d5c8a0738..9d3408866 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -918,13 +918,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase 
with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead 
will only take 0.005%
     /// which is acceptable.
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let s = oio::into_cloneable_reader_within_tokio(s);
 
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.pipe(size, Box::new(s.clone())).await {
+            match self.inner.copy_from(size, Box::new(s.clone())).await {
                 Ok(n) => return Ok(n),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
@@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
                             &e,
                             dur,
                             &[
-                                ("operation", 
WriteOperation::Pipe.into_static()),
+                                ("operation", 
WriteOperation::CopyFrom.into_static()),
                                 ("path", &self.path),
                             ],
                         );
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index d821aacbf..4edec592e 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -241,8 +241,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.pipe(size, s).await
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.copy_from(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 642b8a49b..e15cfeac9 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let timeout = self.io_timeout(size);
 
-        tokio::time::timeout(timeout, self.inner.pipe(size, s))
+        tokio::time::timeout(timeout, self.inner.copy_from(size, s))
             .await
             .map_err(|_| {
                 Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(WriteOperation::Pipe)
+                    .with_operation(WriteOperation::CopyFrom)
                     .with_context("timeout", timeout.as_secs_f64().to_string())
                     .set_temporary()
             })?
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 467d0a153..ca277fbd6 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
-        self.inner.pipe(size, s).await
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+        self.inner.copy_from(size, s).await
     }
 
     #[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index ce9ad3005..38b7db998 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 494118e84..07b919f79 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 796b2a349..2c79451c1 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
 /// ChunkedCursor is used represents a non-contiguous bytes in memory.
 ///
 /// This is useful when we buffer users' random writes without copy. 
ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::copy_from`] directly.
 ///
 /// # TODO
 ///
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index ba5a2b675..952c073ae 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,8 +30,8 @@ use crate::*;
 pub enum WriteOperation {
     /// Operation for [`Write::write`]
     Write,
-    /// Operation for [`Write::pipe`]
-    Pipe,
+    /// Operation for [`Write::copy_from`]
+    CopyFrom,
     /// Operation for [`Write::abort`]
     Abort,
     /// Operation for [`Write::close`]
@@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str {
 
         match v {
             Write => "Writer::write",
-            Pipe => "Writer::pipe",
+            CopyFrom => "Writer::copy_from",
             Abort => "Writer::abort",
             Close => "Writer::close",
             BlockingWrite => "BlockingWriter::write",
@@ -87,7 +87,7 @@ pub trait Write: Unpin + Send + Sync {
     /// repeatedly until all bytes has been written.
     async fn write(&mut self, bs: Bytes) -> Result<u64>;
 
-    /// Sink given stream into writer.
+    /// Copy from given reader into the writer.
     ///
     /// # Behavior
     ///
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < size`, caller should pass the remaining bytes
     /// repeatedly until all bytes has been written.
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>;
+    async fn copy_from(&mut self, size: u64, src: oio::Reader) -> Result<u64>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
-        (**self).pipe(n, s).await
+    async fn copy_from(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
+        (**self).copy_from(n, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs 
b/core/src/raw/oio/write/append_object_write.rs
index 0e5fd9ed3..b7f1ff5c6 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -91,7 +91,7 @@ where
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let offset = self.offset().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/compose_write.rs 
b/core/src/raw/oio/write/compose_write.rs
index 05dfdd775..1afb7735e 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -63,10 +63,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for 
TwoWaysWriter<ONE, TWO> {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
-            Self::One(one) => one.pipe(size, s).await,
-            Self::Two(two) => two.pipe(size, s).await,
+            Self::One(one) => one.copy_from(size, s).await,
+            Self::Two(two) => two.copy_from(size, s).await,
         }
     }
 
@@ -109,11 +109,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> 
oio::Write
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         match self {
-            Self::One(one) => one.pipe(size, s).await,
-            Self::Two(two) => two.pipe(size, s).await,
-            Self::Three(three) => three.pipe(size, s).await,
+            Self::One(one) => one.copy_from(size, s).await,
+            Self::Two(two) => two.copy_from(size, s).await,
+            Self::Three(three) => three.copy_from(size, s).await,
         }
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs 
b/core/src/raw/oio/write/exact_buf_write.rs
index b2209ed73..c0ecac3aa 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -90,7 +90,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         }
     }
 
-    async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
         loop {
             match &mut self.buffer {
                 Buffer::Filling(fill) => {
@@ -185,7 +185,7 @@ mod tests {
             Ok(bs.len() as u64)
         }
 
-        async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> 
{
+        async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> 
Result<u64> {
             let mut bs = vec![];
             s.read_to_end(&mut bs).await.unwrap();
             assert_eq!(bs.len() as u64, size);
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 0b314c163..e39d59ca5 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,7 +138,7 @@ where
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index a85feecf3..65ae8a113 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,7 +58,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.inner.write_once(size, Box::new(s)).await?;
         Ok(size)
     }
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index 11776755f..fbe509de9 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,7 +180,7 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         if self.op.append() {
             self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
                 .await?;
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index 4b9b90f9f..11c822fb1 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs 
b/core/src/services/dropbox/writer.rs
index 37abab6a9..7cbf1a4d7 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index ab27f2b65..78b1c99cb 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> 
Result<u64> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3bb11582d..ffc08fbf1 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 2e95f0b79..e9152f66c 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -161,7 +161,7 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
             .await?;
         Ok(size)
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index df974d940..174923c90 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index bf9116ec2..5738f5d58 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 3b60a4df3..3353059e6 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index 1434e980a..98afb6cbb 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 75a5e023a..8a0c84493 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 5ee5d84ac..1c8fc9ba9 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index 36a572bfe..406b9e5cd 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
         Ok(size as u64)
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index 936cddf01..68edbbfed 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index a39c835dc..3b358a15f 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index bbe79eb31..084fc08a7 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,7 +70,7 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
         self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
             .await?;
 
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index ae3396dc3..2116a30e8 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+    async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 9479b7c73..9173e4611 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -139,7 +139,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| 
v.into())));
-            w.pipe(size, r).await
+            w.copy_from(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -187,7 +187,7 @@ impl Writer {
                 oio::into_read_from_file(read_from, 0, size),
                 64 * 1024,
             ));
-            w.pipe(size, r).await
+            w.copy_from(size, r).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",

Reply via email to