This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit e6720722324c55a414f85c3ad9a05b88cbf1b57c Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 14:53:17 2023 +0800 Rename to pipe Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 2 +- core/src/layers/complete.rs | 4 ++-- core/src/layers/concurrent_limit.rs | 4 ++-- core/src/layers/error_context.rs | 6 +++--- core/src/layers/logging.rs | 8 ++++---- core/src/layers/madsim.rs | 2 +- core/src/layers/metrics.rs | 4 ++-- core/src/layers/minitrace.rs | 6 +++--- core/src/layers/oteltrace.rs | 4 ++-- core/src/layers/prometheus.rs | 4 ++-- core/src/layers/retry.rs | 6 +++--- core/src/layers/throttle.rs | 4 ++-- core/src/layers/timeout.rs | 6 +++--- core/src/layers/tracing.rs | 4 ++-- core/src/raw/adapters/kv/backend.rs | 2 +- core/src/raw/adapters/typed_kv/backend.rs | 2 +- core/src/raw/oio/cursor.rs | 2 +- core/src/raw/oio/write/api.rs | 14 +++++++------- core/src/raw/oio/write/append_object_write.rs | 2 +- core/src/raw/oio/write/at_least_buf_write.rs | 10 +++++----- core/src/raw/oio/write/compose_write.rs | 14 +++++++------- core/src/raw/oio/write/exact_buf_write.rs | 14 +++++++------- core/src/raw/oio/write/multipart_upload_write.rs | 2 +- core/src/raw/oio/write/one_shot_write.rs | 2 +- core/src/services/azblob/writer.rs | 2 +- core/src/services/azdfs/writer.rs | 2 +- core/src/services/dropbox/writer.rs | 2 +- core/src/services/fs/writer.rs | 2 +- core/src/services/ftp/writer.rs | 2 +- core/src/services/gcs/writer.rs | 2 +- core/src/services/gdrive/writer.rs | 2 +- core/src/services/ghac/writer.rs | 2 +- core/src/services/hdfs/writer.rs | 2 +- core/src/services/ipmfs/writer.rs | 2 +- core/src/services/onedrive/writer.rs | 2 +- core/src/services/sftp/writer.rs | 2 +- core/src/services/supabase/writer.rs | 2 +- core/src/services/vercel_artifacts/writer.rs | 2 +- core/src/services/wasabi/writer.rs | 2 +- core/src/services/webdav/writer.rs | 2 +- core/src/services/webhdfs/writer.rs | 2 +- core/src/types/writer.rs | 4 ++-- 42 files changed, 83 insertions(+), 83 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 9a14442c2..88e764c87 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -31,7 +31,7 @@ impl oio::Write for BlackHoleWriter { Ok(bs.len() as u64) } - async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> { + async fn pipe(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> { Ok(size) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index e986d1a47..21efc47f8 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -734,7 +734,7 @@ where Ok(n as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( @@ -750,7 +750,7 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - let n = w.sink(size, s).await?; + let n = w.pipe(size, s).await?; self.written += n; Ok(n) } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 96a682d61..7384547f6 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { self.inner.abort().await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.inner.sink(size, s).await + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.inner.pipe(size, s).await } async fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 2acd6dd7d..536b7c956 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.inner.sink(size, s).await.map_err(|err| { - err.with_operation(WriteOperation::Sink) + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.inner.pipe(size, s).await.map_err(|err| { + err.with_operation(WriteOperation::Pipe) .with_context("service", self.scheme) .with_context("path", &self.path) }) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 6c63f466f..66708ec57 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - match self.inner.sink(size, s).await { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + match self.inner.pipe(size, s).await { Ok(n) => { self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data sink {}B", self.ctx.scheme, - WriteOperation::Sink, + WriteOperation::Pipe, self.path, self.written, n @@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { lvl, "service={} operation={} path={} written={} -> data sink failed: {}", self.ctx.scheme, - WriteOperation::Sink, + WriteOperation::Pipe, self.path, self.written, self.ctx.error_print(&err), diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 17835e5ba..7dc193cb7 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 181ebb3c0..2439f0221 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner - .sink(size, s) + .pipe(size, s) .await .map(|n| { self.bytes += n; diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 75c852c3a..65f73ecd0 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner - .sink(size, s) + .pipe(size, s) .in_span(Span::enter_with_parent( - WriteOperation::Sink.into_static(), + WriteOperation::Pipe.into_static(), &self.span, )) .await diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index fde87e9ba..a439d79b3 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { self.inner.write(bs).await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.inner.sink(size, s).await + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.inner.pipe(size, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 644532bf6..4fec394de 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner - .sink(size, s) + .pipe(size, s) .await .map(|n| { self.stats diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 07b92e24c..536860cde 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -919,13 +919,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let s = Arc::new(Mutex::new(s)); let mut backoff = self.builder.build(); loop { - match self.inner.sink(size, Box::new(s.clone())).await { + match self.inner.pipe(size, Box::new(s.clone())).await { Ok(n) => return Ok(n), Err(e) if !e.is_temporary() => return Err(e), Err(e) => match backoff.next() { @@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { &e, dur, &[ - ("operation", WriteOperation::Sink.into_static()), + ("operation", WriteOperation::Pipe.into_static()), ("path", &self.path), ], ); diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index a88d1c701..aea598c3a 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -242,8 +242,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { - self.inner.sink(size, s).await + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { + self.inner.pipe(size, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index be2289d04..421202023 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let timeout = self.io_timeout(size); - tokio::time::timeout(timeout, self.inner.sink(size, s)) + tokio::time::timeout(timeout, self.inner.pipe(size, s)) .await .map_err(|_| { Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(WriteOperation::Sink) + .with_operation(WriteOperation::Pipe) .with_context("timeout", timeout.as_secs_f64().to_string()) .set_temporary() })? diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 33dcbdebc..002fe314f 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.inner.sink(size, s).await + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.inner.pipe(size, s).await } #[tracing::instrument( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index be4913ff9..f3406b8ae 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 48232c1fc..aeaab9864 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index c9b670ead..796b2a349 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -180,7 +180,7 @@ impl oio::Stream for Cursor { /// ChunkedCursor is used represents a non-contiguous bytes in memory. /// /// This is useful when we buffer users' random writes without copy. ChunkedCursor implements -/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly. +/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly. /// /// # TODO /// diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 8ced843da..2bdcd426b 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -30,8 +30,8 @@ use crate::*; pub enum WriteOperation { /// Operation for [`Write::write`] Write, - /// Operation for [`Write::sink`] - Sink, + /// Operation for [`Write::pipe`] + Pipe, /// Operation for [`Write::abort`] Abort, /// Operation for [`Write::close`] @@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str { match v { Write => "Writer::write", - Sink => "Writer::sink", + Pipe => "Writer::pipe", Abort => "Writer::abort", Close => "Writer::close", BlockingWrite => "BlockingWriter::write", @@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync { /// /// It's possible that `n < size`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>; + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -113,7 +113,7 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _: u64, _: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> { (**self).write(bs).await } - async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> { - (**self).sink(n, s).await + async fn pipe(&mut self, n: u64, s: oio::Streamer) -> Result<u64> { + (**self).pipe(n, s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index b047ef43d..473d4feac 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -92,7 +92,7 @@ where Ok(size) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { let offset = self.offset().await?; self.inner diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 51f5d2645..62159ea28 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -83,7 +83,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { buf.push(bs); self.inner - .sink(buf.len() as u64, Box::new(buf)) + .pipe(buf.len() as u64, Box::new(buf)) .await // Clear buffer if the write is successful. .map(|v| { @@ -92,11 +92,11 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { }) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { if total_size == size { - return self.inner.sink(size, s).await; + return self.inner.pipe(size, s).await; } } @@ -113,7 +113,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { let stream = buf.chain(s); self.inner - .sink(buffer_size + size, Box::new(stream)) + .pipe(buffer_size + size, Box::new(stream)) .await // Clear buffer if the write is successful. .map(|v| { @@ -130,7 +130,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { async fn close(&mut self) -> Result<()> { if !self.buffer.is_empty() { self.inner - .sink(self.buffer.len() as u64, Box::new(self.buffer.clone())) + .pipe(self.buffer.len() as u64, Box::new(self.buffer.clone())) .await?; self.buffer.clear(); } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 79ddfc5ed..287db9bad 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -64,10 +64,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { match self { - Self::One(one) => one.sink(size, s).await, - Self::Two(two) => two.sink(size, s).await, + Self::One(one) => one.pipe(size, s).await, + Self::Two(two) => two.pipe(size, s).await, } } @@ -110,11 +110,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { match self { - Self::One(one) => one.sink(size, s).await, - Self::Two(two) => two.sink(size, s).await, - Self::Three(three) => three.sink(size, s).await, + Self::One(one) => one.pipe(size, s).await, + Self::Two(two) => two.pipe(size, s).await, + Self::Three(three) => three.pipe(size, s).await, } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 8e2d8a922..7c597e7d1 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -85,20 +85,20 @@ impl<W: oio::Write> ExactBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { async fn write(&mut self, bs: Bytes) -> Result<u64> { - self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + self.pipe(bs.len() as u64, Box::new(oio::Cursor::from(bs))) .await } /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> { + async fn pipe(&mut self, _: u64, mut s: Streamer) -> Result<u64> { if self.buffer.len() >= self.buffer_size { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); return self .inner - .sink(to_write.len() as u64, Box::new(to_write)) + .pipe(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|v| { @@ -128,7 +128,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(self.buffer_size); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .pipe(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|v| { @@ -161,7 +161,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .pipe(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -175,7 +175,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(min(self.buffer_size, buf.len())); self.inner - .sink(to_write.len() as u64, Box::new(to_write)) + .pipe(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| self.buffer = buf)?; @@ -212,7 +212,7 @@ mod tests { Ok(bs.len() as u64) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { let bs = s.collect().await?; assert_eq!(bs.len() as u64, size); self.write(bs).await diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 7bfd0342c..232550607 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -138,7 +138,7 @@ where Ok(size as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let upload_id = self.upload_id().await?; self.inner diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index e6fe47616..11dbb501e 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -58,7 +58,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner.write_once(size, s).await?; Ok(size) } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index a3b8abe30..dfd4e378a 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -180,7 +180,7 @@ impl oio::Write for AzblobWriter { Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { if self.op.append() { self.append_oneshot(size, AsyncBody::Stream(s)).await?; } else { diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index ff1125bfa..a03cec2a1 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 1b5b6b17d..1405fe224 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 9ca571077..80637f068 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { Ok(size) } - async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> { while let Some(bs) = s.next().await { let bs = bs?; self.f diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 18dd6fed9..3ce495795 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -55,7 +55,7 @@ impl oio::Write for FtpWriter { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 8c4431302..305624ed8 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -167,7 +167,7 @@ impl oio::Write for GcsWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.write_oneshot(size, AsyncBody::Stream(s)).await?; Ok(size) } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index b33137137..cfe027b6c 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter { Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 6bd4bf057..59084c70e 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for GhacWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 011c8352e..4b05c08de 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 43a46e500..108f67e39 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 1086f3fde..edf55c127 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 71ac41d7c..9f9e1f3dc 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -43,7 +43,7 @@ impl oio::Write for SftpWriter { Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index b786896c2..ee3ee8251 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter { Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 1db2d230f..b2cf603de 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 130e8e911..c0ddf30b1 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 8dc093e65..1b6e7cfa5 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -70,7 +70,7 @@ impl oio::Write for WebdavWriter { Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.write_oneshot(size, AsyncBody::Stream(s)).await?; Ok(size) diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 1b055f122..dbb4b409d 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index f4b93dfa6..eef54f164 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -139,7 +139,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into()))); - w.sink(size, s).await + w.pipe(size, s).await } else { unreachable!( "writer state invalid while sink, expect Idle, actual {}", @@ -184,7 +184,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream_from_reader(read_from)); - w.sink(size, s).await + w.pipe(size, s).await } else { unreachable!( "writer state invalid while copy, expect Idle, actual {}",
