This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit e6720722324c55a414f85c3ad9a05b88cbf1b57c
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 14:53:17 2023 +0800

    Rename to pipe
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  2 +-
 core/src/layers/complete.rs                      |  4 ++--
 core/src/layers/concurrent_limit.rs              |  4 ++--
 core/src/layers/error_context.rs                 |  6 +++---
 core/src/layers/logging.rs                       |  8 ++++----
 core/src/layers/madsim.rs                        |  2 +-
 core/src/layers/metrics.rs                       |  4 ++--
 core/src/layers/minitrace.rs                     |  6 +++---
 core/src/layers/oteltrace.rs                     |  4 ++--
 core/src/layers/prometheus.rs                    |  4 ++--
 core/src/layers/retry.rs                         |  6 +++---
 core/src/layers/throttle.rs                      |  4 ++--
 core/src/layers/timeout.rs                       |  6 +++---
 core/src/layers/tracing.rs                       |  4 ++--
 core/src/raw/adapters/kv/backend.rs              |  2 +-
 core/src/raw/adapters/typed_kv/backend.rs        |  2 +-
 core/src/raw/oio/cursor.rs                       |  2 +-
 core/src/raw/oio/write/api.rs                    | 14 +++++++-------
 core/src/raw/oio/write/append_object_write.rs    |  2 +-
 core/src/raw/oio/write/at_least_buf_write.rs     | 10 +++++-----
 core/src/raw/oio/write/compose_write.rs          | 14 +++++++-------
 core/src/raw/oio/write/exact_buf_write.rs        | 14 +++++++-------
 core/src/raw/oio/write/multipart_upload_write.rs |  2 +-
 core/src/raw/oio/write/one_shot_write.rs         |  2 +-
 core/src/services/azblob/writer.rs               |  2 +-
 core/src/services/azdfs/writer.rs                |  2 +-
 core/src/services/dropbox/writer.rs              |  2 +-
 core/src/services/fs/writer.rs                   |  2 +-
 core/src/services/ftp/writer.rs                  |  2 +-
 core/src/services/gcs/writer.rs                  |  2 +-
 core/src/services/gdrive/writer.rs               |  2 +-
 core/src/services/ghac/writer.rs                 |  2 +-
 core/src/services/hdfs/writer.rs                 |  2 +-
 core/src/services/ipmfs/writer.rs                |  2 +-
 core/src/services/onedrive/writer.rs             |  2 +-
 core/src/services/sftp/writer.rs                 |  2 +-
 core/src/services/supabase/writer.rs             |  2 +-
 core/src/services/vercel_artifacts/writer.rs     |  2 +-
 core/src/services/wasabi/writer.rs               |  2 +-
 core/src/services/webdav/writer.rs               |  2 +-
 core/src/services/webhdfs/writer.rs              |  2 +-
 core/src/types/writer.rs                         |  4 ++--
 42 files changed, 83 insertions(+), 83 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 9a14442c2..88e764c87 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -31,7 +31,7 @@ impl oio::Write for BlackHoleWriter {
         Ok(bs.len() as u64)
     }
 
