This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 49f091ddd7a6d00f74416a482c7fc440ed6c85fe Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 14:16:15 2023 +0800 refactor(raw): Return written bytes in oio::Write Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 8 ++--- core/src/layers/blocking.rs | 2 +- core/src/layers/complete.rs | 16 ++++----- core/src/layers/concurrent_limit.rs | 6 ++-- core/src/layers/error_context.rs | 6 ++-- core/src/layers/logging.rs | 29 ++++++++------- core/src/layers/madsim.rs | 4 +-- core/src/layers/metrics.rs | 23 +++++++----- core/src/layers/minitrace.rs | 6 ++-- core/src/layers/oteltrace.rs | 6 ++-- core/src/layers/prometheus.rs | 23 ++++++------ core/src/layers/retry.rs | 8 ++--- core/src/layers/throttle.rs | 6 ++-- core/src/layers/timeout.rs | 4 +-- core/src/layers/tracing.rs | 6 ++-- core/src/raw/adapters/kv/backend.rs | 12 ++++--- core/src/raw/adapters/typed_kv/backend.rs | 12 ++++--- core/src/raw/oio/write/api.rs | 46 ++++++++++++------------ core/src/raw/oio/write/append_object_write.rs | 12 ++++--- core/src/raw/oio/write/at_least_buf_write.rs | 23 ++++++++---- core/src/raw/oio/write/compose_write.rs | 8 ++--- core/src/raw/oio/write/exact_buf_write.rs | 19 +++++----- core/src/raw/oio/write/multipart_upload_write.rs | 12 ++++--- core/src/raw/oio/write/one_shot_write.rs | 15 ++++---- core/src/services/azblob/writer.rs | 20 ++++++----- core/src/services/azdfs/writer.rs | 8 +++-- core/src/services/dropbox/writer.rs | 10 +++--- core/src/services/fs/writer.rs | 16 +++++---- core/src/services/ftp/writer.rs | 8 +++-- core/src/services/gcs/writer.rs | 22 +++++++----- core/src/services/gdrive/writer.rs | 11 +++--- core/src/services/ghac/writer.rs | 6 ++-- core/src/services/hdfs/writer.rs | 14 +++++--- core/src/services/ipmfs/writer.rs | 7 ++-- core/src/services/onedrive/writer.rs | 10 +++--- core/src/services/sftp/writer.rs | 7 ++-- core/src/services/supabase/writer.rs | 10 +++--- core/src/services/vercel_artifacts/writer.rs | 8 +++-- 
core/src/services/wasabi/writer.rs | 10 +++--- core/src/services/webdav/writer.rs | 15 +++++--- core/src/services/webhdfs/writer.rs | 10 +++--- core/src/types/writer.rs | 28 +++++++++++---- 42 files changed, 310 insertions(+), 222 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 0e70bcfc7..9a14442c2 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -27,12 +27,12 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, _: Bytes) -> opendal::Result<()> { - Ok(()) + async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> { + Ok(bs.len() as u64) } - async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { - Ok(()) + async fn sink(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> { + Ok(size) } async fn abort(&mut self) -> opendal::Result<()> { diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 7b80b5956..6e530023c 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { } impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 4c58b6e30..e986d1a47 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,7 +711,7 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let n = bs.len(); if let Some(size) = self.size { @@ -731,10 +731,10 @@ where })?; w.write(bs).await?; self.written += n as u64; - Ok(()) + Ok(n as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut 
self, size: u64, s: oio::Streamer) -> Result<u64> { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( @@ -750,9 +750,9 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.sink(size, s).await?; - self.written += size; - Ok(()) + let n = w.sink(size, s).await?; + self.written += n; + Ok(n) } async fn abort(&mut self) -> Result<()> { @@ -794,7 +794,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W> where W: oio::BlockingWrite, { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { let n = bs.len(); if let Some(size) = self.size { @@ -815,7 +815,7 @@ where w.write(bs)?; self.written += n as u64; - Ok(()) + Ok(n as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 9cef0fb9b..96a682d61 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs).await } @@ -293,7 +293,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { self.inner.abort().await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner.sink(size, s).await } @@ -303,7 +303,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs) } diff --git a/core/src/layers/error_context.rs 
b/core/src/layers/error_context.rs index bfe9be4df..2acd6dd7d 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs).await.map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) @@ -419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner.sink(size, s).await.map_err(|err| { err.with_operation(WriteOperation::Sink) .with_context("service", self.scheme) @@ -437,7 +437,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { } impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs).map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 07323cf8c..6c63f466f 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,11 +1252,10 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result<u64> { match self.inner.write(bs).await { - Ok(_) => { - self.written += size as u64; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1264,9 +1263,9 @@ impl<W: oio::Write> oio::Write for 
LoggingWriter<W> { WriteOperation::Write, self.path, self.written, - size + n ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1286,10 +1285,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { match self.inner.sink(size, s).await { - Ok(_) => { - self.written += size; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data sink {}B", @@ -1297,9 +1296,9 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { WriteOperation::Sink, self.path, self.written, - size + n ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1383,11 +1382,11 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { let size = bs.len(); match self.inner.write(bs) { - Ok(_) => { - self.written += size as u64; + Ok(n) => { + self.written += n; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1397,7 +1396,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { self.written, size ); - Ok(()) + Ok(n) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index fdf0ec5de..17835e5ba 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, bs: Bytes) -> crate::Result<()> { + async fn write(&mut self, bs: Bytes) -> crate::Result<u64> { #[cfg(madsim)] { let req = Request::Write(self.path.to_string(), bs); @@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter { } } - 
async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1eade833b..181ebb3c0 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,23 +847,28 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner .write(bs) .await - .map(|_| self.bytes += size as u64) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner .sink(size, s) .await - .map(|_| self.bytes += size) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err @@ -886,11 +891,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner .write(bs) - .map(|_| self.bytes += size as u64) + .map(|n| { + self.bytes += n; + n + }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); err diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 1213d692e..75c852c3a 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> 
{ - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner .write(bs) .in_span(Span::enter_with_parent( @@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner .sink(size, s) .in_span(Span::enter_with_parent( @@ -379,7 +379,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 2ae39b05c..fde87e9ba 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,11 +313,11 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs).await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner.sink(size, s).await } @@ -331,7 +331,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 005d6aa97..644532bf6 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,16 +662,16 
@@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner .write(bs) .await - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); @@ -679,15 +679,16 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner .sink(size, s) .await - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); @@ -711,15 +712,15 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner .write(bs) - .map(|_| { + .map(|n| { self.stats .bytes_total .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(size as f64) + .observe(n as f64); + n }) .map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 7517c2c21..07b92e24c 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -873,7 +873,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - 
async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let mut backoff = self.builder.build(); loop { @@ -919,14 +919,14 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let s = Arc::new(Mutex::new(s)); let mut backoff = self.builder.build(); loop { match self.inner.sink(size, Box::new(s.clone())).await { - Ok(_) => return Ok(()), + Ok(n) => return Ok(n), Err(e) if !e.is_temporary() => return Err(e), Err(e) => match backoff.next() { None => return Err(e), @@ -1013,7 +1013,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { { || self.inner.write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index d929226df..a88d1c701 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -217,7 +217,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { @@ -242,7 +242,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> 
Result<u64> { self.inner.sink(size, s).await } @@ -256,7 +256,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index c0fb739aa..be2289d04 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let timeout = self.io_timeout(bs.len() as u64); tokio::time::timeout(timeout, self.inner.write(bs)) @@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let timeout = self.io_timeout(size); tokio::time::timeout(timeout, self.inner.sink(size, s)) diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 9042e6e35..33dcbdebc 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs).await } @@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { self.inner.sink(size, s).await } @@ -358,7 +358,7 @@ impl<R: oio::BlockingWrite> 
oio::BlockingWrite for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.inner.write(bs) } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index d10971104..be4913ff9 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -390,13 +390,14 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); self.buf = Some(bs.into()); - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -420,10 +421,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); self.buf = Some(bs.into()); - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 9f6186a38..48232c1fc 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -403,13 +403,14 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. 
- async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); self.buf.push(bs); - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -429,10 +430,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); self.buf.push(bs); - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index f2bb025af..8ced843da 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -74,31 +74,29 @@ impl From<WriteOperation> for &'static str { pub type Writer = Box<dyn Write>; /// Write is the trait that OpenDAL returns to callers. -/// -/// # Notes -/// -/// There are two possible two cases: -/// -/// - Sized: The total size of the object is known in advance. -/// - Unsized: The total size of the object is unknown in advance. -/// -/// And it's possible that the given bs length is less than the total -/// content length. Users will call write multiple times to write -/// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { /// Write given bytes into writer. /// - /// # Notes + /// # Behavior /// - /// It's possible that the given bs length is less than the total - /// content length. And users will call write multiple times. + /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Err(err)` means error happens and no bytes has been written. /// - /// Please make sure `write` is safe to re-enter. 
- async fn write(&mut self, bs: Bytes) -> Result<()>; + /// It's possible that `n < bs.len()`, caller should pass the remaining bytes + /// repeatedly until all bytes has been written. + async fn write(&mut self, bs: Bytes) -> Result<u64>; /// Sink given stream into writer. - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; + /// + /// # Behavior + /// + /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Err(err)` means error happens and no bytes has been written. + /// + /// It's possible that `n < size`, caller should pass the remaining bytes + /// repeatedly until all bytes has been written. + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -109,13 +107,13 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let _ = bs; unimplemented!("write is required to be implemented for oio::Write") } - async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -142,11 +140,11 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { (**self).write(bs).await } - async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<u64> { (**self).sink(n, s).await } @@ -165,14 +163,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>; /// BlockingWrite is the trait that OpenDAL returns to callers. pub trait BlockingWrite: Send + Sync + 'static { /// Write whole content at once. 
- fn write(&mut self, bs: Bytes) -> Result<()>; + fn write(&mut self, bs: Bytes) -> Result<u64>; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -190,7 +188,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 07fa546cc..b047ef43d 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -79,7 +79,7 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let offset = self.offset().await?; let size = bs.len() as u64; @@ -87,16 +87,20 @@ where self.inner .append(offset, size, AsyncBody::Bytes(bs)) .await - .map(|_| self.offset = Some(offset + size)) + .map(|_| self.offset = Some(offset + size))?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { let offset = self.offset().await?; self.inner .append(offset, size, AsyncBody::Stream(s)) .await - .map(|_| self.offset = Some(offset + size)) + .map(|_| self.offset = Some(offset + size))?; + + Ok(size) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 91adddd30..51f5d2645 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ 
b/core/src/raw/oio/write/at_least_buf_write.rs @@ -64,7 +64,7 @@ impl<W: oio::Write> AtLeastBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { // If total size is known and equals to given bytes, we can write it directly. if let Some(total_size) = self.total_size { if total_size == bs.len() as u64 { @@ -74,8 +74,9 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { // Push the bytes into the buffer if the buffer is not full. if self.buffer.len() + bs.len() < self.buffer_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size as u64); } let mut buf = self.buffer.clone(); @@ -85,10 +86,13 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { .sink(buf.len() as u64, Box::new(buf)) .await // Clear buffer if the write is successful. - .map(|_| self.buffer.clear()) + .map(|v| { + self.buffer.clear(); + v + }) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { if total_size == size { @@ -98,8 +102,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { // Push the bytes into the buffer if the buffer is not full. if self.buffer.len() as u64 + size < self.buffer_size as u64 { - self.buffer.push(s.collect().await?); - return Ok(()); + let bs = s.collect().await?; + let size = bs.len() as u64; + self.buffer.push(bs); + return Ok(size); } let buf = self.buffer.clone(); @@ -110,7 +116,10 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { .sink(buffer_size + size, Box::new(stream)) .await // Clear buffer if the write is successful. 
- .map(|_| self.buffer.clear()) + .map(|v| { + self.buffer.clear(); + v + }) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 043df2978..79ddfc5ed 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -57,14 +57,14 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { match self { Self::One(one) => one.sink(size, s).await, Self::Two(two) => two.sink(size, s).await, @@ -102,7 +102,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, @@ -110,7 +110,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write } } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { match self { Self::One(one) => one.sink(size, s).await, Self::Two(two) => two.sink(size, s).await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 8561c1a4a..8e2d8a922 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -84,7 +84,7 @@ impl<W: oio::Write> ExactBufWriter<W> { #[async_trait] 
impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) .await } @@ -92,7 +92,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> { + async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<u64> { if self.buffer.len() >= self.buffer_size { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); @@ -101,9 +101,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { .sink(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. - .map(|_| { + .map(|v| { self.buffer = buf; self.chain_stream(s); + v }); } @@ -120,8 +121,9 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { // // We don't need to chain stream here because it must be consumed. if buf.len() < self.buffer_size { + let size = buf.len() as u64; self.buffer = buf; - return Ok(()); + return Ok(size); } let to_write = buf.split_to(self.buffer_size); @@ -129,9 +131,10 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { .sink(to_write.len() as u64, Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. 
- .map(|_| { + .map(|v| { self.buffer = buf; self.chain_stream(s); + v }) } @@ -202,14 +205,14 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); self.buf.extend_from_slice(&bs); - Ok(()) + Ok(bs.len() as u64) } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: Streamer) -> Result<u64> { let bs = s.collect().await?; assert_eq!(bs.len() as u64, size); self.write(bs).await diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index c013b24c3..7bfd0342c 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -120,7 +120,7 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let upload_id = self.upload_id().await?; let size = bs.len(); @@ -133,16 +133,20 @@ where AsyncBody::Bytes(bs), ) .await - .map(|v| self.parts.push(v)) + .map(|v| self.parts.push(v))?; + + Ok(size as u64) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { let upload_id = self.upload_id().await?; self.inner .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) .await - .map(|v| self.parts.push(v)) + .map(|v| self.parts.push(v))?; + + Ok(size) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 35b1883bf..e6fe47616 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -49,15 +49,18 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: 
OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let cursor = oio::Cursor::from(bs); - self.inner - .write_once(cursor.len() as u64, Box::new(cursor)) - .await + + let size = cursor.len() as u64; + self.inner.write_once(size, Box::new(cursor)).await?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write_once(size, s).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.inner.write_once(size, s).await?; + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 31a56f27a..a3b8abe30 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -161,10 +161,11 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; + if self.op.append() { - self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + self.append_oneshot(size, AsyncBody::Bytes(bs)).await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -173,14 +174,15 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; } + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { if self.op.append() { - self.append_oneshot(size, AsyncBody::Stream(s)).await + self.append_oneshot(size, AsyncBody::Stream(s)).await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -189,8 +191,10 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(size, AsyncBody::Stream(s)).await + 
self.write_oneshot(size, AsyncBody::Stream(s)).await?; } + + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 3c8db1ac1..ff1125bfa 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -41,7 +41,9 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; + let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -78,7 +80,7 @@ impl oio::Write for AzdfsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(()) + Ok(size) } _ => Err(parse_error(resp) .await? @@ -86,7 +88,7 @@ impl oio::Write for AzdfsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 2f5e97558..1b5b6b17d 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -40,12 +40,14 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + let resp = self .core .dropbox_update( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), AsyncBody::Bytes(bs), ) @@ -54,13 +56,13 @@ impl oio::Write for DropboxWriter { match status { StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { 
Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 4d31444af..9ca571077 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -54,18 +54,20 @@ impl oio::Write for FsWriter<tokio::fs::File> { /// /// File could be partial written, so we will seek to start to make sure /// we write the same content. - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; + self.f .seek(SeekFrom::Start(self.pos)) .await .map_err(parse_io_error)?; self.f.write_all(&bs).await.map_err(parse_io_error)?; - self.pos += bs.len() as u64; + self.pos += size; - Ok(()) + Ok(size) } - async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> { while let Some(bs) = s.next().await { let bs = bs?; self.f @@ -76,7 +78,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { self.pos += bs.len() as u64; } - Ok(()) + Ok(size) } async fn abort(&mut self) -> Result<()> { @@ -104,14 +106,14 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> { /// /// File could be partial written, so we will seek to start to make sure /// we write the same content. 
- fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { self.f .seek(SeekFrom::Start(self.pos)) .map_err(parse_io_error)?; self.f.write_all(&bs).map_err(parse_io_error)?; self.pos += bs.len() as u64; - Ok(()) + Ok(bs.len() as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index cd4ba0f6a..18dd6fed9 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -41,7 +41,9 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; data_stream.write_all(&bs).await.map_err(|err| { @@ -50,10 +52,10 @@ impl oio::Write for FtpWriter { ftp_stream.finalize_put_stream(data_stream).await?; - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index e6cd8703c..8c4431302 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -118,16 +118,18 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; + let location = match &self.location { Some(location) => location, None => { if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + + return 
Ok(size); } else { let location = self.initiate_upload().await?; self.location = Some(location); @@ -138,22 +140,23 @@ impl oio::Write for GcsWriter { // Ignore empty bytes if bs.is_empty() { - return Ok(()); + return Ok(0); } self.buffer.push(bs); // Return directly if the buffer is not full if self.buffer.len() <= self.write_fixed_size { - return Ok(()); + return Ok(size); } let bs = self.buffer.peak_exact(self.write_fixed_size); + let size = bs.len() as u64; match self.write_part(location, bs).await { Ok(_) => { self.buffer.take(self.write_fixed_size); self.written += self.write_fixed_size as u64; - Ok(()) + Ok(size) } Err(e) => { // If the upload fails, we should pop the given bs to make sure @@ -164,8 +167,9 @@ impl oio::Write for GcsWriter { } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.write_oneshot(size, AsyncBody::Stream(s)).await?; + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 48d88f3ec..b33137137 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -95,15 +95,18 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; if self.file_id.is_none() { - self.write_create(bs.len() as u64, bs).await + self.write_create(size, bs).await?; } else { - self.write_overwrite(bs.len() as u64, bs).await + self.write_overwrite(size, bs).await?; } + + Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs 
b/core/src/services/ghac/writer.rs index b2f959947..6bd4bf057 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -42,7 +42,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let size = bs.len() as u64; let req = self .backend @@ -54,7 +54,7 @@ impl oio::Write for GhacWriter { if resp.status().is_success() { resp.into_body().consume().await?; self.size += size; - Ok(()) + Ok(size) } else { Err(parse_error(resp) .await @@ -62,7 +62,7 @@ impl oio::Write for GhacWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 23c5f1d68..011c8352e 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -41,7 +41,9 @@ impl<F> HdfsWriter<F> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + while self.pos < bs.len() { let n = self .f @@ -53,10 +55,10 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { // Reset pos to 0 for next write. 
self.pos = 0; - Ok(()) + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", @@ -78,7 +80,9 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { } impl oio::BlockingWrite for HdfsWriter<hdrs::File> { - fn write(&mut self, bs: Bytes) -> Result<()> { + fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + while self.pos < bs.len() { let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?; self.pos += n; @@ -86,7 +90,7 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> { // Reset pos to 0 for next write. self.pos = 0; - Ok(()) + Ok(size as u64) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 528478142..43a46e500 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -38,7 +38,8 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); @@ -46,13 +47,13 @@ impl oio::Write for IpmfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 620130847..1086f3fde 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -46,17 +46,19 @@ impl OneDriveWriter { 
#[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { let size = bs.len(); if size <= Self::MAX_SIMPLE_SIZE { - self.write_simple(bs).await + self.write_simple(bs).await?; } else { - self.write_chunked(bs).await + self.write_chunked(bs).await?; } + + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 76da70da3..71ac41d7c 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -36,13 +36,14 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; self.file.write_all(&bs).await?; - Ok(()) + Ok(size) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index f4c271313..b786896c2 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -67,15 +67,17 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { if bs.is_empty() { - return Ok(()); + return Ok(0); } - self.upload(bs).await + let size = bs.len(); + self.upload(bs).await?; + Ok(size as u64) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> 
Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 6f32d67ba..1db2d230f 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -39,7 +39,9 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + let resp = self .backend .vercel_artifacts_put( @@ -54,13 +56,13 @@ impl oio::Write for VercelArtifactsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 689c334dc..130e8e911 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -41,12 +41,14 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + let resp = self .core .put_object( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), @@ -57,13 +59,13 @@ impl oio::Write for WasabiWriter { match resp.status() { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: 
oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index a3c17bafa..8dc093e65 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,13 +62,18 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len() as u64; + + self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + + Ok(size) } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + self.write_oneshot(size, AsyncBody::Stream(s)).await?; + + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 97eef2e3d..1b055f122 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -39,12 +39,14 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write(&mut self, bs: Bytes) -> Result<u64> { + let size = bs.len(); + let req = self .backend .webhdfs_create_object_request( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), AsyncBody::Bytes(bs), ) @@ -56,13 +58,13 @@ impl oio::Write for WebhdfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(()) + Ok(size as u64) } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not 
supported", diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 37a9fe72c..f4b93dfa6 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -22,7 +22,7 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::future::BoxFuture; use futures::AsyncWrite; use futures::FutureExt; @@ -81,14 +81,23 @@ impl Writer { /// Write into inner writer. pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { - if let State::Idle(Some(w)) = &mut self.state { - w.write(bs.into()).await + let w = if let State::Idle(Some(w)) = &mut self.state { + w } else { unreachable!( "writer state invalid while write, expect Idle, actual {}", self.state ); + }; + + let mut bs = bs.into(); + + while !bs.is_empty() { + let n = w.write(bs.clone()).await?; + bs.advance(n as usize); } + + Ok(()) } /// Sink into writer. @@ -123,7 +132,7 @@ impl Writer { /// Ok(()) /// } /// ``` - pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<()> + pub async fn sink<S, T>(&mut self, size: u64, sink_from: S) -> Result<u64> where S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static, T: Into<Bytes>, @@ -169,7 +178,7 @@ impl Writer { /// Ok(()) /// } /// ``` - pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<()> + pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64> where R: futures::AsyncRead + Send + Sync + Unpin + 'static, { @@ -390,7 +399,14 @@ impl BlockingWriter { /// Write into inner writer. pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { - self.inner.write(bs.into()) + let mut bs = bs.into(); + + while !bs.is_empty() { + let n = self.inner.write(bs.clone())?; + bs.advance(n as usize); + } + + Ok(()) } /// Close the writer and make sure all data have been stored.
