This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5b7c6440fdb2ab2c3e45a1dfc46fb342edeec6e3 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 29 17:55:55 2023 +0800 Rename to write Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 2 +- core/benches/oio/write.rs | 4 ++-- core/src/layers/blocking.rs | 2 +- core/src/layers/complete.rs | 4 ++-- core/src/layers/concurrent_limit.rs | 4 ++-- core/src/layers/error_context.rs | 4 ++-- core/src/layers/logging.rs | 4 ++-- core/src/layers/madsim.rs | 2 +- core/src/layers/metrics.rs | 4 ++-- core/src/layers/minitrace.rs | 4 ++-- core/src/layers/oteltrace.rs | 4 ++-- core/src/layers/prometheus.rs | 4 ++-- core/src/layers/retry.rs | 4 ++-- core/src/layers/throttle.rs | 4 ++-- core/src/layers/timeout.rs | 4 ++-- core/src/layers/tracing.rs | 4 ++-- core/src/raw/adapters/kv/backend.rs | 2 +- core/src/raw/adapters/typed_kv/backend.rs | 2 +- core/src/raw/oio/cursor.rs | 2 +- core/src/raw/oio/write/api.rs | 10 +++++----- core/src/raw/oio/write/append_object_write.rs | 2 +- core/src/raw/oio/write/at_least_buf_write.rs | 8 ++++---- core/src/raw/oio/write/compose_write.rs | 14 +++++++------- core/src/raw/oio/write/exact_buf_write.rs | 16 ++++++++-------- core/src/raw/oio/write/multipart_upload_write.rs | 2 +- core/src/raw/oio/write/one_shot_write.rs | 2 +- core/src/services/azblob/writer.rs | 2 +- core/src/services/azdfs/writer.rs | 2 +- core/src/services/dropbox/writer.rs | 2 +- core/src/services/fs/writer.rs | 2 +- core/src/services/ftp/writer.rs | 2 +- core/src/services/gcs/writer.rs | 2 +- core/src/services/gdrive/writer.rs | 2 +- core/src/services/ghac/writer.rs | 2 +- core/src/services/hdfs/writer.rs | 2 +- core/src/services/ipmfs/writer.rs | 2 +- core/src/services/onedrive/writer.rs | 2 +- core/src/services/sftp/writer.rs | 2 +- core/src/services/supabase/writer.rs | 2 +- core/src/services/vercel_artifacts/writer.rs | 2 +- core/src/services/wasabi/writer.rs | 2 +- core/src/services/webdav/writer.rs | 2 +- core/src/services/webhdfs/writer.rs | 2 +- core/src/types/operator/operator.rs | 2 +- core/src/types/writer.rs | 12 +++++++----- 45 files changed, 84 insertions(+), 82 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index e704f8d52..e90c439ca 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -27,7 +27,7 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { + async fn write(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { Ok(()) } diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index befc6c587..2fd5392f5 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -47,7 +47,7 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) { b.to_async(&*TOKIO).iter(|| async { let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024); - w.sink( + w.write( content.len() as u64, Box::new(oio::Cursor::from(content.clone())), ) @@ -78,7 +78,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); - w.sink( + w.write( content.len() as u64, Box::new(oio::Cursor::from(content.clone())), ) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 2dab3ea45..f5f264395 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -199,7 +199,7 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { fn write(&mut self, bs: Bytes) -> Result<()> { self.handle.block_on( self.inner - .sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))), + .write(bs.len() as u64, Box::new(oio::Cursor::from(bs))), ) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 13e009746..d0d39784c 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( @@ -727,7 +727,7 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.sink(size, s).await?; + w.write(size, s).await?; self.written += size; Ok(()) } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 781fb7610..e6521cfbb 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,8 +285,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.write(size, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index a0da6a0a2..0e588fddc 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -411,8 +411,8 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await.map_err(|err| { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.write(size, s).await.map_err(|err| { err.with_operation(WriteOperation::Sink) .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 5b2f39e63..316a2c073 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,8 +1252,8 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - match self.inner.sink(size, s).await { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + match self.inner.write(size, s).await { Ok(_) => { self.written += size; trace!( diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index e4b490c87..9ca4105ff 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 40ad53693..580e42f6b 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,9 +847,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner - .sink(size, s) + .write(size, s) .await .map(|_| self.bytes += size) .map_err(|err| { diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index da216cd61..b40d567c9 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,9 +337,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner - .sink(size, s) + .write(size, s) .in_span(Span::enter_with_parent( WriteOperation::Sink.into_static(), &self.span, diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 39455cd54..903ab098d 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,8 +313,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.write(size, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 7f6b5aecb..82f249f03 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,9 +662,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner - .sink(size, s) + .write(size, s) .await .map(|_| { self.stats diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 5127ca6e3..38511f527 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -893,13 +893,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let s = Arc::new(Mutex::new(s)); let mut backoff = self.builder.build(); loop { - match self.inner.sink(size, Box::new(s.clone())).await { + match self.inner.write(size, Box::new(s.clone())).await { Ok(_) => return Ok(()), Err(e) if !e.is_temporary() => return Err(e), Err(e) => match backoff.next() { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 89fabe71d..3429c14a4 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -217,8 +217,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { - self.inner.sink(size, s).await + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + self.inner.write(size, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 8a894d17b..0dfec0212 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,10 +322,10 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let timeout = self.io_timeout(size); - tokio::time::timeout(timeout, self.inner.sink(size, s)) + tokio::time::timeout(timeout, self.inner.write(size, s)) .await .map_err(|_| { Error::new(ErrorKind::Unexpected, "operation timeout") diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index e3812fd48..c11574dba 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,8 +324,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.write(size, s).await } #[tracing::instrument( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index c1314b2de..da402e86b 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -389,7 +389,7 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 76aee64f9..6500a2f1a 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -402,7 +402,7 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index c9b670ead..e02522cc6 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -180,7 +180,7 @@ impl oio::Stream for Cursor { /// ChunkedCursor is used represents a non-contiguous bytes in memory. /// /// This is useful when we buffer users' random writes without copy. ChunkedCursor implements -/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly. +/// [`oio::Stream`] so it can be used in [`oio::Write::write`] directly. /// /// # TODO /// diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 14bff1ec7..ba4705679 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -30,7 +30,7 @@ use crate::*; pub enum WriteOperation { /// Operation for [`Write::write`] Write, - /// Operation for [`Write::sink`] + /// Operation for [`Write::write`] Sink, /// Operation for [`Write::abort`] Abort, @@ -95,7 +95,7 @@ pub trait Write: Unpin + Send + Sync { /// content length. And users will call write multiple times. /// /// Please make sure `write` is safe to re-enter. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -106,7 +106,7 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { + async fn write(&mut self, _: u64, _: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -133,8 +133,8 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { - (**self).sink(n, s).await + async fn write(&mut self, n: u64, s: oio::Streamer) -> Result<()> { + (**self).write(n, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 7e02fc0ea..d1380c584 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -78,7 +78,7 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { let offset = self.offset().await?; self.inner diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 94be240f4..2c269cc25 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -63,11 +63,11 @@ impl<W: oio::Write> AtLeastBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { if total_size == size { - return self.inner.sink(size, s).await; + return self.inner.write(size, s).await; } } @@ -82,7 +82,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { let stream = buf.chain(s); self.inner - .sink(buffer_size + size, Box::new(stream)) + .write(buffer_size + size, Box::new(stream)) .await // Clear buffer if the write is successful. .map(|_| self.buffer.clear()) @@ -96,7 +96,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { async fn close(&mut self) -> Result<()> { if !self.buffer.is_empty() { self.inner - .sink(self.buffer.len() as u64, Box::new(self.buffer.clone())) + .write(self.buffer.len() as u64, Box::new(self.buffer.clone())) .await?; self.buffer.clear(); } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index ea5f3bf25..1cd7c6f72 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -56,10 +56,10 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { match self { - Self::One(one) => one.sink(size, s).await, - Self::Two(two) => two.sink(size, s).await, + Self::One(one) => one.write(size, s).await, + Self::Two(two) => two.write(size, s).await, } } @@ -94,11 +94,11 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { match self { - Self::One(one) => one.sink(size, s).await, - Self::Two(two) => two.sink(size, s).await, - Self::Three(three) => three.sink(size, s).await, + Self::One(one) => one.write(size, s).await, + Self::Two(two) => two.write(size, s).await, + Self::Three(three) => three.write(size, s).await, } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 59533db6a..1d25751a6 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -87,13 +87,13 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> { + async fn write(&mut self, _: u64, mut s: Streamer) -> Result<()> { if self.buffer.len() >= self.buffer_size { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); return self .inner - .sink(to_write.len() as u64, Box::new(to_write)) + .write(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -121,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(self.buffer_size); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .write(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -153,7 +153,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .write(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -167,7 +167,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(min(self.buffer_size, buf.len())); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .write(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| self.buffer = buf)?; @@ -197,7 +197,7 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { let bs = s.collect().await?; assert_eq!(bs.len() as u64, size); self.buf.extend_from_slice(&bs); @@ -228,7 +228,7 @@ mod tests { let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10); - w.sink( + w.write( expected.len() as u64, Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), ) @@ -266,7 +266,7 @@ mod tests { expected.extend_from_slice(&content); writer - .sink( + .write( expected.len() as u64, Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), ) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 0a0d49680..da8f01695 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -119,7 +119,7 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let upload_id = self.upload_id().await?; self.inner diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index a3fcd1b2c..971c59878 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -48,7 +48,7 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.write_once(size, s).await } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 2db716b11..0b243072f 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -160,7 +160,7 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { if self.op.append() { self.append_oneshot(size, AsyncBody::Stream(s)).await } else { diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 2eda93e40..cf1c68cf4 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -86,7 +86,7 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index f4a4e225b..cea9cb199 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -60,7 +60,7 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 6b518b436..567e63bd9 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -50,7 +50,7 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { while let Some(bs) = s.next().await { let bs = bs?; self.f diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 7816277c1..8c56f8e5d 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,7 +53,7 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 70d79798a..e7e68f320 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -164,7 +164,7 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.write_oneshot(size, AsyncBody::Stream(s)).await } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 718307dc8..f4bc19b12 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -103,7 +103,7 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 9a088a0be..2bdd44507 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,7 +62,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 8800ab241..6c9c679dc 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -58,7 +58,7 @@ impl HdfsWriter<hdrs::AsyncFile> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 011624c23..a876aac3a 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -52,7 +52,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 8b38e30ee..a320e3b87 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -56,7 +56,7 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index cad5c2cca..c36b02c3b 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -42,7 +42,7 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 9eb661c10..5c364ab8d 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -75,7 +75,7 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 0d55ca277..6cb65dca6 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -60,7 +60,7 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 4f3f1697a..1cf7b740f 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -63,7 +63,7 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index aeaf0d781..2deb0bfec 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -67,7 +67,7 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.write_oneshot(size, AsyncBody::Stream(s)).await } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 85e6834f4..03cbfdf87 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -62,7 +62,7 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index ab6bca5ac..442bc2188 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -730,7 +730,7 @@ impl Operator { let (_, mut w) = inner.write(&path, args).await?; // FIXME: we should bench here to measure the perf. - w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs))) .await?; w.close().await?; diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 8517a2f04..204508f8f 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -83,7 +83,7 @@ impl Writer { pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { let bs = bs.into(); - w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs))) .await } else { unreachable!( @@ -132,7 +132,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into()))); - w.sink(size, s).await + w.write(size, s).await } else { unreachable!( "writer state invalid while sink, expect Idle, actual {}", @@ -177,7 +177,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream_from_reader(read_from)); - w.sink(size, s).await + w.write(size, s).await } else { unreachable!( "writer state invalid while copy, expect Idle, actual {}", @@ -253,7 +253,8 @@ impl AsyncWrite for Writer { let size = bs.len(); let fut = async move { // FIXME: we should bench here to measure the perf. - w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?; + w.write(size as u64, Box::new(oio::Cursor::from(bs))) + .await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut)); @@ -321,7 +322,8 @@ impl tokio::io::AsyncWrite for Writer { let size = bs.len(); let fut = async move { // FIXME: we should bench here to measure the perf. - w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?; + w.write(size as u64, Box::new(oio::Cursor::from(bs))) + .await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut));
