This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 0ac6d29de refactor(core): Rename confusing pipe into copy_from (#3015)
0ac6d29de is described below
commit 0ac6d29deaa7cddc7e774bed64cb4571ec9c5041
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 14:22:36 2023 +0800
refactor(core): Rename confusing pipe into copy_from (#3015)
---
core/benches/oio/utils.rs | 2 +-
core/src/layers/complete.rs | 4 ++--
core/src/layers/concurrent_limit.rs | 4 ++--
core/src/layers/error_context.rs | 6 +++---
core/src/layers/logging.rs | 8 ++++----
core/src/layers/madsim.rs | 2 +-
core/src/layers/metrics.rs | 4 ++--
core/src/layers/minitrace.rs | 6 +++---
core/src/layers/oteltrace.rs | 4 ++--
core/src/layers/prometheus.rs | 4 ++--
core/src/layers/retry.rs | 6 +++---
core/src/layers/throttle.rs | 4 ++--
core/src/layers/timeout.rs | 6 +++---
core/src/layers/tracing.rs | 4 ++--
core/src/raw/adapters/kv/backend.rs | 2 +-
core/src/raw/adapters/typed_kv/backend.rs | 2 +-
core/src/raw/oio/cursor.rs | 2 +-
core/src/raw/oio/write/api.rs | 16 ++++++++--------
core/src/raw/oio/write/append_object_write.rs | 2 +-
core/src/raw/oio/write/compose_write.rs | 14 +++++++-------
core/src/raw/oio/write/exact_buf_write.rs | 4 ++--
core/src/raw/oio/write/multipart_upload_write.rs | 2 +-
core/src/raw/oio/write/one_shot_write.rs | 2 +-
core/src/services/azblob/writer.rs | 2 +-
core/src/services/azdfs/writer.rs | 2 +-
core/src/services/dropbox/writer.rs | 2 +-
core/src/services/fs/writer.rs | 2 +-
core/src/services/ftp/writer.rs | 2 +-
core/src/services/gcs/writer.rs | 2 +-
core/src/services/gdrive/writer.rs | 2 +-
core/src/services/ghac/writer.rs | 2 +-
core/src/services/hdfs/writer.rs | 2 +-
core/src/services/ipmfs/writer.rs | 2 +-
core/src/services/onedrive/writer.rs | 2 +-
core/src/services/sftp/writer.rs | 2 +-
core/src/services/supabase/writer.rs | 2 +-
core/src/services/vercel_artifacts/writer.rs | 2 +-
core/src/services/wasabi/writer.rs | 2 +-
core/src/services/webdav/writer.rs | 2 +-
core/src/services/webhdfs/writer.rs | 2 +-
core/src/types/writer.rs | 4 ++--
41 files changed, 74 insertions(+), 74 deletions(-)
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 53aa665ca..0171b9abf 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -30,7 +30,7 @@ impl oio::Write for BlackHoleWriter {
Ok(bs.len() as u64)
}
- async fn pipe(&mut self, size: u64, _: oio::Reader) -> opendal::Result<u64> {
+ async fn copy_from(&mut self, size: u64, _: oio::Reader) -> opendal::Result<u64> {
Ok(size)
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index db470334c..63ff3aa0d 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,7 +734,7 @@ where
Ok(n)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
if let Some(total_size) = self.size {
if self.written + size > total_size {
return Err(Error::new(
@@ -750,7 +750,7 @@ where
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
- let n = w.pipe(size, s).await?;
+ let n = w.copy_from(size, s).await?;
self.written += n;
Ok(n)
}
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index c1504e6a2..22f5d4c52 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
self.inner.abort().await
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.pipe(size, s).await
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.copy_from(size, s).await
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 33cebe92c..925f1d191 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.pipe(size, s).await.map_err(|err| {
- err.with_operation(WriteOperation::Pipe)
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.copy_from(size, s).await.map_err(|err| {
+ err.with_operation(WriteOperation::CopyFrom)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 5ed80a28b..1b0a6d6c0 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- match self.inner.pipe(size, s).await {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ match self.inner.copy_from(size, s).await {
Ok(n) => {
self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data sink {}B",
self.ctx.scheme,
- WriteOperation::Pipe,
+ WriteOperation::CopyFrom,
self.path,
self.written,
n
@@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={} -> data sink failed: {}",
self.ctx.scheme,
- WriteOperation::Pipe,
+ WriteOperation::CopyFrom,
self.path,
self.written,
self.ctx.error_print(&err),
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 55741389d..92f1925a8 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index a96e83f8a..d4d65f839 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
})
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .pipe(size, s)
+ .copy_from(size, s)
.await
.map(|n| {
self.bytes += n;
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index f6487aa98..4daf0c056 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .pipe(size, s)
+ .copy_from(size, s)
.in_span(Span::enter_with_parent(
- WriteOperation::Pipe.into_static(),
+ WriteOperation::CopyFrom.into_static(),
&self.span,
))
.await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 9bd464be5..b574e7b98 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
self.inner.write(bs).await
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.pipe(size, s).await
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.copy_from(size, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 70aae731e..35ce5144c 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
})
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .pipe(size, s)
+ .copy_from(size, s)
.await
.map(|n| {
self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index d5c8a0738..9d3408866 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -918,13 +918,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
/// The overhead is constant, which means the overhead will not increase with the size of
/// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
/// which is acceptable.
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let s = oio::into_cloneable_reader_within_tokio(s);
let mut backoff = self.builder.build();
loop {
- match self.inner.pipe(size, Box::new(s.clone())).await {
+ match self.inner.copy_from(size, Box::new(s.clone())).await {
Ok(n) => return Ok(n),
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
@@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
&e,
dur,
&[
- ("operation", WriteOperation::Pipe.into_static()),
+ ("operation", WriteOperation::CopyFrom.into_static()),
("path", &self.path),
],
);
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index d821aacbf..4edec592e 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -241,8 +241,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.pipe(size, s).await
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.copy_from(size, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 642b8a49b..e15cfeac9 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
})?
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let timeout = self.io_timeout(size);
- tokio::time::timeout(timeout, self.inner.pipe(size, s))
+ tokio::time::timeout(timeout, self.inner.copy_from(size, s))
.await
.map_err(|_| {
Error::new(ErrorKind::Unexpected, "operation timeout")
- .with_operation(WriteOperation::Pipe)
+ .with_operation(WriteOperation::CopyFrom)
.with_context("timeout", timeout.as_secs_f64().to_string())
.set_temporary()
})?
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 467d0a153..ca277fbd6 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.pipe(size, s).await
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.copy_from(size, s).await
}
#[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index ce9ad3005..38b7db998 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index 494118e84..07b919f79 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 796b2a349..2c79451c1 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
/// ChunkedCursor is used represents a non-contiguous bytes in memory.
///
/// This is useful when we buffer users' random writes without copy. ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::copy_from`] directly.
///
/// # TODO
///
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index ba5a2b675..952c073ae 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,8 +30,8 @@ use crate::*;
pub enum WriteOperation {
/// Operation for [`Write::write`]
Write,
- /// Operation for [`Write::pipe`]
- Pipe,
+ /// Operation for [`Write::copy_from`]
+ CopyFrom,
/// Operation for [`Write::abort`]
Abort,
/// Operation for [`Write::close`]
@@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str {
match v {
Write => "Writer::write",
- Pipe => "Writer::pipe",
+ CopyFrom => "Writer::copy_from",
Abort => "Writer::abort",
Close => "Writer::close",
BlockingWrite => "BlockingWriter::write",
@@ -87,7 +87,7 @@ pub trait Write: Unpin + Send + Sync {
/// repeatedly until all bytes has been written.
async fn write(&mut self, bs: Bytes) -> Result<u64>;
- /// Sink given stream into writer.
+ /// Copy from given reader into the writer.
///
/// # Behavior
///
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
///
/// It's possible that `n < size`, caller should pass the remaining bytes
/// repeatedly until all bytes has been written.
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>;
+ async fn copy_from(&mut self, size: u64, src: oio::Reader) -> Result<u64>;
/// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
unimplemented!("write is required to be implemented for oio::Write")
}
- async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"output writer doesn't support sink",
@@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> {
(**self).write(bs).await
}
- async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
- (**self).pipe(n, s).await
+ async fn copy_from(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
+ (**self).copy_from(n, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index 0e5fd9ed3..b7f1ff5c6 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -91,7 +91,7 @@ where
Ok(size)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let offset = self.offset().await?;
self.inner
diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs
index 05dfdd775..1afb7735e 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -63,10 +63,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
match self {
- Self::One(one) => one.pipe(size, s).await,
- Self::Two(two) => two.pipe(size, s).await,
+ Self::One(one) => one.copy_from(size, s).await,
+ Self::Two(two) => two.copy_from(size, s).await,
}
}
@@ -109,11 +109,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
match self {
- Self::One(one) => one.pipe(size, s).await,
- Self::Two(two) => two.pipe(size, s).await,
- Self::Three(three) => three.pipe(size, s).await,
+ Self::One(one) => one.copy_from(size, s).await,
+ Self::Two(two) => two.copy_from(size, s).await,
+ Self::Three(three) => three.copy_from(size, s).await,
}
}
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index b2209ed73..c0ecac3aa 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -90,7 +90,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
}
}
- async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
loop {
match &mut self.buffer {
Buffer::Filling(fill) => {
@@ -185,7 +185,7 @@ mod tests {
Ok(bs.len() as u64)
}
- async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
let mut bs = vec![];
s.read_to_end(&mut bs).await.unwrap();
assert_eq!(bs.len() as u64, size);
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 0b314c163..e39d59ca5 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,7 +138,7 @@ where
Ok(size as u64)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let upload_id = self.upload_id().await?;
self.inner
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index a85feecf3..65ae8a113 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,7 +58,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
Ok(size)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner.write_once(size, Box::new(s)).await?;
Ok(size)
}
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 11776755f..fbe509de9 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,7 +180,7 @@ impl oio::Write for AzblobWriter {
Ok(size)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
if self.op.append() {
self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
.await?;
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 4b9b90f9f..11c822fb1 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 37abab6a9..7cbf1a4d7 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index ab27f2b65..78b1c99cb 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
Ok(size)
}
- async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
while let Some(bs) = s.next().await {
let bs = bs?;
self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3bb11582d..ffc08fbf1 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 2e95f0b79..e9152f66c 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -161,7 +161,7 @@ impl oio::Write for GcsWriter {
}
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
.await?;
Ok(size)
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index df974d940..174923c90 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
Ok(size)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index bf9116ec2..5738f5d58 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 3b60a4df3..3353059e6 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 1434e980a..98afb6cbb 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 75a5e023a..8a0c84493 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 5ee5d84ac..1c8fc9ba9 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
Ok(size)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index 36a572bfe..406b9e5cd 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
Ok(size as u64)
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 936cddf01..68edbbfed 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index a39c835dc..3b358a15f 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index bbe79eb31..084fc08a7 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,7 +70,7 @@ impl oio::Write for WebdavWriter {
Ok(size)
}
- async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
.await?;
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index ae3396dc3..2116a30e8 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
}
}
- async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
+ async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 9479b7c73..9173e4611 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -139,7 +139,7 @@ impl Writer {
{
if let State::Idle(Some(w)) = &mut self.state {
let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| v.into())));
- w.pipe(size, r).await
+ w.copy_from(size, r).await
} else {
unreachable!(
"writer state invalid while sink, expect Idle, actual {}",
@@ -187,7 +187,7 @@ impl Writer {
oio::into_read_from_file(read_from, 0, size),
64 * 1024,
));
- w.pipe(size, r).await
+ w.copy_from(size, r).await
} else {
unreachable!(
"writer state invalid while copy, expect Idle, actual {}",