This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 43d0327f4 refactor: Refactor oio::Write by accepting oio::Reader instead (#3008)
43d0327f4 is described below
commit 43d0327f488f26169454cd124fe34253eeb2dfe2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 6 11:30:25 2023 +0800
refactor: Refactor oio::Write by accepting oio::Reader instead (#3008)
* Rename to pipe
Signed-off-by: Xuanwo <[email protected]>
* Save work
Signed-off-by: Xuanwo <[email protected]>
* Save code
Signed-off-by: Xuanwo <[email protected]>
* Remove at least writer
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
* Fix gcs test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/benches/oio/main.rs | 6 +-
core/benches/oio/utils.rs | 3 +-
core/benches/oio/write.rs | 35 +---
core/src/layers/complete.rs | 10 +-
core/src/layers/concurrent_limit.rs | 4 +-
core/src/layers/error_context.rs | 6 +-
core/src/layers/logging.rs | 8 +-
core/src/layers/madsim.rs | 2 +-
core/src/layers/metrics.rs | 4 +-
core/src/layers/minitrace.rs | 6 +-
core/src/layers/oteltrace.rs | 4 +-
core/src/layers/prometheus.rs | 4 +-
core/src/layers/retry.rs | 18 +-
core/src/layers/throttle.rs | 5 +-
core/src/layers/timeout.rs | 6 +-
core/src/layers/tracing.rs | 4 +-
core/src/raw/adapters/kv/backend.rs | 2 +-
core/src/raw/adapters/typed_kv/backend.rs | 2 +-
core/src/raw/oio/cursor.rs | 2 +-
core/src/raw/oio/read/cloneable_read.rs | 137 ++++++++++++++
core/src/raw/oio/read/into_read_from_file.rs | 4 +-
core/src/raw/oio/read/into_read_from_stream.rs | 83 +++++++++
core/src/raw/oio/read/mod.rs | 10 ++
core/src/raw/oio/stream/api.rs | 12 ++
core/src/raw/oio/write/api.rs | 14 +-
core/src/raw/oio/write/append_object_write.rs | 5 +-
core/src/raw/oio/write/at_least_buf_write.rs | 140 ---------------
core/src/raw/oio/write/compose_write.rs | 15 +-
core/src/raw/oio/write/exact_buf_write.rs | 218 +++++++++++------------
core/src/raw/oio/write/mod.rs | 3 -
core/src/raw/oio/write/multipart_upload_write.rs | 9 +-
core/src/raw/oio/write/one_shot_write.rs | 4 +-
core/src/services/azblob/writer.rs | 8 +-
core/src/services/azdfs/writer.rs | 2 +-
core/src/services/cos/backend.rs | 5 +-
core/src/services/dropbox/writer.rs | 2 +-
core/src/services/fs/writer.rs | 2 +-
core/src/services/ftp/writer.rs | 2 +-
core/src/services/gcs/writer.rs | 11 +-
core/src/services/gdrive/writer.rs | 2 +-
core/src/services/ghac/writer.rs | 2 +-
core/src/services/hdfs/writer.rs | 2 +-
core/src/services/ipmfs/writer.rs | 2 +-
core/src/services/obs/backend.rs | 5 +-
core/src/services/onedrive/writer.rs | 2 +-
core/src/services/oss/backend.rs | 5 +-
core/src/services/s3/backend.rs | 17 +-
core/src/services/sftp/writer.rs | 2 +-
core/src/services/supabase/writer.rs | 2 +-
core/src/services/vercel_artifacts/writer.rs | 2 +-
core/src/services/wasabi/writer.rs | 2 +-
core/src/services/webdav/writer.rs | 5 +-
core/src/services/webhdfs/writer.rs | 2 +-
core/src/types/writer.rs | 26 +--
core/tests/behavior/write.rs | 11 +-
55 files changed, 478 insertions(+), 428 deletions(-)
diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 982d29dfb..85ca2ebbe 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,9 +21,5 @@ mod write;
use criterion::criterion_group;
use criterion::criterion_main;
-criterion_group!(
- benches,
- write::bench_at_least_buf_write,
- write::bench_exact_buf_write,
-);
+criterion_group!(benches, write::bench_exact_buf_write,);
criterion_main!(benches);
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 9a14442c2..53aa665ca 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -18,7 +18,6 @@
use async_trait::async_trait;
use bytes::Bytes;
use opendal::raw::oio;
-use opendal::raw::oio::Streamer;
use rand::prelude::ThreadRng;
use rand::RngCore;
@@ -31,7 +30,7 @@ impl oio::Write for BlackHoleWriter {
Ok(bs.len() as u64)
}
- async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+ async fn pipe(&mut self, size: u64, _: oio::Reader) ->
opendal::Result<u64> {
Ok(size)
}
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 6e26ce7e0..a227d4901 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use bytes::Buf;
use criterion::Criterion;
use once_cell::sync::Lazy;
-use opendal::raw::oio::AtLeastBufWriter;
use opendal::raw::oio::ExactBufWriter;
use opendal::raw::oio::Write;
use rand::thread_rng;
@@ -28,32 +28,6 @@ use super::utils::*;
pub static TOKIO: Lazy<tokio::runtime::Runtime> =
Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
-pub fn bench_at_least_buf_write(c: &mut Criterion) {
- let mut group = c.benchmark_group("at_least_buf_write");
-
- let mut rng = thread_rng();
-
- for size in [
- Size::from_kibibytes(4),
- Size::from_kibibytes(256),
- Size::from_mebibytes(4),
- Size::from_mebibytes(16),
- ] {
- let content = gen_bytes(&mut rng, size.bytes() as usize);
-
- group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
- group.bench_with_input(size.to_string(), &content, |b, content| {
- b.to_async(&*TOKIO).iter(|| async {
- let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
- w.write(content.clone()).await.unwrap();
- w.close().await.unwrap();
- })
- });
- }
-
- group.finish()
-}
-
pub fn bench_exact_buf_write(c: &mut Criterion) {
let mut group = c.benchmark_group("exact_buf_write");
@@ -71,7 +45,12 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
- w.write(content.clone()).await.unwrap();
+
+ let mut bs = content.clone();
+ while !bs.is_empty() {
+ let n = w.write(bs.clone()).await.unwrap();
+ bs.advance(n as usize);
+ }
w.close().await.unwrap();
})
});
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index e986d1a47..db470334c 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -729,12 +729,12 @@ where
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- w.write(bs).await?;
- self.written += n as u64;
- Ok(n as u64)
+ let n = w.write(bs).await?;
+ self.written += n;
+ Ok(n)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
if let Some(total_size) = self.size {
if self.written + size > total_size {
return Err(Error::new(
@@ -750,7 +750,7 @@ where
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- let n = w.sink(size, s).await?;
+ let n = w.pipe(size, s).await?;
self.written += n;
Ok(n)
}
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 96a682d61..c1504e6a2 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -293,8 +293,8 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
self.inner.abort().await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.inner.sink(size, s).await
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.pipe(size, s).await
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 2acd6dd7d..33cebe92c 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -419,9 +419,9 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.inner.sink(size, s).await.map_err(|err| {
- err.with_operation(WriteOperation::Sink)
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.pipe(size, s).await.map_err(|err| {
+ err.with_operation(WriteOperation::Pipe)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 6c63f466f..5ed80a28b 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1285,15 +1285,15 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- match self.inner.sink(size, s).await {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ match self.inner.pipe(size, s).await {
Ok(n) => {
self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data sink
{}B",
self.ctx.scheme,
- WriteOperation::Sink,
+ WriteOperation::Pipe,
self.path,
self.written,
n
@@ -1307,7 +1307,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={} -> data
sink failed: {}",
self.ctx.scheme,
- WriteOperation::Sink,
+ WriteOperation::Pipe,
self.path,
self.written,
self.ctx.error_print(&err),
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 17835e5ba..55741389d 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) ->
crate::Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 181ebb3c0..a96e83f8a 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -861,9 +861,9 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .sink(size, s)
+ .pipe(size, s)
.await
.map(|n| {
self.bytes += n;
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 75c852c3a..f6487aa98 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -347,11 +347,11 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .sink(size, s)
+ .pipe(size, s)
.in_span(Span::enter_with_parent(
- WriteOperation::Sink.into_static(),
+ WriteOperation::Pipe.into_static(),
&self.span,
))
.await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index fde87e9ba..9bd464be5 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -317,8 +317,8 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
self.inner.write(bs).await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.inner.sink(size, s).await
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.pipe(size, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 644532bf6..70aae731e 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -679,9 +679,9 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
self.inner
- .sink(size, s)
+ .pipe(size, s)
.await
.map(|n| {
self.stats
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 07b92e24c..d5c8a0738 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -34,7 +34,6 @@ use backon::Retryable;
use bytes::Bytes;
use futures::FutureExt;
use log::warn;
-use tokio::sync::Mutex;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
@@ -919,26 +918,27 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
/// The overhead is constant, which means the overhead will not increase
with the size of
/// stream. For example, if every `next` call cost 1ms, then the overhead
will only take 0.005%
/// which is acceptable.
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- let s = Arc::new(Mutex::new(s));
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ let s = oio::into_cloneable_reader_within_tokio(s);
let mut backoff = self.builder.build();
loop {
- match self.inner.sink(size, Box::new(s.clone())).await {
+ match self.inner.pipe(size, Box::new(s.clone())).await {
Ok(n) => return Ok(n),
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
None => return Err(e),
Some(dur) => {
{
- use oio::StreamExt;
+ use oio::ReadExt;
+ let s = s.clone().into_inner();
let mut stream = s.lock().await;
- // Try to reset this stream.
+ // Try to reset this reader.
//
- // If error happened, we will return the sink
error directly and stop retry.
- if stream.reset().await.is_err() {
+ // If an error happens, return the pipe error directly and stop retrying.
+ if
stream.seek(io::SeekFrom::Start(0)).await.is_err() {
return Err(e);
}
}
@@ -947,7 +947,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
&e,
dur,
&[
- ("operation",
WriteOperation::Sink.into_static()),
+ ("operation",
WriteOperation::Pipe.into_static()),
("path", &self.path),
],
);
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index a88d1c701..d821aacbf 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -33,7 +33,6 @@ use governor::NegativeMultiDecision;
use governor::Quota;
use governor::RateLimiter;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -242,8 +241,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
- self.inner.sink(size, s).await
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.pipe(size, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index be2289d04..642b8a49b 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -335,14 +335,14 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
})?
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let timeout = self.io_timeout(size);
- tokio::time::timeout(timeout, self.inner.sink(size, s))
+ tokio::time::timeout(timeout, self.inner.pipe(size, s))
.await
.map_err(|_| {
Error::new(ErrorKind::Unexpected, "operation timeout")
- .with_operation(WriteOperation::Sink)
+ .with_operation(WriteOperation::Pipe)
.with_context("timeout", timeout.as_secs_f64().to_string())
.set_temporary()
})?
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 33dcbdebc..467d0a153 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -332,8 +332,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.inner.sink(size, s).await
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.pipe(size, s).await
}
#[tracing::instrument(
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index be4913ff9..ce9ad3005 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 48232c1fc..494118e84 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c9b670ead..796b2a349 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -180,7 +180,7 @@ impl oio::Stream for Cursor {
/// ChunkedCursor is used represents a non-contiguous bytes in memory.
///
/// This is useful when we buffer users' random writes without copy.
ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::sink`] directly.
+/// [`oio::Stream`] so it can be used in [`oio::Write::pipe`] directly.
///
/// # TODO
///
diff --git a/core/src/raw/oio/read/cloneable_read.rs
b/core/src/raw/oio/read/cloneable_read.rs
new file mode 100644
index 000000000..2ccd50042
--- /dev/null
+++ b/core/src/raw/oio/read/cloneable_read.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use std::io::SeekFrom;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// Convert the given reader into a `Send + Sync + Clone` wrapper backed by `std::sync::Mutex`.
+pub fn into_cloneable_reader_within_std<R>(reader: R) ->
CloneableReaderWithinStd<R> {
+ CloneableReaderWithinStd(Arc::new(std::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinStd is a `Send + Sync + Clone` wrapper of the input reader, built on
+/// `std::sync::Mutex`.
+///
+/// Callers can clone this reader, but only one clone may use the `oio::Read` API at a time:
+/// if the lock is already held, the call returns an error instead of blocking.
+pub struct CloneableReaderWithinStd<R>(Arc<std::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinStd<R> {
+ /// Consume self to get inner reader.
+ pub fn into_inner(self) -> Arc<std::sync::Mutex<R>> {
+ self.0
+ }
+}
+
+impl<R> Clone for CloneableReaderWithinStd<R> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinStd<R> {
+ fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<Result<usize>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_read(cx, buf),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ ))),
+ }
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<Result<u64>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_seek(cx, pos),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ ))),
+ }
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_next(cx),
+ Err(_) => Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ )))),
+ }
+ }
+}
+
+/// Convert the given reader into a `Send + Sync + Clone` wrapper backed by `tokio::sync::Mutex`.
+pub fn into_cloneable_reader_within_tokio<R>(reader: R) ->
CloneableReaderWithinTokio<R> {
+ CloneableReaderWithinTokio(Arc::new(tokio::sync::Mutex::new(reader)))
+}
+
+/// CloneableReaderWithinTokio is a `Send + Sync + Clone` wrapper of the input reader, built on
+/// `tokio::sync::Mutex`.
+///
+/// Callers can clone this reader, but only one clone may use the `oio::Read` API at a time:
+/// if the lock is already held, the call returns an error instead of waiting for it.
+pub struct CloneableReaderWithinTokio<R>(Arc<tokio::sync::Mutex<R>>);
+
+impl<R> CloneableReaderWithinTokio<R> {
+ /// Consume self to get inner reader.
+ pub fn into_inner(self) -> Arc<tokio::sync::Mutex<R>> {
+ self.0
+ }
+}
+
+impl<R> Clone for CloneableReaderWithinTokio<R> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
+impl<R: oio::Read> oio::Read for CloneableReaderWithinTokio<R> {
+ fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<Result<usize>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_read(cx, buf),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ ))),
+ }
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<Result<u64>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_seek(cx, pos),
+ Err(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ ))),
+ }
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ match self.0.try_lock() {
+ Ok(mut this) => this.poll_next(cx),
+ Err(_) => Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unexpected,
+ "the cloneable reader is expected to have only one owner, but
it's not",
+ )))),
+ }
+ }
+}
diff --git a/core/src/raw/oio/read/into_read_from_file.rs
b/core/src/raw/oio/read/into_read_from_file.rs
index 96e80f515..f005ac737 100644
--- a/core/src/raw/oio/read/into_read_from_file.rs
+++ b/core/src/raw/oio/read/into_read_from_file.rs
@@ -41,7 +41,7 @@ pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) ->
FromFileReader<R>
}
}
-/// FdReader is a wrapper of input fd to implement [`oio::Read`].
+/// FromFileReader is a wrapper of input fd to implement [`oio::Read`].
pub struct FromFileReader<R> {
inner: R,
@@ -126,7 +126,7 @@ where
Poll::Ready(Some(Err(Error::new(
ErrorKind::Unsupported,
- "output reader doesn't support seeking",
+ "output reader doesn't support next",
))))
}
}
diff --git a/core/src/raw/oio/read/into_read_from_stream.rs
b/core/src/raw/oio/read/into_read_from_stream.rs
new file mode 100644
index 000000000..0a231de2b
--- /dev/null
+++ b/core/src/raw/oio/read/into_read_from_stream.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::{Buf, Bytes};
+use futures::StreamExt;
+use std::io::SeekFrom;
+use std::task::{Context, Poll};
+
+/// Convert a `futures::Stream` of `Result<T>` chunks (`T: Into<Bytes>`) into [`oio::Read`].
+pub fn into_read_from_stream<S>(stream: S) -> FromStreamReader<S> {
+ FromStreamReader {
+ inner: stream,
+ buf: Bytes::new(),
+ }
+}
+
+/// FromStreamReader adapts a `futures::Stream<Item = Result<T>>` into [`oio::Read`].
+pub struct FromStreamReader<S> {
+ inner: S,
+ buf: Bytes,
+}
+
+impl<S, T> oio::Read for FromStreamReader<S>
+where
+ S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
+ T: Into<Bytes>,
+{
+ fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<Result<usize>> {
+ if !self.buf.is_empty() {
+ let len = std::cmp::min(buf.len(), self.buf.len());
+ buf[..len].copy_from_slice(&self.buf[..len]);
+ self.buf.advance(len);
+ return Poll::Ready(Ok(len));
+ }
+
+ match futures::ready!(self.inner.poll_next_unpin(cx)) {
+ Some(Ok(bytes)) => {
+ let bytes = bytes.into();
+ let len = std::cmp::min(buf.len(), bytes.len());
+ buf[..len].copy_from_slice(&bytes[..len]);
+ self.buf = bytes.slice(len..);
+ Poll::Ready(Ok(len))
+ }
+ Some(Err(err)) => Poll::Ready(Err(err)),
+ None => Poll::Ready(Ok(0)),
+ }
+ }
+
+ fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) ->
Poll<Result<u64>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "FromStreamReader can't support operation",
+ )))
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if !self.buf.is_empty() {
+ return Poll::Ready(Some(Ok(std::mem::take(&mut self.buf))));
+ }
+
+ match futures::ready!(self.inner.poll_next_unpin(cx)) {
+ Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))),
+ Some(Err(err)) => Poll::Ready(Some(Err(err))),
+ None => Poll::Ready(None),
+ }
+ }
+}
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 64466bbdd..841dfdd1a 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -34,3 +34,13 @@ pub use into_seekable_read_by_range::ByRangeSeekableReader;
mod into_read_from_file;
pub use into_read_from_file::into_read_from_file;
pub use into_read_from_file::FromFileReader;
+
+mod into_read_from_stream;
+pub use into_read_from_stream::into_read_from_stream;
+pub use into_read_from_stream::FromStreamReader;
+
+mod cloneable_read;
+pub use cloneable_read::into_cloneable_reader_within_std;
+pub use cloneable_read::into_cloneable_reader_within_tokio;
+pub use cloneable_read::CloneableReaderWithinStd;
+pub use cloneable_read::CloneableReaderWithinTokio;
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 132fb78fc..9edfceb6a 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -69,6 +69,18 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
}
}
+impl Stream for dyn raw::oio::Read {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ raw::oio::Read::poll_next(self, cx)
+ }
+
+ fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let _ = raw::oio::Read::poll_seek(self, cx,
std::io::SeekFrom::Start(0))?;
+
+ Poll::Ready(Ok(()))
+ }
+}
+
impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
match self.try_lock() {
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 8ced843da..ba5a2b675 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -30,8 +30,8 @@ use crate::*;
pub enum WriteOperation {
/// Operation for [`Write::write`]
Write,
- /// Operation for [`Write::sink`]
- Sink,
+ /// Operation for [`Write::pipe`]
+ Pipe,
/// Operation for [`Write::abort`]
Abort,
/// Operation for [`Write::close`]
@@ -61,7 +61,7 @@ impl From<WriteOperation> for &'static str {
match v {
Write => "Writer::write",
- Sink => "Writer::sink",
+ Pipe => "Writer::pipe",
Abort => "Writer::abort",
Close => "Writer::close",
BlockingWrite => "BlockingWriter::write",
@@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync {
///
/// It's possible that `n < size`, caller should pass the remaining bytes
/// repeatedly until all bytes has been written.
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>;
/// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
@@ -113,7 +113,7 @@ impl Write for () {
unimplemented!("write is required to be implemented for oio::Write")
}
- async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"output writer doesn't support sink",
@@ -144,8 +144,8 @@ impl<T: Write + ?Sized> Write for Box<T> {
(**self).write(bs).await
}
- async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
- (**self).sink(n, s).await
+ async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> {
+ (**self).pipe(n, s).await
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index b047ef43d..0e5fd9ed3 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -18,7 +18,6 @@
use async_trait::async_trait;
use bytes::Bytes;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -92,11 +91,11 @@ where
Ok(size)
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let offset = self.offset().await?;
self.inner
- .append(offset, size, AsyncBody::Stream(s))
+ .append(offset, size, AsyncBody::Stream(Box::new(s)))
.await
.map(|_| self.offset = Some(offset + size))?;
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs
b/core/src/raw/oio/write/at_least_buf_write.rs
deleted file mode 100644
index 51f5d2645..000000000
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// AtLeastBufWriter is used to implement [`oio::Write`] based on at least
buffer strategy: flush
-/// the underlying storage when the buffered size is larger.
-///
-/// AtLeastBufWriter makes sure that the size of the data written to the
underlying storage is at
-/// least `buffer_size` bytes. It's useful when the underlying storage has a
minimum size limit.
-///
-/// For example, S3 requires at least 5MiB for multipart uploads.
-pub struct AtLeastBufWriter<W: oio::Write> {
- inner: W,
-
- /// The total size of the data.
- ///
- /// If the total size is known, we will write to underlying storage
directly without buffer it
- /// when possible.
- total_size: Option<u64>,
-
- /// The size for buffer, we will flush the underlying storage if the
buffer is full.
- buffer_size: usize,
- buffer: oio::ChunkedCursor,
-}
-
-impl<W: oio::Write> AtLeastBufWriter<W> {
- /// Create a new at least buf writer.
- pub fn new(inner: W, buffer_size: usize) -> Self {
- Self {
- inner,
- total_size: None,
- buffer_size,
- buffer: oio::ChunkedCursor::new(),
- }
- }
-
- /// Configure the total size for writer.
- pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
- self.total_size = total_size;
- self
- }
-}
-
-#[async_trait]
-impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- // If total size is known and equals to given bytes, we can write it
directly.
- if let Some(total_size) = self.total_size {
- if total_size == bs.len() as u64 {
- return self.inner.write(bs).await;
- }
- }
-
- // Push the bytes into the buffer if the buffer is not full.
- if self.buffer.len() + bs.len() < self.buffer_size {
- let size = bs.len();
- self.buffer.push(bs);
- return Ok(size as u64);
- }
-
- let mut buf = self.buffer.clone();
- buf.push(bs);
-
- self.inner
- .sink(buf.len() as u64, Box::new(buf))
- .await
- // Clear buffer if the write is successful.
- .map(|v| {
- self.buffer.clear();
- v
- })
- }
-
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
- // If total size is known and equals to given stream, we can write it
directly.
- if let Some(total_size) = self.total_size {
- if total_size == size {
- return self.inner.sink(size, s).await;
- }
- }
-
- // Push the bytes into the buffer if the buffer is not full.
- if self.buffer.len() as u64 + size < self.buffer_size as u64 {
- let bs = s.collect().await?;
- let size = bs.len() as u64;
- self.buffer.push(bs);
- return Ok(size);
- }
-
- let buf = self.buffer.clone();
- let buffer_size = buf.len() as u64;
- let stream = buf.chain(s);
-
- self.inner
- .sink(buffer_size + size, Box::new(stream))
- .await
- // Clear buffer if the write is successful.
- .map(|v| {
- self.buffer.clear();
- v
- })
- }
-
- async fn abort(&mut self) -> Result<()> {
- self.buffer.clear();
- self.inner.abort().await
- }
-
- async fn close(&mut self) -> Result<()> {
- if !self.buffer.is_empty() {
- self.inner
- .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
- .await?;
- self.buffer.clear();
- }
-
- self.inner.close().await
- }
-}
diff --git a/core/src/raw/oio/write/compose_write.rs
b/core/src/raw/oio/write/compose_write.rs
index 79ddfc5ed..05dfdd775 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -41,7 +41,6 @@
use async_trait::async_trait;
use bytes::Bytes;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -64,10 +63,10 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for
TwoWaysWriter<ONE, TWO> {
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
match self {
- Self::One(one) => one.sink(size, s).await,
- Self::Two(two) => two.sink(size, s).await,
+ Self::One(one) => one.pipe(size, s).await,
+ Self::Two(two) => two.pipe(size, s).await,
}
}
@@ -110,11 +109,11 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write>
oio::Write
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
match self {
- Self::One(one) => one.sink(size, s).await,
- Self::Two(two) => two.sink(size, s).await,
- Self::Three(three) => three.sink(size, s).await,
+ Self::One(one) => one.pipe(size, s).await,
+ Self::Two(two) => two.pipe(size, s).await,
+ Self::Three(three) => three.pipe(size, s).await,
}
}
diff --git a/core/src/raw/oio/write/exact_buf_write.rs
b/core/src/raw/oio/write/exact_buf_write.rs
index 8e2d8a922..b2209ed73 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,10 +18,10 @@
use std::cmp::min;
use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use tokio::io::ReadBuf;
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::ReadExt;
use crate::raw::*;
use crate::*;
@@ -41,9 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {
/// The size for buffer, we will flush the underlying storage at the size
of this buffer.
buffer_size: usize,
- buffer: oio::ChunkedCursor,
-
- buffer_stream: Option<Streamer>,
+ buffer: Buffer,
}
impl<W: oio::Write> ExactBufWriter<W> {
@@ -52,133 +50,108 @@ impl<W: oio::Write> ExactBufWriter<W> {
Self {
inner,
buffer_size,
- buffer: oio::ChunkedCursor::new(),
- buffer_stream: None,
- }
- }
-
- /// Next bytes is used to fetch bytes from buffer or input streamer.
- ///
- /// We need this function because we need to make sure our write is
reentrant.
- /// We can't mutate state unless we are sure that the write is successful.
- async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> {
- match self.buffer_stream.as_mut() {
- None => s.next().await,
- Some(bs) => match bs.next().await {
- None => {
- self.buffer_stream = None;
- s.next().await
- }
- Some(v) => Some(v),
- },
+ buffer: Buffer::Filling(BytesMut::new()),
}
}
+}
- fn chain_stream(&mut self, s: Streamer) {
- self.buffer_stream = match self.buffer_stream.take() {
- Some(stream) => Some(Box::new(stream.chain(s))),
- None => Some(s),
- }
- }
+enum Buffer {
+ Filling(BytesMut),
+ Consuming(Bytes),
}
#[async_trait]
impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
- .await
+ async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+ loop {
+ match &mut self.buffer {
+ Buffer::Filling(fill) => {
+ if fill.len() >= self.buffer_size {
+ self.buffer = Buffer::Consuming(fill.split().freeze());
+ continue;
+ }
+
+ let size = min(self.buffer_size - fill.len(), bs.len());
+ fill.extend_from_slice(&bs[..size]);
+ bs.advance(size);
+ return Ok(size as u64);
+ }
+ Buffer::Consuming(consume) => {
+ // Make sure filled buffer has been flushed.
+ //
+ // TODO: maybe we can re-fill it after a successful write.
+ while !consume.is_empty() {
+ let n = self.inner.write(consume.clone()).await?;
+ consume.advance(n as usize);
+ }
+ self.buffer = Buffer::Filling(BytesMut::new());
+ }
+ }
+ }
}
- /// # TODO
- ///
- /// We know every stream size, we can collect them into a buffer without
chain them every time.
- async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
- if self.buffer.len() >= self.buffer_size {
- let mut buf = self.buffer.clone();
- let to_write = buf.split_to(self.buffer_size);
- return self
- .inner
- .sink(to_write.len() as u64, Box::new(to_write))
- .await
- // Replace buffer with remaining if the write is successful.
- .map(|v| {
- self.buffer = buf;
- self.chain_stream(s);
- v
- });
- }
+ async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
+ loop {
+ match &mut self.buffer {
+ Buffer::Filling(fill) => {
+ if fill.len() >= self.buffer_size {
+ self.buffer = Buffer::Consuming(fill.split().freeze());
+ continue;
+ }
- let mut buf = self.buffer.clone();
- while buf.len() < self.buffer_size {
- let bs = self.next_bytes(&mut s).await.transpose()?;
- match bs {
- None => break,
- Some(bs) => buf.push(bs),
- }
- }
+ // Reserve to buffer size.
+ fill.reserve(self.buffer_size - fill.len());
+ let dst = fill.spare_capacity_mut();
+ let dst_len = dst.len();
+ let mut buf = ReadBuf::uninit(dst);
- // Return directly if the buffer is not full.
- //
- // We don't need to chain stream here because it must be consumed.
- if buf.len() < self.buffer_size {
- let size = buf.len() as u64;
- self.buffer = buf;
- return Ok(size);
- }
+ // Safety: the input buffer is created
with_capacity(length).
+ unsafe { buf.assume_init(dst_len) };
- let to_write = buf.split_to(self.buffer_size);
- self.inner
- .sink(to_write.len() as u64, Box::new(to_write))
- .await
- // Replace buffer with remaining if the write is successful.
- .map(|v| {
- self.buffer = buf;
- self.chain_stream(s);
- v
- })
+ let n = s.read(buf.initialize_unfilled()).await?;
+
+ // Safety: read makes sure this buffer has been filled.
+ unsafe { fill.advance_mut(n) };
+
+ return Ok(n as u64);
+ }
+ Buffer::Consuming(consume) => {
+ // Make sure filled buffer has been flushed.
+ //
+ // TODO: maybe we can re-fill it after a successful write.
+ while !consume.is_empty() {
+ let n = self.inner.write(consume.clone()).await?;
+ consume.advance(n as usize);
+ }
+ self.buffer = Buffer::Filling(BytesMut::new());
+ }
+ }
+ }
}
async fn abort(&mut self) -> Result<()> {
- self.buffer.clear();
- self.buffer_stream = None;
-
+ self.buffer = Buffer::Filling(BytesMut::new());
self.inner.abort().await
}
async fn close(&mut self) -> Result<()> {
- while let Some(stream) = self.buffer_stream.as_mut() {
- let bs = stream.next().await.transpose()?;
- match bs {
- None => {
- self.buffer_stream = None;
+ loop {
+ match &mut self.buffer {
+ Buffer::Filling(fill) => {
+ self.buffer = Buffer::Consuming(fill.split().freeze());
+ continue;
}
- Some(bs) => {
- self.buffer.push(bs);
+ Buffer::Consuming(consume) => {
+ // Make sure filled buffer has been flushed.
+ //
+ // TODO: maybe we can re-fill it after a successful write.
+ while !consume.is_empty() {
+ let n = self.inner.write(consume.clone()).await?;
+ consume.advance(n as usize);
+ }
+ break;
}
}
-
- if self.buffer.len() >= self.buffer_size {
- let mut buf = self.buffer.clone();
- let to_write = buf.split_to(self.buffer_size);
- self.inner
- .sink(to_write.len() as u64, Box::new(to_write))
- .await
- // Replace buffer with remaining if the write is
successful.
- .map(|_| {
- self.buffer = buf;
- })?;
- }
- }
-
- while !self.buffer.is_empty() {
- let mut buf = self.buffer.clone();
- let to_write = buf.split_to(min(self.buffer_size, buf.len()));
-
- self.inner
- .sink(to_write.len() as u64, Box::new(to_write))
- .await
- // Replace buffer with remaining if the write is successful.
- .map(|_| self.buffer = buf)?;
}
self.inner.close().await
@@ -187,6 +160,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
#[cfg(test)]
mod tests {
+ use futures::AsyncReadExt;
use log::debug;
use pretty_assertions::assert_eq;
use rand::thread_rng;
@@ -196,7 +170,6 @@ mod tests {
use sha2::Sha256;
use super::*;
- use crate::raw::oio::StreamExt;
use crate::raw::oio::Write;
struct MockWriter {
@@ -212,10 +185,11 @@ mod tests {
Ok(bs.len() as u64)
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
- let bs = s.collect().await?;
+ async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64>
{
+ let mut bs = vec![];
+ s.read_to_end(&mut bs).await.unwrap();
assert_eq!(bs.len() as u64, size);
- self.write(bs).await
+ self.write(bs.into()).await
}
async fn abort(&mut self) -> Result<()> {
@@ -241,7 +215,12 @@ mod tests {
let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
- w.write(Bytes::from(expected.clone())).await?;
+ let mut bs = Bytes::from(expected.clone());
+ while !bs.is_empty() {
+ let n = w.write(bs.clone()).await?;
+ bs.advance(n as usize);
+ }
+
w.close().await?;
assert_eq!(w.inner.buf.len(), expected.len());
@@ -274,7 +253,12 @@ mod tests {
rng.fill_bytes(&mut content);
expected.extend_from_slice(&content);
- writer.write(Bytes::from(content)).await?;
+
+ let mut bs = Bytes::from(content.clone());
+ while !bs.is_empty() {
+ let n = writer.write(bs.clone()).await?;
+ bs.advance(n as usize);
+ }
}
writer.close().await?;
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index d06bacb2c..dfaf6c4ee 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -39,8 +39,5 @@ mod one_shot_write;
pub use one_shot_write::OneShotWrite;
pub use one_shot_write::OneShotWriter;
-mod at_least_buf_write;
-pub use at_least_buf_write::AtLeastBufWriter;
-
mod exact_buf_write;
pub use exact_buf_write::ExactBufWriter;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index 7bfd0342c..0b314c163 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -138,11 +138,16 @@ where
Ok(size as u64)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
let upload_id = self.upload_id().await?;
self.inner
- .write_part(&upload_id, self.parts.len(), size,
AsyncBody::Stream(s))
+ .write_part(
+ &upload_id,
+ self.parts.len(),
+ size,
+ AsyncBody::Stream(Box::new(s)),
+ )
.await
.map(|v| self.parts.push(v))?;
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index e6fe47616..a85feecf3 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -58,8 +58,8 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.inner.write_once(size, s).await?;
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.inner.write_once(size, Box::new(s)).await?;
Ok(size)
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index a3b8abe30..11776755f 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -180,9 +180,10 @@ impl oio::Write for AzblobWriter {
Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
if self.op.append() {
- self.append_oneshot(size, AsyncBody::Stream(s)).await?;
+ self.append_oneshot(size, AsyncBody::Stream(Box::new(s)))
+ .await?;
} else {
if self.op.content_length().is_none() {
return Err(Error::new(
@@ -191,7 +192,8 @@ impl oio::Write for AzblobWriter {
));
}
- self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+ self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+ .await?;
}
Ok(size)
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index ff1125bfa..4b9b90f9f 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 3360d9dd5..ba162fae9 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -249,7 +249,7 @@ pub struct CosBackend {
impl Accessor for CosBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<CosWriters,
oio::AtLeastBufWriter<CosWriters>>;
+ type Writer = oio::TwoWaysWriter<CosWriters,
oio::ExactBufWriter<CosWriters>>;
type BlockingWriter = ();
type Pager = CosPager;
type BlockingPager = ();
@@ -348,8 +348,7 @@ impl Accessor for CosBackend {
let w = if let Some(buffer_size) = args.buffer_size() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
- let w =
- oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
+ let w = oio::ExactBufWriter::new(w, buffer_size);
oio::TwoWaysWriter::Two(w)
} else {
diff --git a/core/src/services/dropbox/writer.rs
b/core/src/services/dropbox/writer.rs
index 1b5b6b17d..37abab6a9 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 9ca571077..ab27f2b65 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
Ok(size)
}
- async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
while let Some(bs) = s.next().await {
let bs = bs?;
self.f
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 18dd6fed9..3bb11582d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -55,7 +55,7 @@ impl oio::Write for FtpWriter {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 8c4431302..2e95f0b79 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -138,11 +138,6 @@ impl oio::Write for GcsWriter {
}
};
- // Ignore empty bytes
- if bs.is_empty() {
- return Ok(0);
- }
-
self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.write_fixed_size {
@@ -150,7 +145,6 @@ impl oio::Write for GcsWriter {
}
let bs = self.buffer.peak_exact(self.write_fixed_size);
- let size = bs.len() as u64;
match self.write_part(location, bs).await {
Ok(_) => {
@@ -167,8 +161,9 @@ impl oio::Write for GcsWriter {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+ .await?;
Ok(size)
}
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index b33137137..df974d940 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter {
Ok(size)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 6bd4bf057..bf9116ec2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 011c8352e..3b60a4df3 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index 43a46e500..1434e980a 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6600ddd9b..4c3176cce 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -256,7 +256,7 @@ pub struct ObsBackend {
impl Accessor for ObsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<ObsWriters,
oio::AtLeastBufWriter<ObsWriters>>;
+ type Writer = oio::TwoWaysWriter<ObsWriters,
oio::ExactBufWriter<ObsWriters>>;
type BlockingWriter = ();
type Pager = ObsPager;
type BlockingPager = ();
@@ -386,8 +386,7 @@ impl Accessor for ObsBackend {
let w = if let Some(buffer_size) = args.buffer_size() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
- let w =
- oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
+ let w = oio::ExactBufWriter::new(w, buffer_size);
oio::TwoWaysWriter::Two(w)
} else {
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 1086f3fde..75a5e023a 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index ee1131ee1..d924e4cc9 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -381,7 +381,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<OssWriters,
oio::AtLeastBufWriter<OssWriters>>;
+ type Writer = oio::TwoWaysWriter<OssWriters,
oio::ExactBufWriter<OssWriters>>;
type BlockingWriter = ();
type Pager = OssPager;
type BlockingPager = ();
@@ -484,8 +484,7 @@ impl Accessor for OssBackend {
let w = if let Some(buffer_size) = args.buffer_size() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
- let w =
- oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
+ let w = oio::ExactBufWriter::new(w, buffer_size);
oio::TwoWaysWriter::Two(w)
} else {
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 600fac3f3..2c82cc03c 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -888,11 +888,7 @@ pub struct S3Backend {
impl Accessor for S3Backend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::ThreeWaysWriter<
- S3Writers,
- oio::AtLeastBufWriter<S3Writers>,
- oio::ExactBufWriter<S3Writers>,
- >;
+ type Writer = oio::TwoWaysWriter<S3Writers,
oio::ExactBufWriter<S3Writers>>;
type BlockingWriter = ();
type Pager = S3Pager;
type BlockingPager = ();
@@ -991,16 +987,9 @@ impl Accessor for S3Backend {
let w = if let Some(buffer_size) = args.buffer_size() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
- if self.core.enable_exact_buf_write {
- oio::ThreeWaysWriter::Three(oio::ExactBufWriter::new(w,
buffer_size))
- } else {
- oio::ThreeWaysWriter::Two(
- oio::AtLeastBufWriter::new(w, buffer_size)
- .with_total_size(args.content_length()),
- )
- }
+ oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
} else {
- oio::ThreeWaysWriter::One(w)
+ oio::TwoWaysWriter::One(w)
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 71ac41d7c..5ee5d84ac 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for SftpWriter {
Ok(size)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
index b786896c2..36a572bfe 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter {
Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs
b/core/src/services/vercel_artifacts/writer.rs
index 1db2d230f..936cddf01 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
index 130e8e911..a39c835dc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs
b/core/src/services/webdav/writer.rs
index 8dc093e65..bbe79eb31 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -70,8 +70,9 @@ impl oio::Write for WebdavWriter {
Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
- self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+ async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+ self.write_oneshot(size, AsyncBody::Stream(Box::new(s)))
+ .await?;
Ok(size)
}
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index 1b055f122..ae3396dc3 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
+ async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index f4b93dfa6..9479b7c73 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -138,8 +138,8 @@ impl Writer {
T: Into<Bytes>,
{
if let State::Idle(Some(w)) = &mut self.state {
- let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into())));
- w.sink(size, s).await
+ let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v|
v.into())));
+ w.pipe(size, r).await
} else {
unreachable!(
"writer state invalid while sink, expect Idle, actual {}",
@@ -180,11 +180,14 @@ impl Writer {
/// ```
pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
where
- R: futures::AsyncRead + Send + Sync + Unpin + 'static,
+ R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin +
'static,
{
if let State::Idle(Some(w)) = &mut self.state {
- let s = Box::new(oio::into_stream_from_reader(read_from));
- w.sink(size, s).await
+ let r = Box::new(oio::into_streamable_read(
+ oio::into_read_from_file(read_from, 0, size),
+ 64 * 1024,
+ ));
+ w.pipe(size, r).await
} else {
unreachable!(
"writer state invalid while copy, expect Idle, actual {}",
@@ -257,10 +260,9 @@ impl AsyncWrite for Writer {
.take()
.expect("invalid state of writer: Idle state with
empty write");
let bs = Bytes::from(buf.to_vec());
- let size = bs.len();
let fut = async move {
- w.write(bs).await?;
- Ok((size, w))
+ let n = w.write(bs).await?;
+ Ok((n as usize, w))
};
self.state = State::Write(Box::pin(fut));
}
@@ -324,10 +326,9 @@ impl tokio::io::AsyncWrite for Writer {
.take()
.expect("invalid state of writer: Idle state with
empty write");
let bs = Bytes::from(buf.to_vec());
- let size = bs.len();
let fut = async move {
- w.write(bs).await?;
- Ok((size, w))
+ let n = w.write(bs).await?;
+ Ok((n as usize, w))
};
self.state = State::Write(Box::pin(fut));
}
@@ -417,10 +418,9 @@ impl BlockingWriter {
impl io::Write for BlockingWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let size = buf.len();
self.inner
.write(Bytes::from(buf.to_vec()))
- .map(|_| size)
+ .map(|n| n as usize)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 0ad042295..fe4166d78 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -19,6 +19,7 @@ use std::str::FromStr;
use std::time::Duration;
use anyhow::Result;
+use bytes::{Buf, Bytes};
use futures::io::BufReader;
use futures::io::Cursor;
use futures::stream;
@@ -1194,13 +1195,19 @@ pub async fn test_writer_copy(op: Operator) ->
Result<()> {
let size = 5 * 1024 * 1024; // write file with 5 MiB
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
- let reader = Cursor::new([content_a.clone(), content_b.clone()].concat());
let mut w = op
.writer_with(&path)
.content_length(2 * size as u64)
.await?;
- w.copy(2 * size as u64, reader).await?;
+
+ let mut content = Bytes::from([content_a.clone(),
content_b.clone()].concat());
+ while !content.is_empty() {
+ let reader = Cursor::new(content.clone());
+ let n = w.copy(2 * size as u64, reader).await?;
+ content.advance(n as usize);
+ }
+
w.close().await?;
let meta = op.stat(&path).await.expect("stat must succeed");