This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 5b7c6440fdb2ab2c3e45a1dfc46fb342edeec6e3
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 29 17:55:55 2023 +0800

    Rename to write
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        |  2 +-
 core/benches/oio/write.rs                        |  4 ++--
 core/src/layers/blocking.rs                      |  2 +-
 core/src/layers/complete.rs                      |  4 ++--
 core/src/layers/concurrent_limit.rs              |  4 ++--
 core/src/layers/error_context.rs                 |  4 ++--
 core/src/layers/logging.rs                       |  4 ++--
 core/src/layers/madsim.rs                        |  2 +-
 core/src/layers/metrics.rs                       |  4 ++--
 core/src/layers/minitrace.rs                     |  4 ++--
 core/src/layers/oteltrace.rs                     |  4 ++--
 core/src/layers/prometheus.rs                    |  4 ++--
 core/src/layers/retry.rs                         |  4 ++--
 core/src/layers/throttle.rs                      |  4 ++--
 core/src/layers/timeout.rs                       |  4 ++--
 core/src/layers/tracing.rs                       |  4 ++--
 core/src/raw/adapters/kv/backend.rs              |  2 +-
 core/src/raw/adapters/typed_kv/backend.rs        |  2 +-
 core/src/raw/oio/cursor.rs                       |  2 +-
 core/src/raw/oio/write/api.rs                    | 10 +++++-----
 core/src/raw/oio/write/append_object_write.rs    |  2 +-
 core/src/raw/oio/write/at_least_buf_write.rs     |  8 ++++----
 core/src/raw/oio/write/compose_write.rs          | 14 +++++++-------
 core/src/raw/oio/write/exact_buf_write.rs        | 16 ++++++++--------
 core/src/raw/oio/write/multipart_upload_write.rs |  2 +-
 core/src/raw/oio/write/one_shot_write.rs         |  2 +-
 core/src/services/azblob/writer.rs               |  2 +-
 core/src/services/azdfs/writer.rs                |  2 +-
 core/src/services/dropbox/writer.rs              |  2 +-
 core/src/services/fs/writer.rs                   |  2 +-
 core/src/services/ftp/writer.rs                  |  2 +-
 core/src/services/gcs/writer.rs                  |  2 +-
 core/src/services/gdrive/writer.rs               |  2 +-
 core/src/services/ghac/writer.rs                 |  2 +-
 core/src/services/hdfs/writer.rs                 |  2 +-
 core/src/services/ipmfs/writer.rs                |  2 +-
 core/src/services/onedrive/writer.rs             |  2 +-
 core/src/services/sftp/writer.rs                 |  2 +-
 core/src/services/supabase/writer.rs             |  2 +-
 core/src/services/vercel_artifacts/writer.rs     |  2 +-
 core/src/services/wasabi/writer.rs               |  2 +-
 core/src/services/webdav/writer.rs               |  2 +-
 core/src/services/webhdfs/writer.rs              |  2 +-
 core/src/types/operator/operator.rs              |  2 +-
 core/src/types/writer.rs                         | 12 +++++++-----
 45 files changed, 84 insertions(+), 82 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index e704f8d52..e90c439ca 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -27,7 +27,7 @@ pub struct BlackHoleWriter;
 
 #[async_trait]
 impl oio::Write for BlackHoleWriter {
-    async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
+    async fn write(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
         Ok(())
     }
 
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index befc6c587..2fd5392f5 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -47,7 +47,7 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
 
-                w.sink(
+                w.write(
                     content.len() as u64,
                     Box::new(oio::Cursor::from(content.clone())),
                 )
@@ -78,7 +78,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.sink(
+                w.write(
                     content.len() as u64,
                     Box::new(oio::Cursor::from(content.clone())),
                 )
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 2dab3ea45..f5f264395 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -199,7 +199,7 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
         self.handle.block_on(
             self.inner
-                .sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))),
+                .write(bs.len() as u64, Box::new(oio::Cursor::from(bs))),
         )
     }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 13e009746..d0d39784c 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W>
 where
     W: oio::Write,
 {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         if let Some(total_size) = self.size {
             if self.written + size > total_size {
                 return Err(Error::new(
@@ -727,7 +727,7 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
-        w.sink(size, s).await?;
+        w.write(size, s).await?;
         self.written += size;
         Ok(())
     }
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 781fb7610..e6521cfbb 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,8 +285,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index a0da6a0a2..0e588fddc 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -411,8 +411,8 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await.map_err(|err| {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write(size, s).await.map_err(|err| {
             err.with_operation(WriteOperation::Sink)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 5b2f39e63..316a2c073 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,8 +1252,8 @@ impl<W> LoggingWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        match self.inner.sink(size, s).await {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        match self.inner.write(size, s).await {
             Ok(_) => {
                 self.written += size;
                 trace!(
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index e4b490c87..9ca4105ff 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
 
 #[async_trait]
 impl oio::Write for MadsimWriter {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 40ad53693..580e42f6b 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,9 +847,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
-            .sink(size, s)
+            .write(size, s)
             .await
             .map(|_| self.bytes += size)
             .map_err(|err| {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index da216cd61..b40d567c9 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,9 +337,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
-            .sink(size, s)
+            .write(size, s)
             .in_span(Span::enter_with_parent(
                 WriteOperation::Sink.into_static(),
                 &self.span,
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 39455cd54..903ab098d 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,8 +313,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 7f6b5aecb..82f249f03 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,9 +662,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner
-            .sink(size, s)
+            .write(size, s)
             .await
             .map(|_| {
                 self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 5127ca6e3..38511f527 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -893,13 +893,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
     /// The overhead is constant, which means the overhead will not increase with the size of
     /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
     /// which is acceptable.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         let s = Arc::new(Mutex::new(s));
 
         let mut backoff = self.builder.build();
 
         loop {
-            match self.inner.sink(size, Box::new(s.clone())).await {
+            match self.inner.write(size, Box::new(s.clone())).await {
                 Ok(_) => return Ok(()),
                 Err(e) if !e.is_temporary() => return Err(e),
                 Err(e) => match backoff.next() {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 89fabe71d..3429c14a4 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -217,8 +217,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
+    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
+        self.inner.write(size, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 8a894d17b..0dfec0212 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,10 +322,10 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         let timeout = self.io_timeout(size);
 
-        tokio::time::timeout(timeout, self.inner.sink(size, s))
+        tokio::time::timeout(timeout, self.inner.write(size, s))
             .await
             .map_err(|_| {
                 Error::new(ErrorKind::Unexpected, "operation timeout")
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index e3812fd48..c11574dba 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,8 +324,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.inner.sink(size, s).await
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write(size, s).await
     }
 
     #[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index c1314b2de..da402e86b 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -389,7 +389,7 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index 76aee64f9..6500a2f1a 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -402,7 +402,7 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c9b670ead..e02522cc6 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
 /// ChunkedCursor is used represents a non-contiguous bytes in memory.
 ///
 /// This is useful when we buffer users' random writes without copy. ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::write`] directly.
 ///
 /// # TODO
 ///
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 14bff1ec7..ba4705679 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,7 +30,7 @@ use crate::*;
 pub enum WriteOperation {
     /// Operation for [`Write::write`]
     Write,
-    /// Operation for [`Write::sink`]
+    /// Operation for [`Write::write`]
     Sink,
     /// Operation for [`Write::abort`]
     Abort,
@@ -95,7 +95,7 @@ pub trait Write: Unpin + Send + Sync {
     /// content length. And users will call write multiple times.
     ///
     /// Please make sure `write` is safe to re-enter.
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
 
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
@@ -106,7 +106,7 @@ pub trait Write: Unpin + Send + Sync {
 
 #[async_trait]
 impl Write for () {
-    async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "output writer doesn't support sink",
@@ -133,8 +133,8 @@ impl Write for () {
 /// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
-    async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
-        (**self).sink(n, s).await
+    async fn write(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
+        (**self).write(n, s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index 7e02fc0ea..d1380c584 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -78,7 +78,7 @@ impl<W> oio::Write for AppendObjectWriter<W>
 where
     W: AppendObjectWrite,
 {
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
         let offset = self.offset().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 94be240f4..2c269cc25 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -63,11 +63,11 @@ impl<W: oio::Write> AtLeastBufWriter<W> {
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
         // If total size is known and equals to given stream, we can write it directly.
         if let Some(total_size) = self.total_size {
             if total_size == size {
-                return self.inner.sink(size, s).await;
+                return self.inner.write(size, s).await;
             }
         }
 
@@ -82,7 +82,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
         let stream = buf.chain(s);
 
         self.inner
-            .sink(buffer_size + size, Box::new(stream))
+            .write(buffer_size + size, Box::new(stream))
             .await
             // Clear buffer if the write is successful.
             .map(|_| self.buffer.clear())
@@ -96,7 +96,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
     async fn close(&mut self) -> Result<()> {
         if !self.buffer.is_empty() {
             self.inner
-                .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
+                .write(self.buffer.len() as u64, Box::new(self.buffer.clone()))
                 .await?;
             self.buffer.clear();
         }
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index ea5f3bf25..1cd7c6f72 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -56,10 +56,10 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
 
 #[async_trait]
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
+            Self::One(one) => one.write(size, s).await,
+            Self::Two(two) => two.write(size, s).await,
         }
     }
 
@@ -94,11 +94,11 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
     for ThreeWaysWriter<ONE, TWO, THREE>
 {
-    async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
         match self {
-            Self::One(one) => one.sink(size, s).await,
-            Self::Two(two) => two.sink(size, s).await,
-            Self::Three(three) => three.sink(size, s).await,
+            Self::One(one) => one.write(size, s).await,
+            Self::Two(two) => two.write(size, s).await,
+            Self::Three(three) => three.write(size, s).await,
         }
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 59533db6a..1d25751a6 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -87,13 +87,13 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
     /// # TODO
     ///
     /// We know every stream size, we can collect them into a buffer without chain them every time.
-    async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> {
+    async fn write(&mut self, _: u64, mut s: Streamer) -> Result<()> {
         if self.buffer.len() >= self.buffer_size {
             let mut buf = self.buffer.clone();
             let to_write = buf.split_to(self.buffer_size);
             return self
                 .inner
-                .sink(to_write.len() as u64, Box::new(to_write))
+                .write(to_write.len() as u64, Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|_| {
@@ -121,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
 
         let to_write = buf.split_to(self.buffer_size);
         self.inner
-            .sink(to_write.len() as u64, Box::new(to_write))
+            .write(to_write.len() as u64, Box::new(to_write))
             .await
             // Replace buffer with remaining if the write is successful.
             .map(|_| {
@@ -153,7 +153,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                 let mut buf = self.buffer.clone();
                 let to_write = buf.split_to(self.buffer_size);
                 self.inner
-                    .sink(to_write.len() as u64, Box::new(to_write))
+                    .write(to_write.len() as u64, Box::new(to_write))
                     .await
                     // Replace buffer with remaining if the write is successful.
                     .map(|_| {
@@ -167,7 +167,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             let to_write = buf.split_to(min(self.buffer_size, buf.len()));
 
             self.inner
-                .sink(to_write.len() as u64, Box::new(to_write))
+                .write(to_write.len() as u64, Box::new(to_write))
                 .await
                 // Replace buffer with remaining if the write is successful.
                 .map(|_| self.buffer = buf)?;
@@ -197,7 +197,7 @@ mod tests {
 
     #[async_trait]
     impl Write for MockWriter {
-        async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+        async fn write(&mut self, size: u64, s: Streamer) -> Result<()> {
             let bs = s.collect().await?;
             assert_eq!(bs.len() as u64, size);
             self.buf.extend_from_slice(&bs);
@@ -228,7 +228,7 @@ mod tests {
 
         let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
 
-        w.sink(
+        w.write(
             expected.len() as u64,
             Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
         )
@@ -266,7 +266,7 @@ mod tests {
 
             expected.extend_from_slice(&content);
             writer
-                .sink(
+                .write(
                     expected.len() as u64,
                     Box::new(oio::Cursor::from(Bytes::from(expected.clone()))),
                 )
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 0a0d49680..da8f01695 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -119,7 +119,7 @@ impl<W> oio::Write for MultipartUploadWriter<W>
 where
     W: MultipartUploadWrite,
 {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         let upload_id = self.upload_id().await?;
 
         self.inner
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index a3fcd1b2c..971c59878 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -48,7 +48,7 @@ impl<W: OneShotWrite> OneShotWriter<W> {
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.inner.write_once(size, s).await
     }
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 2db716b11..0b243072f 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -160,7 +160,7 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         if self.op.append() {
             self.append_oneshot(size, AsyncBody::Stream(s)).await
         } else {
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 2eda93e40..cf1c68cf4 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -86,7 +86,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index f4a4e225b..cea9cb199 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -60,7 +60,7 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::Write for DropboxWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 6b518b436..567e63bd9 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -50,7 +50,7 @@ impl<F> FsWriter<F> {
 
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
-    async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
         while let Some(bs) = s.next().await {
             let bs = bs?;
             self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 7816277c1..8c56f8e5d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,7 +53,7 @@ impl FtpWriter {
 
 #[async_trait]
 impl oio::Write for FtpWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 70d79798a..e7e68f320 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -164,7 +164,7 @@ impl GcsWriter {
 
 #[async_trait]
 impl oio::Write for GcsWriter {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await
     }
 
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index 718307dc8..f4bc19b12 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -103,7 +103,7 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 9a088a0be..2bdd44507 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl GhacWriter {
 
 #[async_trait]
 impl oio::Write for GhacWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 8800ab241..6c9c679dc 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl HdfsWriter<hdrs::AsyncFile> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 011624c23..a876aac3a 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -52,7 +52,7 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::Write for IpmfsWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 8b38e30ee..a320e3b87 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -56,7 +56,7 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index cad5c2cca..c36b02c3b 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -42,7 +42,7 @@ impl SftpWriter {
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 9eb661c10..5c364ab8d 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -75,7 +75,7 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::Write for SupabaseWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 0d55ca277..6cb65dca6 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -60,7 +60,7 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::Write for VercelArtifactsWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 4f3f1697a..1cf7b740f 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -63,7 +63,7 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index aeaf0d781..2deb0bfec 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -67,7 +67,7 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::Write for WebdavWriter {
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
         self.write_oneshot(size, AsyncBody::Stream(s)).await
     }
 
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 85e6834f4..03cbfdf87 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -62,7 +62,7 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::Write for WebhdfsWriter {
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+    async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
             "Write::sink is not supported",
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index ab6bca5ac..442bc2188 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -730,7 +730,7 @@ impl Operator {
 
                     let (_, mut w) = inner.write(&path, args).await?;
                     // FIXME: we should bench here to measure the perf.
-                    w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
+                    w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
                         .await?;
                     w.close().await?;
 
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 8517a2f04..204508f8f 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -83,7 +83,7 @@ impl Writer {
     pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             let bs = bs.into();
-            w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
+            w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
                 .await
         } else {
             unreachable!(
@@ -132,7 +132,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into())));
-            w.sink(size, s).await
+            w.write(size, s).await
         } else {
             unreachable!(
                 "writer state invalid while sink, expect Idle, actual {}",
@@ -177,7 +177,7 @@ impl Writer {
     {
         if let State::Idle(Some(w)) = &mut self.state {
             let s = Box::new(oio::into_stream_from_reader(read_from));
-            w.sink(size, s).await
+            w.write(size, s).await
         } else {
             unreachable!(
                 "writer state invalid while copy, expect Idle, actual {}",
@@ -253,7 +253,8 @@ impl AsyncWrite for Writer {
                     let size = bs.len();
                     let fut = async move {
                         // FIXME: we should bench here to measure the perf.
-                        w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?;
+                        w.write(size as u64, Box::new(oio::Cursor::from(bs)))
+                            .await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -321,7 +322,8 @@ impl tokio::io::AsyncWrite for Writer {
                     let size = bs.len();
                     let fut = async move {
                         // FIXME: we should bench here to measure the perf.
-                        w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?;
+                        w.write(size as u64, Box::new(oio::Cursor::from(bs)))
+                            .await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));


Reply via email to