-    async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+    async fn pipe(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
         Ok(size)
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index e986d1a47..21efc47f8 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,7 +734,7 @@ where
         Ok(n as u64)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
@@ -750,7 +750,7 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
-        let n = w.sink(size, s).await?;
+        let n = w.pipe(size, s).await?;
         self.written += n;
         Ok(n)
     }
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 96a682d61..7384547f6 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
         self.inner.abort().await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 2acd6dd7d..536b7c956 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await.map_err(|err| {
-            err.with_operation(WriteOperation::Sink)
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.inner.pipe(size, s).await.map_err(|err| {
+            err.with_operation(WriteOperation::Pipe)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
         })
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 6c63f466f..66708ec57 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        match self.inner.sink(size, s).await {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        match self.inner.pipe(size, s).await {
             Ok(n) => {
                 self.written += n;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} written={} -> data sink {}B",
                     self.ctx.scheme,
-                    WriteOperation::Sink,
+                    WriteOperation::Pipe,
                     self.path,
                     self.written,
                     n
@@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
                         lvl,
                         "service={} operation={} path={} written={} -> data sink failed: {}",
                         self.ctx.scheme,
-                        WriteOperation::Sink,
+                        WriteOperation::Pipe,
                         self.path,
                         self.written,
                         self.ctx.error_print(&err),
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 17835e5ba..7dc193cb7 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 181ebb3c0..2439f0221 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .await
             .map(|n| {
                 self.bytes += n;
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 75c852c3a..65f73ecd0 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .in_span(Span::enter_with_parent(
-                WriteOperation::Sink.into_static(),
+                WriteOperation::Pipe.into_static(),
                 &self.span,
             ))
             .await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index fde87e9ba..a439d79b3 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 644532bf6..4fec394de 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner
-            .sink(size, s)
+            .pipe(size, s)
             .await
             .map(|n| {
                 self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 07b92e24c..536860cde 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -919,13 +919,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
     /// which is acceptable.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let s = Arc::new(Mutex::new(s));
 
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.sink(size, Box::new(s.clone())).await {
+            match self.inner.pipe(size, Box::new(s.clone())).await {
                 Ok(n) => return Ok(n),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
@@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
                             &e,
                             dur,
                             &[
-                                ("operation", WriteOperation::Sink.into_static()),
+                                ("operation", WriteOperation::Pipe.into_static()),
                                 ("path", &self.path),
                             ],
                         );
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index a88d1c701..aea598c3a 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -242,8 +242,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index be2289d04..421202023 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
             })?
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let timeout = self.io_timeout(size);
 
-        tokio::time::timeout(timeout, self.inner.sink(size, s))
+        tokio::time::timeout(timeout, self.inner.pipe(size, s))
             .await
             .map_err(|_| {
                 Error::new(ErrorKind::Unexpected, "operation timeout")
-                    .with_operation(WriteOperation::Sink)
+                    .with_operation(WriteOperation::Pipe)
                     .with_context("timeout", timeout.as_secs_f64().to_string())
                     .set_temporary()
             })?
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 33dcbdebc..002fe314f 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
-        self.inner.sink(size, s).await
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+        self.inner.pipe(size, s).await
     }
 
     #[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index be4913ff9..f3406b8ae 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index 48232c1fc..aeaab9864 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c9b670ead..796b2a349 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
 /// ChunkedCursor is used represents a non-contiguous bytes in memory.
 ///
 /// This is useful when we buffer users' random writes without copy. ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly.
 ///
 /// # TODO
 ///
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 8ced843da..2bdcd426b 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,8 +30,8 @@ use crate::*;
 pub enum WriteOperation {
     /// Operation for [`Write::write`]
     Write,
-    /// Operation for [`Write::sink`]
-    Sink,
+    /// Operation for [`Write::pipe`]
+    Pipe,
     /// Operation for [`Write::abort`]
     Abort,
     /// Operation for [`Write::close`]
@@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str {
 
         match v {
             Write => "Writer::write",
-            Sink => "Writer::sink",
+            Pipe => "Writer::pipe",
             Abort => "Writer::abort",
             Close => "Writer::close",
             BlockingWrite => "BlockingWriter::write",
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
     ///
     /// It's possible that `n < size`, caller should pass the remaining bytes
     /// repeatedly until all bytes has been written.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
-        (**self).sink(n, s).await
+    async fn pipe(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
+        (**self).pipe(n, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index b047ef43d..473d4feac 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -92,7 +92,7 @@ where
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
         let offset = self.offset().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 51f5d2645..62159ea28 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -83,7 +83,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
         buf.push(bs);
 
         self.inner
-            .sink(buf.len() as u64, Box::new(buf))
+            .pipe(buf.len() as u64, Box::new(buf))
             .await
             // Clear buffer if the write is successful.
             .map(|v| {
@@ -92,11 +92,11 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             })
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
         // If total size is known and equals to given stream, we can write it directly.
         if let Some(total_size) = self.total_size {
             if total_size == size {
-                return self.inner.sink(size, s).await;
+                return self.inner.pipe(size, s).await;
             }
         }
 
@@ -113,7 +113,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
         let stream = buf.chain(s);
 
         self.inner
-            .sink(buffer_size + size, Box::new(stream))
+            .pipe(buffer_size + size, Box::new(stream))
             .await
             // Clear buffer if the write is successful.
             .map(|v| {
@@ -130,7 +130,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
     async fn close(&mut self) -> Result<()> {
         if !self.buffer.is_empty() {
             self.inner
-                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
+                .pipe(self.buffer.len() as u64, Box::new(self.buffer.clone()))
                 .await?;
             self.buffer.clear();
         }
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index 79ddfc5ed..287db9bad 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -64,10 +64,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
+            Self::One(one) => one.pipe(size, s).await,
+            Self::Two(two) => two.pipe(size, s).await,
         }
     }
 
@@ -110,11 +110,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
         }
     }
 
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
-            Self::Three(three) => three.sink(size, s).await,
+            Self::One(one) => one.pipe(size, s).await,
+            Self::Two(two) => two.pipe(size, s).await,
+            Self::Three(three) => three.pipe(size, s).await,
         }
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 8e2d8a922..7c597e7d1 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -85,20 +85,20 @@ impl<W: oio::Write> ExactBufWriter<W> {
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
+        self.pipe(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
             .await
     }
 
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without chain them every time.
-    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
         if self.buffer.len() >= self.buffer_size {
             let mut buf = self.buffer.clone();
             let to_write = buf.split_to(self.buffer_size);
             return self
                 .inner
-                .sink(to_write.len() as u64, Box::new(to_write))
+                .pipe(to_write.len() as u64, Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|v| {
@@ -128,7 +128,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
         let to_write = buf.split_to(self.buffer_size);
         self.inner
-            .sink(to_write.len() as u64, Box::new(to_write))
+            .pipe(to_write.len() as u64, Box::new(to_write))
             .await
             // Replace buffer with remaining if the write is successful.
             .map(|v| {
@@ -161,7 +161,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                 let mut buf = self.buffer.clone();
                 let to_write = buf.split_to(self.buffer_size);
                 self.inner
-                    .sink(to_write.len() as u64, Box::new(to_write))
+                    .pipe(to_write.len() as u64, Box::new(to_write))
                     .await
                     // Replace buffer with remaining if the write is successful.
                     .map(|_| {
@@ -175,7 +175,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             let to_write = buf.split_to(min(self.buffer_size, buf.len()));
 
             self.inner
-                .sink(to_write.len() as u64, Box::new(to_write))
+                .pipe(to_write.len() as u64, Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|_| self.buffer = buf)?;
@@ -212,7 +212,7 @@ mod tests {
             Ok(bs.len() as u64)
         }
 
-        async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+        async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> {
             let bs = s.collect().await?;
             assert_eq!(bs.len() as u64, size);
             self.write(bs).await
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 7bfd0342c..232550607 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,7 +138,7 @@ where
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         let upload_id = self.upload_id().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index e6fe47616..11dbb501e 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,7 +58,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.inner.write_once(size, s).await?;
         Ok(size)
     }
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index a3b8abe30..dfd4e378a 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,7 +180,7 @@ impl oio::Write for AzblobWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         if self.op.append() {
             self.append_oneshot(size, AsyncBody::Stream(s)).await?;
         } else {
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index ff1125bfa..a03cec2a1 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 1b5b6b17d..1405fe224 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 9ca571077..80637f068 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 18dd6fed9..3ce495795 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 8c4431302..305624ed8 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -167,7 +167,7 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await?;
         Ok(size)
     }
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index b33137137..cfe027b6c 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 6bd4bf057..59084c70e 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 011c8352e..4b05c08de 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 43a46e500..108f67e39 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 1086f3fde..edf55c127 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 71ac41d7c..9f9e1f3dc 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index b786896c2..ee3ee8251 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
         Ok(size as u64)
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 1db2d230f..b2cf603de 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 130e8e911..c0ddf30b1 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 8dc093e65..1b6e7cfa5 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,7 +70,7 @@ impl oio::Write for WebdavWriter {
         Ok(size)
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await?;
 
         Ok(size)
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 1b055f122..dbb4b409d 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+    async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index f4b93dfa6..eef54f164 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -139,7 +139,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into())));
-            w.sink(size, s).await
+            w.pipe(size, s).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -184,7 +184,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream_from_reader(read_from));
-            w.sink(size, s).await
+            w.pipe(size, s).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",


Reply via email to