This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 098ef52ef refactor: Remove oio::Write::copy_from (#3018)
098ef52ef is described below
commit 098ef52ef6fb94cbe100a2da582c7541880e1e92
Author: Xuanwo <[email protected]>
AuthorDate: Thu Sep 7 13:27:08 2023 +0800
refactor: Remove oio::Write::copy_from (#3018)
* refactor: Remove oio::Write::copy_from
Signed-off-by: Xuanwo <[email protected]>
* Remove write can sink
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/benches/oio/utils.rs | 4 --
core/src/layers/complete.rs | 21 --------
core/src/layers/concurrent_limit.rs | 4 --
core/src/layers/error_context.rs | 8 ---
core/src/layers/logging.rs | 33 -------------
core/src/layers/madsim.rs | 7 ---
core/src/layers/metrics.rs | 14 ------
core/src/layers/minitrace.rs | 10 ----
core/src/layers/oteltrace.rs | 4 --
core/src/layers/prometheus.rs | 17 -------
core/src/layers/retry.rs | 61 -----------------------
core/src/layers/throttle.rs | 4 --
core/src/layers/timeout.rs | 13 -----
core/src/layers/tracing.rs | 8 ---
core/src/raw/adapters/kv/backend.rs | 7 ---
core/src/raw/adapters/typed_kv/backend.rs | 7 ---
core/src/raw/oio/write/api.rs | 26 ----------
core/src/raw/oio/write/append_object_write.rs | 11 -----
core/src/raw/oio/write/compose_write.rs | 15 ------
core/src/raw/oio/write/exact_buf_write.rs | 56 +--------------------
core/src/raw/oio/write/multipart_upload_write.rs | 16 ------
core/src/raw/oio/write/one_shot_write.rs | 5 --
core/src/services/azblob/backend.rs | 1 -
core/src/services/azblob/writer.rs | 19 -------
core/src/services/azdfs/writer.rs | 7 ---
core/src/services/cos/backend.rs | 1 -
core/src/services/dropbox/writer.rs | 7 ---
core/src/services/fs/backend.rs | 1 -
core/src/services/fs/writer.rs | 15 ------
core/src/services/ftp/writer.rs | 7 ---
core/src/services/gcs/backend.rs | 1 -
core/src/services/gcs/writer.rs | 6 ---
core/src/services/gdrive/writer.rs | 4 --
core/src/services/ghac/writer.rs | 7 ---
core/src/services/hdfs/writer.rs | 7 ---
core/src/services/ipmfs/writer.rs | 7 ---
core/src/services/obs/backend.rs | 1 -
core/src/services/onedrive/writer.rs | 7 ---
core/src/services/oss/backend.rs | 1 -
core/src/services/s3/backend.rs | 1 -
core/src/services/sftp/writer.rs | 7 ---
core/src/services/supabase/writer.rs | 7 ---
core/src/services/vercel_artifacts/writer.rs | 7 ---
core/src/services/wasabi/writer.rs | 7 ---
core/src/services/webdav/backend.rs | 1 -
core/src/services/webdav/writer.rs | 7 ---
core/src/services/webhdfs/writer.rs | 7 ---
core/src/types/capability.rs | 2 -
core/src/types/writer.rs | 63 +++++++++++++++---------
core/tests/behavior/write.rs | 19 +++----
50 files changed, 46 insertions(+), 532 deletions(-)
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 0171b9abf..bde28481e 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -30,10 +30,6 @@ impl oio::Write for BlackHoleWriter {
Ok(bs.len() as u64)
}
- async fn copy_from(&mut self, size: u64, _: oio::Reader) ->
opendal::Result<u64> {
- Ok(size)
- }
-
async fn abort(&mut self) -> opendal::Result<()> {
Ok(())
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 63ff3aa0d..83bf12d6a 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -734,27 +734,6 @@ where
Ok(n)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- if let Some(total_size) = self.size {
- if self.written + size > total_size {
- return Err(Error::new(
- ErrorKind::ContentTruncated,
- &format!(
- "writer got too much data, expect: {size}, actual: {}",
- self.written + size
- ),
- ));
- }
- }
-
- let w = self.inner.as_mut().ok_or_else(|| {
- Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
- })?;
- let n = w.copy_from(size, s).await?;
- self.written += n;
- Ok(n)
- }
-
async fn abort(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 22f5d4c52..99fa4e413 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,10 +293,6 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
self.inner.abort().await
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.copy_from(size, s).await
- }
-
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 925f1d191..e325cd0f7 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,14 +419,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.copy_from(size, s).await.map_err(|err| {
- err.with_operation(WriteOperation::CopyFrom)
- .with_context("service", self.scheme)
- .with_context("path", &self.path)
- })
- }
-
async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1b0a6d6c0..1f6d7456a 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,39 +1285,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- match self.inner.copy_from(size, s).await {
- Ok(n) => {
- self.written += n;
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={} -> data sink
{}B",
- self.ctx.scheme,
- WriteOperation::CopyFrom,
- self.path,
- self.written,
- n
- );
- Ok(n)
- }
- Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={} -> data
sink failed: {}",
- self.ctx.scheme,
- WriteOperation::CopyFrom,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
- Err(err)
- }
- }
- }
-
async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 92f1925a8..432e4bcf4 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,13 +318,6 @@ impl oio::Write for MadsimWriter {
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) ->
crate::Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "will be supported in the future",
- ))
- }
-
async fn abort(&mut self) -> crate::Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index d4d65f839..36e4957aa 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,20 +861,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
})
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner
- .copy_from(size, s)
- .await
- .map(|n| {
- self.bytes += n;
- n
- })
- .map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 4daf0c056..149410ac8 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,16 +347,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner
- .copy_from(size, s)
- .in_span(Span::enter_with_parent(
- WriteOperation::CopyFrom.into_static(),
- &self.span,
- ))
- .await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner
.abort()
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index b574e7b98..0722a11a7 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,10 +317,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
self.inner.write(bs).await
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.copy_from(size, s).await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 35ce5144c..0d8f99c22 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,23 +679,6 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
})
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner
- .copy_from(size, s)
- .await
- .map(|n| {
- self.stats
- .bytes_total
- .with_label_values(&[&self.scheme,
Operation::Write.into_static()])
- .observe(n as f64);
- n
- })
- .map_err(|err| {
- self.stats.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 9d3408866..3e207df9a 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -898,67 +898,6 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
}
}
- /// > Ooooooooooops, are you crazy!? Why we need to do `Arc<Mutex<S>>`
here? Adding a lock has
- /// a lot overhead!
- ///
- /// Yes, you are right. But we have no choice. This is the only safe way
for us to add retry
- /// support for stream.
- ///
- /// And the overhead is acceptable. Based on our benchmark, adding a lock
- /// that has no conflicts will only cost 5ns.
- ///
- /// ```shell
- /// stream/without_arc_mutex
- /// time: [10.715 ns 10.729 ns 10.744 ns]
- /// thrpt: [ 90896 GiB/s 91019 GiB/s 91139
GiB/s]
- /// stream/with_arc_mutex time: [14.891 ns 14.905 ns 14.928 ns]
- /// thrpt: [ 65418 GiB/s 65517 GiB/s 65581
GiB/s]
- /// ```
- ///
- /// The overhead is constant, which means the overhead will not increase
with the size of
- /// stream. For example, if every `next` call cost 1ms, then the overhead
will only take 0.005%
- /// which is acceptable.
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- let s = oio::into_cloneable_reader_within_tokio(s);
-
- let mut backoff = self.builder.build();
-
- loop {
- match self.inner.copy_from(size, Box::new(s.clone())).await {
- Ok(n) => return Ok(n),
- Err(e) if !e.is_temporary() => return Err(e),
- Err(e) => match backoff.next() {
- None => return Err(e),
- Some(dur) => {
- {
- use oio::ReadExt;
-
- let s = s.clone().into_inner();
- let mut stream = s.lock().await;
- // Try to reset this reader.
- //
- // If error happened, we will return the pipe
error directly and stop retry.
- if
stream.seek(io::SeekFrom::Start(0)).await.is_err() {
- return Err(e);
- }
- }
-
- self.notify.intercept(
- &e,
- dur,
- &[
- ("operation",
WriteOperation::CopyFrom.into_static()),
- ("path", &self.path),
- ],
- );
- tokio::time::sleep(dur).await;
- continue;
- }
- },
- }
- }
- }
-
async fn abort(&mut self) -> Result<()> {
let mut backoff = self.builder.build();
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 4edec592e..021d1cad9 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -241,10 +241,6 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.copy_from(size, s).await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index e15cfeac9..68ac5a41d 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,19 +335,6 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
})?
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- let timeout = self.io_timeout(size);
-
- tokio::time::timeout(timeout, self.inner.copy_from(size, s))
- .await
- .map_err(|_| {
- Error::new(ErrorKind::Unexpected, "operation timeout")
- .with_operation(WriteOperation::CopyFrom)
- .with_context("timeout", timeout.as_secs_f64().to_string())
- .set_temporary()
- })?
- }
-
async fn abort(&mut self) -> Result<()> {
tokio::time::timeout(self.timeout, self.inner.abort())
.await
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index ca277fbd6..c5d006980 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -328,14 +328,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
self.inner.write(bs).await
}
- #[tracing::instrument(
- parent = &self.span,
- level = "trace",
- skip_all)]
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.copy_from(size, s).await
- }
-
#[tracing::instrument(
parent = &self.span,
level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index 38b7db998..c0823d59e 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,13 +397,6 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 07b919f79..948c2b5ad 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,13 +410,6 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
self.buf.clear();
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 952c073ae..eaf3ba0de 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -21,7 +21,6 @@ use std::fmt::Formatter;
use async_trait::async_trait;
use bytes::Bytes;
-use crate::raw::*;
use crate::*;
/// WriteOperation is the name for APIs of Writer.
@@ -30,8 +29,6 @@ use crate::*;
pub enum WriteOperation {
/// Operation for [`Write::write`]
Write,
- /// Operation for [`Write::copy_from`]
- CopyFrom,
/// Operation for [`Write::abort`]
Abort,
/// Operation for [`Write::close`]
@@ -61,7 +58,6 @@ impl From<WriteOperation> for &'static str {
match v {
Write => "Writer::write",
- CopyFrom => "Writer::copy_from",
Abort => "Writer::abort",
Close => "Writer::close",
BlockingWrite => "BlockingWriter::write",
@@ -87,17 +83,6 @@ pub trait Write: Unpin + Send + Sync {
/// repeatedly until all bytes has been written.
async fn write(&mut self, bs: Bytes) -> Result<u64>;
- /// Copy from given reader into the writer.
- ///
- /// # Behavior
- ///
- /// - `Ok(n)` means `n` bytes has been written successfully.
- /// - `Err(err)` means error happens and no bytes has been written.
- ///
- /// It's possible that `n < size`, caller should pass the remaining bytes
- /// repeatedly until all bytes has been written.
- async fn copy_from(&mut self, size: u64, src: oio::Reader) -> Result<u64>;
-
/// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
@@ -113,13 +98,6 @@ impl Write for () {
unimplemented!("write is required to be implemented for oio::Write")
}
- async fn copy_from(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support sink",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
@@ -144,10 +122,6 @@ impl<T: Write + ?Sized> Write for Box<T> {
(**self).write(bs).await
}
- async fn copy_from(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
- (**self).copy_from(n, s).await
- }
-
async fn abort(&mut self) -> Result<()> {
(**self).abort().await
}
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index b7f1ff5c6..d6bb819df 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -91,17 +91,6 @@ where
Ok(size)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- let offset = self.offset().await?;
-
- self.inner
- .append(offset, size, AsyncBody::Stream(Box::new(s)))
- .await
- .map(|_| self.offset = Some(offset + size))?;
-
- Ok(size)
- }
-
async fn close(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/raw/oio/write/compose_write.rs
b/core/src/raw/oio/write/compose_write.rs
index 1afb7735e..dac833fcf 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -63,13 +63,6 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for
TwoWaysWriter<ONE, TWO> {
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- match self {
- Self::One(one) => one.copy_from(size, s).await,
- Self::Two(two) => two.copy_from(size, s).await,
- }
- }
-
async fn abort(&mut self) -> Result<()> {
match self {
Self::One(one) => one.abort().await,
@@ -109,14 +102,6 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write>
oio::Write
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- match self {
- Self::One(one) => one.copy_from(size, s).await,
- Self::Two(two) => two.copy_from(size, s).await,
- Self::Three(three) => three.copy_from(size, s).await,
- }
- }
-
async fn abort(&mut self) -> Result<()> {
match self {
Self::One(one) => one.abort().await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs
b/core/src/raw/oio/write/exact_buf_write.rs
index a944f3365..7ba788e3d 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,10 +18,8 @@
use std::cmp::min;
use async_trait::async_trait;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
-use tokio::io::ReadBuf;
+use bytes::{Buf, Bytes, BytesMut};
-use crate::raw::oio::ReadExt;
use crate::raw::*;
use crate::*;
@@ -104,50 +102,6 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
}
}
- async fn copy_from(&mut self, size: u64, mut s: oio::Reader) ->
Result<u64> {
- loop {
- match &mut self.buffer {
- Buffer::Empty => {
- self.buffer = Buffer::Filling(BytesMut::new());
- }
- Buffer::Filling(fill) => {
- if fill.len() >= self.buffer_size {
- self.buffer = Buffer::Consuming(fill.split().freeze());
- continue;
- }
-
- // Reserve to enough size.
- if size > fill.remaining_mut() as u64 {
- fill.reserve(self.buffer_size - fill.len());
- }
- let dst = fill.spare_capacity_mut();
- let dst_len = dst.len();
- let mut buf = ReadBuf::uninit(dst);
-
- // Safety: the input buffer is created
with_capacity(length).
- unsafe { buf.assume_init(dst_len) };
-
- let n = s.read(buf.initialize_unfilled()).await?;
-
- // Safety: read makes sure this buffer has been filled.
- unsafe { fill.advance_mut(n) };
-
- return Ok(n as u64);
- }
- Buffer::Consuming(consume) => {
- // Make sure filled buffer has been flushed.
- //
- // TODO: maybe we can re-fill it after a successful write.
- while !consume.is_empty() {
- let n = self.inner.write(consume.clone()).await?;
- consume.advance(n as usize);
- }
- self.buffer = Buffer::Filling(BytesMut::new());
- }
- }
- }
- }
-
async fn abort(&mut self) -> Result<()> {
self.buffer = Buffer::Empty;
self.inner.abort().await
@@ -181,7 +135,6 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
#[cfg(test)]
mod tests {
- use futures::AsyncReadExt;
use log::debug;
use pretty_assertions::assert_eq;
use rand::thread_rng;
@@ -206,13 +159,6 @@ mod tests {
Ok(bs.len() as u64)
}
- async fn copy_from(&mut self, size: u64, mut s: oio::Reader) ->
Result<u64> {
- let mut bs = vec![];
- s.read_to_end(&mut bs).await.unwrap();
- assert_eq!(bs.len() as u64, size);
- self.write(bs.into()).await
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index e39d59ca5..13d1b4aa2 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,22 +138,6 @@ where
Ok(size as u64)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- let upload_id = self.upload_id().await?;
-
- self.inner
- .write_part(
- &upload_id,
- self.parts.len(),
- size,
- AsyncBody::Stream(Box::new(s)),
- )
- .await
- .map(|v| self.parts.push(v))?;
-
- Ok(size)
- }
-
async fn close(&mut self) -> Result<()> {
let upload_id = if let Some(upload_id) = &self.upload_id {
upload_id
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index 65ae8a113..3681242ce 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,11 +58,6 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
Ok(size)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.inner.write_once(size, Box::new(s)).await?;
- Ok(size)
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index fbd480de6..4b04d5c7e 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -530,7 +530,6 @@ impl Accessor for AzblobBackend {
write: true,
write_can_append: true,
- write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index fbe509de9..b088aa57a 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,25 +180,6 @@ impl oio::Write for AzblobWriter {
Ok(size)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- if self.op.append() {
- self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
- .await?;
- } else {
- if self.op.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
- self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
- .await?;
- }
-
- Ok(size)
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index 11c822fb1..e56f9eca9 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,13 +88,6 @@ impl oio::Write for AzdfsWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index ba162fae9..46e3b3064 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -272,7 +272,6 @@ impl Accessor for CosBackend {
write: true,
write_can_append: true,
- write_can_sink: true,
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
diff --git a/core/src/services/dropbox/writer.rs
b/core/src/services/dropbox/writer.rs
index 7cbf1a4d7..00998c77d 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for DropboxWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index f94f59d96..950cfd76e 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -263,7 +263,6 @@ impl Accessor for FsBackend {
write: true,
write_can_append: true,
- write_can_sink: true,
write_without_content_length: true,
create_dir: true,
delete: true,
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 78b1c99cb..4a9a0471d 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -22,7 +22,6 @@ use std::path::PathBuf;
use async_trait::async_trait;
use bytes::Bytes;
-use futures::StreamExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
@@ -67,20 +66,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
Ok(size)
}
- async fn copy_from(&mut self, size: u64, mut s: oio::Reader) ->
Result<u64> {
- while let Some(bs) = s.next().await {
- let bs = bs?;
- self.f
- .seek(SeekFrom::Start(self.pos))
- .await
- .map_err(parse_io_error)?;
- self.f.write_all(&bs).await.map_err(parse_io_error)?;
- self.pos += bs.len() as u64;
- }
-
- Ok(size)
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ffc08fbf1..3488255f9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,13 +55,6 @@ impl oio::Write for FtpWriter {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index dc760cb15..77549908e 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -360,7 +360,6 @@ impl Accessor for GcsBackend {
read_with_if_none_match: true,
write: true,
- write_can_sink: true,
write_with_content_type: true,
write_without_content_length: true,
delete: true,
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index e9152f66c..68e13b27f 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -161,12 +161,6 @@ impl oio::Write for GcsWriter {
}
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
- .await?;
- Ok(size)
- }
-
async fn abort(&mut self) -> Result<()> {
let location = if let Some(location) = &self.location {
location
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index 174923c90..8d69bf8e2 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,10 +106,6 @@ impl oio::Write for GdriveWriter {
Ok(size)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 5738f5d58..a7db6acab 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for GhacWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 3353059e6..90499e47c 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,13 +58,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index 98afb6cbb..d3f98e77b 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,13 +53,6 @@ impl oio::Write for IpmfsWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 4c3176cce..cc14eedd0 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -279,7 +279,6 @@ impl Accessor for ObsBackend {
write: true,
write_can_append: true,
- write_can_sink: true,
write_with_content_type: true,
write_with_cache_control: true,
write_without_content_length: true,
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 8a0c84493..abb846871 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,13 +58,6 @@ impl oio::Write for OneDriveWriter {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index d924e4cc9..212c42ce0 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -404,7 +404,6 @@ impl Accessor for OssBackend {
write: true,
write_can_append: true,
- write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 2c82cc03c..d2d8bb039 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -913,7 +913,6 @@ impl Accessor for S3Backend {
read_with_override_content_type: true,
write: true,
- write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
write_without_content_length: true,
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 1c8fc9ba9..2df725ab2 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,13 +43,6 @@ impl oio::Write for SftpWriter {
Ok(size)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
index 406b9e5cd..5c57d7a95 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,13 +77,6 @@ impl oio::Write for SupabaseWriter {
Ok(size as u64)
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/services/vercel_artifacts/writer.rs
b/core/src/services/vercel_artifacts/writer.rs
index 68edbbfed..efb223df6 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,13 +62,6 @@ impl oio::Write for VercelArtifactsWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 3b358a15f..55d898ccc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,13 +65,6 @@ impl oio::Write for WasabiWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs
index 0010cd2c5..509d4c612 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -238,7 +238,6 @@ impl Accessor for WebdavBackend {
read_with_range: true,
write: true,
- write_can_sink: true,
create_dir: true,
delete: true,
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 084fc08a7..30b9827f2 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,13 +70,6 @@ impl oio::Write for WebdavWriter {
Ok(size)
}
- async fn copy_from(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
- self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
- .await?;
-
- Ok(size)
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 2116a30e8..8dcbc9dc3 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,13 +64,6 @@ impl oio::Write for WebhdfsWriter {
}
}
- async fn copy_from(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index d8baab96c..6d3389c2b 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -78,8 +78,6 @@ pub struct Capability {
pub write: bool,
/// If operator supports write by append, it will be true.
pub write_can_append: bool,
- /// If operator supports write by sink a stream into, it will be true.
- pub write_can_sink: bool,
/// If operator supports write with without content length, it will
/// be true.
///
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 9173e4611..3ae7f6f4c 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -127,25 +127,35 @@ impl Writer {
/// .content_length(2 * 4096)
/// .await?;
/// let stream = stream::iter(vec![vec![0; 4096], vec![1; 4096]]).map(Ok);
- /// w.sink(2 * 4096, stream).await?;
+ /// w.sink(stream).await?;
/// w.close().await?;
/// Ok(())
/// }
/// ```
- pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<u64>
+ pub async fn sink<S, T>(&mut self, mut sink_from: S) -> Result<u64>
where
S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
T: Into<Bytes>,
{
- if let State::Idle(Some(w)) = &mut self.state {
- let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| v.into())));
- w.copy_from(size, r).await
+ let w = if let State::Idle(Some(w)) = &mut self.state {
+ w
} else {
unreachable!(
"writer state invalid while sink, expect Idle, actual {}",
self.state
);
+ };
+
+ let mut written = 0;
+ while let Some(bs) = sink_from.try_next().await? {
+ let mut bs = bs.into();
+ while bs.has_remaining() {
+ let n = w.write(bs.clone()).await?;
+ bs.advance(n as usize);
+ written += n;
+ }
}
+ Ok(written)
}
/// Copy into writer.
@@ -173,27 +183,20 @@ impl Writer {
/// async fn copy_example(op: Operator) -> Result<()> {
/// let mut w = op.writer_with("path/to/file").content_length(4096).await?;
/// let reader = Cursor::new(vec![0; 4096]);
- /// w.copy(4096, reader).await?;
+ /// w.copy(reader).await?;
/// w.close().await?;
/// Ok(())
/// }
/// ```
- pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
+ pub async fn copy<R>(&mut self, read_from: R) -> Result<u64>
where
- R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static,
+ R: futures::AsyncRead + Send + Sync + Unpin + 'static,
{
- if let State::Idle(Some(w)) = &mut self.state {
- let r = Box::new(oio::into_streamable_read(
- oio::into_read_from_file(read_from, 0, size),
- 64 * 1024,
- ));
- w.copy_from(size, r).await
- } else {
- unreachable!(
- "writer state invalid while copy, expect Idle, actual {}",
- self.state
- );
- }
+ futures::io::copy(read_from, self).await.map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "copy into writer failed")
+ .with_operation("copy")
+ .set_source(err)
+ })
}
/// Abort the writer and clean up all written data.
@@ -271,7 +274,10 @@ impl AsyncWrite for Writer {
self.state = State::Idle(Some(w));
return Poll::Ready(Ok(size));
}
- Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ Err(err) => {
+ self.state = State::Idle(None);
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+ }
},
State::Close(_) => {
unreachable!("invalid state of writer: poll_write with State::Close")
@@ -306,7 +312,10 @@ impl AsyncWrite for Writer {
self.state = State::Idle(Some(w));
return Poll::Ready(Ok(()));
}
- Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ Err(err) => {
+ self.state = State::Idle(None);
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+ }
},
}
}
@@ -337,7 +346,10 @@ impl tokio::io::AsyncWrite for Writer {
self.state = State::Idle(Some(w));
return Poll::Ready(Ok(size));
}
- Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ Err(err) => {
+ self.state = State::Idle(None);
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+ }
},
State::Close(_) => {
unreachable!("invalid state of writer: poll_write with State::Close")
@@ -371,7 +383,10 @@ impl tokio::io::AsyncWrite for Writer {
self.state = State::Idle(Some(w));
return Poll::Ready(Ok(()));
}
- Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ Err(err) => {
+ self.state = State::Idle(None);
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err)));
+ }
},
}
}
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index fe4166d78..10c9e8c1a 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1147,7 +1147,7 @@ pub async fn test_writer_write(op: Operator) -> Result<()> {
/// Streaming data into writer
pub async fn test_writer_sink(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
- if !(cap.write && cap.write_can_sink) {
+ if !(cap.write && cap.write_without_content_length) {
return Ok(());
}
@@ -1157,11 +1157,8 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
let content_b = gen_fixed_bytes(size);
let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);
- let mut w = op
- .writer_with(&path)
- .content_length(2 * size as u64)
- .await?;
- w.sink(2 * size as u64, stream).await?;
+ let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+ w.sink(stream).await?;
w.close().await?;
let meta = op.stat(&path).await.expect("stat must succeed");
@@ -1187,7 +1184,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
/// Reading data into writer
pub async fn test_writer_copy(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
- if !(cap.write && cap.write_can_sink) {
+ if !(cap.write && cap.write_without_content_length) {
return Ok(());
}
@@ -1196,18 +1193,14 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
- let mut w = op
- .writer_with(&path)
- .content_length(2 * size as u64)
- .await?;
+ let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat());
while !content.is_empty() {
let reader = Cursor::new(content.clone());
- let n = w.copy(2 * size as u64, reader).await?;
+ let n = w.copy(reader).await?;
content.advance(n as usize);
}
-
w.close().await?;
let meta = op.stat(&path).await.expect("stat must succeed");