This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2c94a2142 refactor(raw): Return written bytes in oio::Write (#3005)
2c94a2142 is described below
commit 2c94a2142b998cb9c86ddf9629582594340bb91e
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 14:51:17 2023 +0800
refactor(raw): Return written bytes in oio::Write (#3005)
Signed-off-by: Xuanwo <[email protected]>
---
core/benches/oio/utils.rs | 8 ++---
core/src/layers/blocking.rs | 2 +-
core/src/layers/complete.rs | 16 ++++-----
core/src/layers/concurrent_limit.rs | 6 ++--
core/src/layers/error_context.rs | 6 ++--
core/src/layers/logging.rs | 29 ++++++++-------
core/src/layers/madsim.rs | 4 +--
core/src/layers/metrics.rs | 23 +++++++-----
core/src/layers/minitrace.rs | 6 ++--
core/src/layers/oteltrace.rs | 6 ++--
core/src/layers/prometheus.rs | 23 ++++++------
core/src/layers/retry.rs | 8 ++---
core/src/layers/throttle.rs | 6 ++--
core/src/layers/timeout.rs | 4 +--
core/src/layers/tracing.rs | 6 ++--
core/src/raw/adapters/kv/backend.rs | 12 ++++---
core/src/raw/adapters/typed_kv/backend.rs | 12 ++++---
core/src/raw/oio/write/api.rs | 46 ++++++++++++------------
core/src/raw/oio/write/append_object_write.rs | 12 ++++---
core/src/raw/oio/write/at_least_buf_write.rs | 23 ++++++++----
core/src/raw/oio/write/compose_write.rs | 8 ++---
core/src/raw/oio/write/exact_buf_write.rs | 19 +++++-----
core/src/raw/oio/write/multipart_upload_write.rs | 12 ++++---
core/src/raw/oio/write/one_shot_write.rs | 15 ++++----
core/src/services/azblob/writer.rs | 20 ++++++-----
core/src/services/azdfs/writer.rs | 8 +++--
core/src/services/dropbox/writer.rs | 10 +++---
core/src/services/fs/writer.rs | 16 +++++----
core/src/services/ftp/writer.rs | 8 +++--
core/src/services/gcs/writer.rs | 22 +++++++-----
core/src/services/gdrive/writer.rs | 11 +++---
core/src/services/ghac/writer.rs | 6 ++--
core/src/services/hdfs/writer.rs | 14 +++++---
core/src/services/ipmfs/writer.rs | 7 ++--
core/src/services/onedrive/writer.rs | 10 +++---
core/src/services/sftp/writer.rs | 7 ++--
core/src/services/supabase/writer.rs | 10 +++---
core/src/services/vercel_artifacts/writer.rs | 8 +++--
core/src/services/wasabi/writer.rs | 10 +++---
core/src/services/webdav/writer.rs | 15 +++++---
core/src/services/webhdfs/writer.rs | 10 +++---
core/src/types/writer.rs | 28 +++++++++++----
42 files changed, 310 insertions(+), 222 deletions(-)
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 0e70bcfc7..9a14442c2 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -27,12 +27,12 @@ pub struct BlackHoleWriter;
#[async_trait]
impl oio::Write for BlackHoleWriter {
- async fn write(&mut self, _: Bytes) -> opendal::Result<()> {
- Ok(())
+ async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> {
+ Ok(bs.len() as u64)
}
- async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
- Ok(())
+ async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> {
+ Ok(size)
}
async fn abort(&mut self) -> opendal::Result<()> {
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 7b80b5956..6e530023c 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for
BlockingWrapper<I> {
}
impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.handle.block_on(self.inner.write(bs))
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 4c58b6e30..e986d1a47 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let n = bs.len();
if let Some(size) = self.size {
@@ -731,10 +731,10 @@ where
})?;
w.write(bs).await?;
self.written += n as u64;
- Ok(())
+ Ok(n as u64)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
if let Some(total_size) = self.size {
if self.written + size > total_size {
return Err(Error::new(
@@ -750,9 +750,9 @@ where
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- w.sink(size, s).await?;
- self.written += size;
- Ok(())
+ let n = w.sink(size, s).await?;
+ self.written += n;
+ Ok(n)
}
async fn abort(&mut self) -> Result<()> {
@@ -794,7 +794,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
let n = bs.len();
if let Some(size) = self.size {
@@ -815,7 +815,7 @@ where
w.write(bs)?;
self.written += n as u64;
- Ok(())
+ Ok(n as u64)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 9cef0fb9b..96a682d61 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ConcurrentLimitWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await
}
@@ -293,7 +293,7 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
self.inner.abort().await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}
@@ -303,7 +303,7 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs)
}
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index bfe9be4df..2acd6dd7d 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
@@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await.map_err(|err| {
err.with_operation(WriteOperation::Sink)
.with_context("service", self.scheme)
@@ -437,7 +437,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}
impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 07323cf8c..6c63f466f 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,11 +1252,10 @@ impl<W> LoggingWriter<W> {
#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
match self.inner.write(bs).await {
- Ok(_) => {
- self.written += size as u64;
+ Ok(n) => {
+ self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write
{}B",
@@ -1264,9 +1263,9 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
WriteOperation::Write,
self.path,
self.written,
- size
+ n
);
- Ok(())
+ Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1286,10 +1285,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
match self.inner.sink(size, s).await {
- Ok(_) => {
- self.written += size;
+ Ok(n) => {
+ self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data sink
{}B",
@@ -1297,9 +1296,9 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
WriteOperation::Sink,
self.path,
self.written,
- size
+ n
);
- Ok(())
+ Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1383,11 +1382,11 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
let size = bs.len();
match self.inner.write(bs) {
- Ok(_) => {
- self.written += size as u64;
+ Ok(n) => {
+ self.written += n;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write
{}B",
@@ -1397,7 +1396,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for
LoggingWriter<W> {
self.written,
size
);
- Ok(())
+ Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index fdf0ec5de..17835e5ba 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
#[async_trait]
impl oio::Write for MadsimWriter {
- async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+ async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
@@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()>
{
+ async fn sink(&mut self, size: u64, s: oio::Streamer) ->
crate::Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 1eade833b..181ebb3c0 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,23 +847,28 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MetricWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for MetricWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.await
- .map(|_| self.bytes += size as u64)
+ .map(|n| {
+ self.bytes += n;
+ n
+ })
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.await
- .map(|_| self.bytes += size)
+ .map(|n| {
+ self.bytes += n;
+ n
+ })
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
@@ -886,11 +891,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
- .map(|_| self.bytes += size as u64)
+ .map(|n| {
+ self.bytes += n;
+ n
+ })
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
err
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 1213d692e..75c852c3a 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MinitraceWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.in_span(Span::enter_with_parent(
@@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.in_span(Span::enter_with_parent(
@@ -379,7 +379,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 2ae39b05c..fde87e9ba 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,11 +313,11 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
OtelTraceWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}
@@ -331,7 +331,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs)
}
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 005d6aa97..644532bf6 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,16 +662,16 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
.await
- .map(|_| {
+ .map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme,
Operation::Write.into_static()])
- .observe(size as f64)
+ .observe(n as f64);
+ n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
@@ -679,15 +679,16 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
})
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner
.sink(size, s)
.await
- .map(|_| {
+ .map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme,
Operation::Write.into_static()])
- .observe(size as f64)
+ .observe(n as f64);
+ n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
@@ -711,15 +712,15 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner
.write(bs)
- .map(|_| {
+ .map(|n| {
self.stats
.bytes_total
.with_label_values(&[&self.scheme,
Operation::BlockingWrite.into_static()])
- .observe(size as f64)
+ .observe(n as f64);
+ n
})
.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 7517c2c21..07b92e24c 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -873,7 +873,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor>
oio::BlockingRead for RetryWrapp
#[async_trait]
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let mut backoff = self.builder.build();
loop {
@@ -919,14 +919,14 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
/// The overhead is constant, which means the overhead will not increase
with the size of
/// stream. For example, if every `next` call cost 1ms, then the overhead
will only take 0.005%
/// which is acceptable.
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
let s = Arc::new(Mutex::new(s));
let mut backoff = self.builder.build();
loop {
match self.inner.sink(size, Box::new(s.clone())).await {
- Ok(_) => return Ok(()),
+ Ok(n) => return Ok(n),
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
None => return Err(e),
@@ -1013,7 +1013,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
}
impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for
RetryWrapper<R, I> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
{ || self.inner.write(bs.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index d929226df..a88d1c701 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -217,7 +217,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ThrottleWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
loop {
@@ -242,7 +242,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}
@@ -256,7 +256,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
loop {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index c0fb739aa..be2289d04 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let timeout = self.io_timeout(bs.len() as u64);
tokio::time::timeout(timeout, self.inner.write(bs))
@@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
})?
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
let timeout = self.io_timeout(size);
tokio::time::timeout(timeout, self.inner.sink(size, s))
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 9042e6e35..33dcbdebc 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs).await
}
@@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
self.inner.sink(size, s).await
}
@@ -358,7 +358,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.inner.write(bs)
}
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index d10971104..be4913ff9 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -390,13 +390,14 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
// TODO: we need to support append in the future.
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
self.buf = Some(bs.into());
- Ok(())
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
@@ -420,10 +421,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
self.buf = Some(bs.into());
- Ok(())
+ Ok(size as u64)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 9f6186a38..48232c1fc 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -403,13 +403,14 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
// TODO: we need to support append in the future.
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
self.buf.push(bs);
- Ok(())
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
@@ -429,10 +430,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
self.buf.push(bs);
- Ok(())
+ Ok(size as u64)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index f2bb025af..8ced843da 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -74,31 +74,29 @@ impl From<WriteOperation> for &'static str {
pub type Writer = Box<dyn Write>;
/// Write is the trait that OpenDAL returns to callers.
-///
-/// # Notes
-///
-/// There are two possible two cases:
-///
-/// - Sized: The total size of the object is known in advance.
-/// - Unsized: The total size of the object is unknown in advance.
-///
-/// And it's possible that the given bs length is less than the total
-/// content length. Users will call write multiple times to write
-/// the whole data.
#[async_trait]
pub trait Write: Unpin + Send + Sync {
/// Write given bytes into writer.
///
- /// # Notes
+ /// # Behavior
///
- /// It's possible that the given bs length is less than the total
- /// content length. And users will call write multiple times.
+ /// - `Ok(n)` means `n` bytes have been written successfully.
+ /// - `Err(err)` means an error happened and no bytes have been written.
///
- /// Please make sure `write` is safe to re-enter.
- async fn write(&mut self, bs: Bytes) -> Result<()>;
+ /// It's possible that `n < bs.len()`; the caller should pass the remaining bytes
+ /// repeatedly until all bytes have been written.
+ async fn write(&mut self, bs: Bytes) -> Result<u64>;
/// Sink given stream into writer.
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>;
+ ///
+ /// # Behavior
+ ///
+ /// - `Ok(n)` means `n` bytes have been written successfully.
+ /// - `Err(err)` means an error happened and no bytes have been written.
+ ///
+ /// It's possible that `n < size`; the caller should pass the remaining bytes
+ /// repeatedly until all bytes have been written.
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>;
/// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
@@ -109,13 +107,13 @@ pub trait Write: Unpin + Send + Sync {
#[async_trait]
impl Write for () {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let _ = bs;
unimplemented!("write is required to be implemented for oio::Write")
}
- async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"output writer doesn't support sink",
@@ -142,11 +140,11 @@ impl Write for () {
/// To make Writer work as expected, we must add this impl.
#[async_trait]
impl<T: Write + ?Sized> Write for Box<T> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
(**self).write(bs).await
}
- async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> {
(**self).sink(n, s).await
}
@@ -165,14 +163,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>;
/// BlockingWrite is the trait that OpenDAL returns to callers.
pub trait BlockingWrite: Send + Sync + 'static {
/// Write whole content at once.
- fn write(&mut self, bs: Bytes) -> Result<()>;
+ fn write(&mut self, bs: Bytes) -> Result<u64>;
/// Close the writer and make sure all data has been flushed.
fn close(&mut self) -> Result<()>;
}
impl BlockingWrite for () {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
let _ = bs;
unimplemented!("write is required to be implemented for
oio::BlockingWrite")
@@ -190,7 +188,7 @@ impl BlockingWrite for () {
///
/// To make BlockingWriter work as expected, we must add this impl.
impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
(**self).write(bs)
}
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index 07fa546cc..b047ef43d 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -79,7 +79,7 @@ impl<W> oio::Write for AppendObjectWriter<W>
where
W: AppendObjectWrite,
{
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let offset = self.offset().await?;
let size = bs.len() as u64;
@@ -87,16 +87,20 @@ where
self.inner
.append(offset, size, AsyncBody::Bytes(bs))
.await
- .map(|_| self.offset = Some(offset + size))
+ .map(|_| self.offset = Some(offset + size))?;
+
+ Ok(size)
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
let offset = self.offset().await?;
self.inner
.append(offset, size, AsyncBody::Stream(s))
.await
- .map(|_| self.offset = Some(offset + size))
+ .map(|_| self.offset = Some(offset + size))?;
+
+ Ok(size)
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs
b/core/src/raw/oio/write/at_least_buf_write.rs
index 91adddd30..51f5d2645 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -64,7 +64,7 @@ impl<W: oio::Write> AtLeastBufWriter<W> {
#[async_trait]
impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
// If total size is known and equals to given bytes, we can write it
directly.
if let Some(total_size) = self.total_size {
if total_size == bs.len() as u64 {
@@ -74,8 +74,9 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
// Push the bytes into the buffer if the buffer is not full.
if self.buffer.len() + bs.len() < self.buffer_size {
+ let size = bs.len();
self.buffer.push(bs);
- return Ok(());
+ return Ok(size as u64);
}
let mut buf = self.buffer.clone();
@@ -85,10 +86,13 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
.sink(buf.len() as u64, Box::new(buf))
.await
// Clear buffer if the write is successful.
- .map(|_| self.buffer.clear())
+ .map(|v| {
+ self.buffer.clear();
+ v
+ })
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
// If total size is known and equals to given stream, we can write it
directly.
if let Some(total_size) = self.total_size {
if total_size == size {
@@ -98,8 +102,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
// Push the bytes into the buffer if the buffer is not full.
if self.buffer.len() as u64 + size < self.buffer_size as u64 {
- self.buffer.push(s.collect().await?);
- return Ok(());
+ let bs = s.collect().await?;
+ let size = bs.len() as u64;
+ self.buffer.push(bs);
+ return Ok(size);
}
let buf = self.buffer.clone();
@@ -110,7 +116,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
.sink(buffer_size + size, Box::new(stream))
.await
// Clear buffer if the write is successful.
- .map(|_| self.buffer.clear())
+ .map(|v| {
+ self.buffer.clear();
+ v
+ })
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/compose_write.rs
b/core/src/raw/oio/write/compose_write.rs
index 043df2978..79ddfc5ed 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -57,14 +57,14 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
#[async_trait]
impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
match self {
Self::One(one) => one.write(bs).await,
Self::Two(two) => two.write(bs).await,
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
match self {
Self::One(one) => one.sink(size, s).await,
Self::Two(two) => two.sink(size, s).await,
@@ -102,7 +102,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write,
THREE: oio::Write> {
impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
for ThreeWaysWriter<ONE, TWO, THREE>
{
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
match self {
Self::One(one) => one.write(bs).await,
Self::Two(two) => two.write(bs).await,
@@ -110,7 +110,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write>
oio::Write
}
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
match self {
Self::One(one) => one.sink(size, s).await,
Self::Two(two) => two.sink(size, s).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs
b/core/src/raw/oio/write/exact_buf_write.rs
index 8561c1a4a..8e2d8a922 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -84,7 +84,7 @@ impl<W: oio::Write> ExactBufWriter<W> {
#[async_trait]
impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
.await
}
@@ -92,7 +92,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
/// # TODO
///
/// We know every stream size, we can collect them into a buffer without
chain them every time.
- async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> {
+ async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> {
if self.buffer.len() >= self.buffer_size {
let mut buf = self.buffer.clone();
let to_write = buf.split_to(self.buffer_size);
@@ -101,9 +101,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
.sink(to_write.len() as u64, Box::new(to_write))
.await
// Replace buffer with remaining if the write is successful.
- .map(|_| {
+ .map(|v| {
self.buffer = buf;
self.chain_stream(s);
+ v
});
}
@@ -120,8 +121,9 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
//
// We don't need to chain stream here because it must be consumed.
if buf.len() < self.buffer_size {
+ let size = buf.len() as u64;
self.buffer = buf;
- return Ok(());
+ return Ok(size);
}
let to_write = buf.split_to(self.buffer_size);
@@ -129,9 +131,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
.sink(to_write.len() as u64, Box::new(to_write))
.await
// Replace buffer with remaining if the write is successful.
- .map(|_| {
+ .map(|v| {
self.buffer = buf;
self.chain_stream(s);
+ v
})
}
@@ -202,14 +205,14 @@ mod tests {
#[async_trait]
impl Write for MockWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
self.buf.extend_from_slice(&bs);
- Ok(())
+ Ok(bs.len() as u64)
}
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> {
let bs = s.collect().await?;
assert_eq!(bs.len() as u64, size);
self.write(bs).await
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index c013b24c3..7bfd0342c 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -120,7 +120,7 @@ impl<W> oio::Write for MultipartUploadWriter<W>
where
W: MultipartUploadWrite,
{
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let upload_id = self.upload_id().await?;
let size = bs.len();
@@ -133,16 +133,20 @@ where
AsyncBody::Bytes(bs),
)
.await
- .map(|v| self.parts.push(v))
+ .map(|v| self.parts.push(v))?;
+
+ Ok(size as u64)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
let upload_id = self.upload_id().await?;
self.inner
.write_part(&upload_id, self.parts.len(), size,
AsyncBody::Stream(s))
.await
- .map(|v| self.parts.push(v))
+ .map(|v| self.parts.push(v))?;
+
+ Ok(size)
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index 35b1883bf..e6fe47616 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -49,15 +49,18 @@ impl<W: OneShotWrite> OneShotWriter<W> {
#[async_trait]
impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let cursor = oio::Cursor::from(bs);
- self.inner
- .write_once(cursor.len() as u64, Box::new(cursor))
- .await
+
+ let size = cursor.len() as u64;
+ self.inner.write_once(size, Box::new(cursor)).await?;
+
+ Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- self.inner.write_once(size, s).await
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ self.inner.write_once(size, s).await?;
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index 31a56f27a..a3b8abe30 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -161,10 +161,11 @@ impl AzblobWriter {
#[async_trait]
impl oio::Write for AzblobWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
+
if self.op.append() {
- self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await
+ self.append_oneshot(size, AsyncBody::Bytes(bs)).await?;
} else {
if self.op.content_length().is_none() {
return Err(Error::new(
@@ -173,14 +174,15 @@ impl oio::Write for AzblobWriter {
));
}
- self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await
+ self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
}
+
+ Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
if self.op.append() {
- self.append_oneshot(size, AsyncBody::Stream(s)).await
+ self.append_oneshot(size, AsyncBody::Stream(s)).await?;
} else {
if self.op.content_length().is_none() {
return Err(Error::new(
@@ -189,8 +191,10 @@ impl oio::Write for AzblobWriter {
));
}
- self.write_oneshot(size, AsyncBody::Stream(s)).await
+ self.write_oneshot(size, AsyncBody::Stream(s)).await?;
}
+
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index 3c8db1ac1..ff1125bfa 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -41,7 +41,9 @@ impl AzdfsWriter {
#[async_trait]
impl oio::Write for AzdfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
+
let mut req = self.core.azdfs_create_request(
&self.path,
"file",
@@ -78,7 +80,7 @@ impl oio::Write for AzdfsWriter {
match status {
StatusCode::OK | StatusCode::ACCEPTED => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size)
}
_ => Err(parse_error(resp)
.await?
@@ -86,7 +88,7 @@ impl oio::Write for AzdfsWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/dropbox/writer.rs
b/core/src/services/dropbox/writer.rs
index 2f5e97558..1b5b6b17d 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -40,12 +40,14 @@ impl DropboxWriter {
#[async_trait]
impl oio::Write for DropboxWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
let resp = self
.core
.dropbox_update(
&self.path,
- Some(bs.len()),
+ Some(size),
self.op.content_type(),
AsyncBody::Bytes(bs),
)
@@ -54,13 +56,13 @@ impl oio::Write for DropboxWriter {
match status {
StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size as u64)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4d31444af..9ca571077 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -54,18 +54,20 @@ impl oio::Write for FsWriter<tokio::fs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
+
self.f
.seek(SeekFrom::Start(self.pos))
.await
.map_err(parse_io_error)?;
self.f.write_all(&bs).await.map_err(parse_io_error)?;
- self.pos += bs.len() as u64;
+ self.pos += size;
- Ok(())
+ Ok(size)
}
- async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> {
while let Some(bs) = s.next().await {
let bs = bs?;
self.f
@@ -76,7 +78,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
self.pos += bs.len() as u64;
}
- Ok(())
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
@@ -104,14 +106,14 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
///
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
self.f
.seek(SeekFrom::Start(self.pos))
.map_err(parse_io_error)?;
self.f.write_all(&bs).map_err(parse_io_error)?;
self.pos += bs.len() as u64;
- Ok(())
+ Ok(bs.len() as u64)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index cd4ba0f6a..18dd6fed9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -41,7 +41,9 @@ impl FtpWriter {
#[async_trait]
impl oio::Write for FtpWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
data_stream.write_all(&bs).await.map_err(|err| {
@@ -50,10 +52,10 @@ impl oio::Write for FtpWriter {
ftp_stream.finalize_put_stream(data_stream).await?;
- Ok(())
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index e6cd8703c..8c4431302 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,16 +118,18 @@ impl GcsWriter {
#[async_trait]
impl oio::Write for GcsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
+
let location = match &self.location {
Some(location) => location,
None => {
if self.op.content_length().unwrap_or_default() == bs.len() as
u64
&& self.written == 0
{
- return self
- .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await;
+ self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+
+ return Ok(size);
} else {
let location = self.initiate_upload().await?;
self.location = Some(location);
@@ -138,22 +140,23 @@ impl oio::Write for GcsWriter {
// Ignore empty bytes
if bs.is_empty() {
- return Ok(());
+ return Ok(0);
}
self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.write_fixed_size {
- return Ok(());
+ return Ok(size);
}
let bs = self.buffer.peak_exact(self.write_fixed_size);
+ let size = bs.len() as u64;
match self.write_part(location, bs).await {
Ok(_) => {
self.buffer.take(self.write_fixed_size);
self.written += self.write_fixed_size as u64;
- Ok(())
+ Ok(size)
}
Err(e) => {
// If the upload fails, we should pop the given bs to make sure
@@ -164,8 +167,9 @@ impl oio::Write for GcsWriter {
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- self.write_oneshot(size, AsyncBody::Stream(s)).await
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index 48d88f3ec..b33137137 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,15 +95,18 @@ impl GdriveWriter {
#[async_trait]
impl oio::Write for GdriveWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
if self.file_id.is_none() {
- self.write_create(bs.len() as u64, bs).await
+ self.write_create(size, bs).await?;
} else {
- self.write_overwrite(bs.len() as u64, bs).await
+ self.write_overwrite(size, bs).await?;
}
+
+ Ok(size)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b2f959947..6bd4bf057 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -42,7 +42,7 @@ impl GhacWriter {
#[async_trait]
impl oio::Write for GhacWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let size = bs.len() as u64;
let req = self
.backend
@@ -54,7 +54,7 @@ impl oio::Write for GhacWriter {
if resp.status().is_success() {
resp.into_body().consume().await?;
self.size += size;
- Ok(())
+ Ok(size)
} else {
Err(parse_error(resp)
.await
@@ -62,7 +62,7 @@ impl oio::Write for GhacWriter {
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 23c5f1d68..011c8352e 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -41,7 +41,9 @@ impl<F> HdfsWriter<F> {
#[async_trait]
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
while self.pos < bs.len() {
let n = self
.f
@@ -53,10 +55,10 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
// Reset pos to 0 for next write.
self.pos = 0;
- Ok(())
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
@@ -78,7 +80,9 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
}
impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
- fn write(&mut self, bs: Bytes) -> Result<()> {
+ fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
while self.pos < bs.len() {
let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
self.pos += n;
@@ -86,7 +90,7 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
// Reset pos to 0 for next write.
self.pos = 0;
- Ok(())
+ Ok(size as u64)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index 528478142..43a46e500 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -38,7 +38,8 @@ impl IpmfsWriter {
#[async_trait]
impl oio::Write for IpmfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
let resp = self.backend.ipmfs_write(&self.path, bs).await?;
let status = resp.status();
@@ -46,13 +47,13 @@ impl oio::Write for IpmfsWriter {
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 620130847..1086f3fde 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -46,17 +46,19 @@ impl OneDriveWriter {
#[async_trait]
impl oio::Write for OneDriveWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
let size = bs.len();
if size <= Self::MAX_SIMPLE_SIZE {
- self.write_simple(bs).await
+ self.write_simple(bs).await?;
} else {
- self.write_chunked(bs).await
+ self.write_chunked(bs).await?;
}
+
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 76da70da3..71ac41d7c 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -36,13 +36,14 @@ impl SftpWriter {
#[async_trait]
impl oio::Write for SftpWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
self.file.write_all(&bs).await?;
- Ok(())
+ Ok(size)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
index f4c271313..b786896c2 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,15 +67,17 @@ impl SupabaseWriter {
#[async_trait]
impl oio::Write for SupabaseWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
if bs.is_empty() {
- return Ok(());
+ return Ok(0);
}
- self.upload(bs).await
+ let size = bs.len();
+ self.upload(bs).await?;
+ Ok(size as u64)
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/vercel_artifacts/writer.rs
b/core/src/services/vercel_artifacts/writer.rs
index 6f32d67ba..1db2d230f 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -39,7 +39,9 @@ impl VercelArtifactsWriter {
#[async_trait]
impl oio::Write for VercelArtifactsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
let resp = self
.backend
.vercel_artifacts_put(
@@ -54,13 +56,13 @@ impl oio::Write for VercelArtifactsWriter {
match status {
StatusCode::OK | StatusCode::ACCEPTED => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size as u64)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
index 689c334dc..130e8e911 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -41,12 +41,14 @@ impl WasabiWriter {
#[async_trait]
impl oio::Write for WasabiWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
let resp = self
.core
.put_object(
&self.path,
- Some(bs.len()),
+ Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
@@ -57,13 +59,13 @@ impl oio::Write for WasabiWriter {
match resp.status() {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size as u64)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/services/webdav/writer.rs
b/core/src/services/webdav/writer.rs
index a3c17bafa..8dc093e65 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -62,13 +62,18 @@ impl WebdavWriter {
#[async_trait]
impl oio::Write for WebdavWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len() as u64;
+
+ self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+
+ Ok(size)
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- self.write_oneshot(size, AsyncBody::Stream(s)).await
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> {
+ self.write_oneshot(size, AsyncBody::Stream(s)).await?;
+
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index 97eef2e3d..1b055f122 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -39,12 +39,14 @@ impl WebhdfsWriter {
#[async_trait]
impl oio::Write for WebhdfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ let size = bs.len();
+
let req = self
.backend
.webhdfs_create_object_request(
&self.path,
- Some(bs.len()),
+ Some(size),
self.op.content_type(),
AsyncBody::Bytes(bs),
)
@@ -56,13 +58,13 @@ impl oio::Write for WebhdfsWriter {
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(())
+ Ok(size as u64)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+ async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 37a9fe72c..f4b93dfa6 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -22,7 +22,7 @@ use std::task::ready;
use std::task::Context;
use std::task::Poll;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use futures::future::BoxFuture;
use futures::AsyncWrite;
use futures::FutureExt;
@@ -81,14 +81,23 @@ impl Writer {
/// Write into inner writer.
pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
- if let State::Idle(Some(w)) = &mut self.state {
- w.write(bs.into()).await
+ let w = if let State::Idle(Some(w)) = &mut self.state {
+ w
} else {
unreachable!(
"writer state invalid while write, expect Idle, actual {}",
self.state
);
+ };
+
+ let mut bs = bs.into();
+
+ while !bs.is_empty() {
+ let n = w.write(bs.clone()).await?;
+ bs.advance(n as usize);
}
+
+ Ok(())
}
/// Sink into writer.
@@ -123,7 +132,7 @@ impl Writer {
/// Ok(())
/// }
/// ```
- pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<()>
+ pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<u64>
where
S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static,
T: Into<Bytes>,
@@ -169,7 +178,7 @@ impl Writer {
/// Ok(())
/// }
/// ```
- pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<()>
+ pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64>
where
R: futures::AsyncRead + Send + Sync + Unpin + 'static,
{
@@ -390,7 +399,14 @@ impl BlockingWriter {
/// Write into inner writer.
pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
- self.inner.write(bs.into())
+ let mut bs = bs.into();
+
+ while !bs.is_empty() {
+ let n = self.inner.write(bs.clone())?;
+ bs.advance(n as usize);
+ }
+
+ Ok(())
}
/// Close the writer and make sure all data have been stored.