This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 84539ecd2b2382d229e02372d441cf2aadc13850 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 29 20:32:01 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 2 +- core/benches/oio/write.rs | 18 ++++++---------- core/src/layers/blocking.rs | 6 ++---- core/src/layers/complete.rs | 13 +++++------ core/src/layers/concurrent_limit.rs | 4 ++-- core/src/layers/error_context.rs | 4 ++-- core/src/layers/logging.rs | 5 +++-- core/src/layers/madsim.rs | 2 +- core/src/layers/metrics.rs | 5 +++-- core/src/layers/minitrace.rs | 4 ++-- core/src/layers/oteltrace.rs | 4 ++-- core/src/layers/prometheus.rs | 5 +++-- core/src/layers/retry.rs | 4 ++-- core/src/layers/throttle.rs | 4 ++-- core/src/layers/timeout.rs | 6 +++--- core/src/layers/tracing.rs | 4 ++-- core/src/raw/adapters/kv/backend.rs | 2 +- core/src/raw/adapters/typed_kv/backend.rs | 2 +- core/src/raw/http_util/multipart.rs | 6 +++--- core/src/raw/oio/cursor.rs | 4 ++-- core/src/raw/oio/stream/api.rs | 19 ++++++---------- core/src/raw/oio/stream/into_stream.rs | 2 +- core/src/raw/oio/stream/into_stream_from_reader.rs | 2 +- core/src/raw/oio/write/api.rs | 15 ++++--------- core/src/raw/oio/write/append_object_write.rs | 7 +++--- core/src/raw/oio/write/at_least_buf_write.rs | 16 ++++++-------- core/src/raw/oio/write/compose_write.rs | 14 ++++++------ core/src/raw/oio/write/exact_buf_write.rs | 25 ++++++++-------------- core/src/raw/oio/write/multipart_upload_write.rs | 7 +++--- core/src/raw/oio/write/one_shot_write.rs | 6 +++--- core/src/services/azblob/writer.rs | 7 +++--- core/src/services/azdfs/writer.rs | 2 +- core/src/services/cos/writer.rs | 17 +++++++++------ core/src/services/dropbox/writer.rs | 2 +- core/src/services/fs/writer.rs | 2 +- core/src/services/ftp/writer.rs | 2 +- core/src/services/gcs/writer.rs | 5 +++-- core/src/services/gdrive/writer.rs | 2 +- core/src/services/ghac/writer.rs | 2 +- core/src/services/hdfs/writer.rs | 2 +- core/src/services/ipmfs/writer.rs | 2 +- core/src/services/obs/writer.rs | 17 +++++++++------ core/src/services/onedrive/writer.rs | 2 +- core/src/services/oss/writer.rs | 18 ++++++++++------ core/src/services/s3/writer.rs | 19 +++++++++------- core/src/services/sftp/writer.rs | 2 +- core/src/services/supabase/writer.rs | 2 +- core/src/services/vercel_artifacts/writer.rs | 2 +- core/src/services/wasabi/writer.rs | 2 +- core/src/services/webdav/writer.rs | 5 +++-- core/src/services/webhdfs/writer.rs | 2 +- core/src/types/operator/operator.rs | 3 +-- core/src/types/writer.rs | 13 +++++------ 53 files changed, 171 insertions(+), 178 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index e90c439ca..efbf15330 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -27,7 +27,7 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { + async fn write(&mut self, _: Streamer) -> opendal::Result<()> { Ok(()) } diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 2fd5392f5..ac0ed15dc 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -47,12 +47,9 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) { b.to_async(&*TOKIO).iter(|| async { let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write( - content.len() as u64, - Box::new(oio::Cursor::from(content.clone())), - ) - .await - .unwrap(); + w.write(Box::new(oio::Cursor::from(content.clone()))) + .await + .unwrap(); w.close().await.unwrap(); }) }); @@ -78,12 +75,9 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write( - content.len() as u64, - Box::new(oio::Cursor::from(content.clone())), - ) - .await - .unwrap(); + w.write(Box::new(oio::Cursor::from(content.clone()))) + .await + .unwrap(); w.close().await.unwrap(); }) }); diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index f5f264395..41c3e75d2 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -197,10 +197,8 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { fn write(&mut self, bs: Bytes) -> Result<()> { - self.handle.block_on( - self.inner - .write(bs.len() as u64, Box::new(oio::Cursor::from(bs))), - ) + self.handle + .block_on(self.inner.write(Box::new(oio::Cursor::from(bs)))) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index d0d39784c..65ada747f 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -25,13 +25,13 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::into_flat_page; use crate::raw::oio::into_hierarchy_page; use crate::raw::oio::ByRangeSeekableReader; use crate::raw::oio::Entry; use crate::raw::oio::FlatPager; use crate::raw::oio::HierarchyPager; use crate::raw::oio::StreamableReader; +use crate::raw::oio::{into_flat_page, Stream}; use crate::raw::*; use crate::*; @@ -711,14 +711,14 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { if let Some(total_size) = self.size { - if self.written + size > total_size { + if self.written + s.size() > total_size { return Err(Error::new( ErrorKind::ContentTruncated, &format!( - "writer got too much data, expect: {size}, actual: {}", - self.written + size + "writer got too much data, expect: {total_size}, actual: {}", + self.written + s.size() ), )); } @@ -727,7 +727,8 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.write(size, s).await?; + let size = s.size(); + w.write(s).await?; self.written += size; Ok(()) } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index e6521cfbb..d08bcd6b3 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,8 +285,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write(size, s).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.inner.write(s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 0e588fddc..adc768a2b 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -411,8 +411,8 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write(size, s).await.map_err(|err| { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.inner.write(s).await.map_err(|err| { err.with_operation(WriteOperation::Sink) .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 316a2c073..b1fc1ade4 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,8 +1252,9 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - match self.inner.write(size, s).await { + async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { + let size = s.size(); + match self.inner.write(s).await { Ok(_) => { self.written += size; trace!( diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 9ca4105ff..eee47e10e 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { + async fn write(&mut self, s: oio::Streamer) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 580e42f6b..0b7ceb872 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,9 +847,10 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + let size = s.size(); self.inner - .write(size, s) + .write(s) .await .map(|_| self.bytes += size) .map_err(|err| { diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index b40d567c9..a5355460b 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,9 +337,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { self.inner - .write(size, s) + .write(s) .in_span(Span::enter_with_parent( WriteOperation::Sink.into_static(), &self.span, diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 903ab098d..f0a8240fd 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,8 +313,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write(size, s).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.inner.write(s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 82f249f03..68194a67b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,9 +662,10 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + let size = s.size(); self.inner - .write(size, s) + .write(s) .await .map(|_| { self.stats diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 38511f527..85cf1b2a0 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -893,13 +893,13 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let s = Arc::new(Mutex::new(s)); let mut backoff = self.builder.build(); loop { - match self.inner.write(size, Box::new(s.clone())).await { + match self.inner.write(Box::new(s.clone())).await { Ok(_) => return Ok(()), Err(e) if !e.is_temporary() => return Err(e), Err(e) => match backoff.next() { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3429c14a4..28a5f8351 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -217,8 +217,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { - self.inner.write(size, s).await + async fn write(&mut self, s: Streamer) -> Result<()> { + self.inner.write(s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 0dfec0212..802940ecf 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,10 +322,10 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - let timeout = self.io_timeout(size); + async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { + let timeout = self.io_timeout(s.size()); - tokio::time::timeout(timeout, self.inner.write(size, s)) + tokio::time::timeout(timeout, self.inner.write(s)) .await .map_err(|_| { Error::new(ErrorKind::Unexpected, "operation timeout") diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index c11574dba..b1c248974 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,8 +324,8 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write(size, s).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.inner.write(s).await } #[tracing::instrument( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index da402e86b..8ed445db6 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -389,7 +389,7 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 6500a2f1a..fad9373a7 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -402,7 +402,7 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 6ec4e9c09..77b0709f0 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -182,7 +182,7 @@ pub struct MultipartStream<T: Part> { } impl<T: Part> Stream for MultipartStream<T> { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.size } @@ -338,7 +338,7 @@ pub struct FormDataPartStream { #[async_trait] impl Stream for FormDataPartStream { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.size } @@ -701,7 +701,7 @@ pub struct MixedPartStream { } impl Stream for MixedPartStream { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.size } diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 74f97ef4d..5e7a665f0 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -161,7 +161,7 @@ impl oio::BlockingRead for Cursor { } impl oio::Stream for Cursor { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.inner.len() as u64 - self.pos } @@ -331,7 +331,7 @@ impl ChunkedCursor { } impl oio::Stream for ChunkedCursor { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.len() as u64 } diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index fcf7b8b5c..79946d847 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -37,12 +37,7 @@ pub type Streamer = Box<dyn Stream>; /// `Unpin` + `Send` + `Sync`. And the item is `Result<Bytes>`. pub trait Stream: Unpin + Send + Sync { /// Fetch remaining size of this stream. - /// - /// # NOTES - /// - /// It's by design that we take `&mut self` here to make sure we don't have other - /// threads reading the same stream at the same time. - fn size(&mut self) -> u64; + fn size(&self) -> u64; /// Fetch next item `Result<Bytes>` from the stream. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>; @@ -52,7 +47,7 @@ pub trait Stream: Unpin + Send + Sync { } impl Stream for () { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { unimplemented!("size is required to be implemented for oio::Stream") } @@ -72,7 +67,7 @@ impl Stream for () { /// `Box<dyn Stream>` won't implement `Stream` automatically. /// To make Streamer work as expected, we must add this impl. impl<T: Stream + ?Sized> Stream for Box<T> { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { (**self).size() } @@ -86,7 +81,7 @@ impl<T: Stream + ?Sized> Stream for Box<T> { } impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { match self.try_lock() { Ok(mut this) => this.size(), Err(_) => panic!("the stream is expected to have only one consumer, but it's not"), @@ -115,7 +110,7 @@ impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { } impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { match self.try_lock() { Ok(mut this) => this.size(), Err(_) => panic!("the stream is expected to have only one consumer, but it's not"), @@ -238,8 +233,8 @@ pub struct Chain<S1: Stream, S2: Stream> { } impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> { - fn size(&mut self) -> u64 { - self.first.as_mut().map(|v| v.size()).unwrap_or_default() + self.second.size() + fn size(&self) -> u64 { + self.first.as_ref().map(|v| v.size()).unwrap_or_default() + self.second.size() } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { diff --git a/core/src/raw/oio/stream/into_stream.rs b/core/src/raw/oio/stream/into_stream.rs index 81e2301ae..6b073ffd1 100644 --- a/core/src/raw/oio/stream/into_stream.rs +++ b/core/src/raw/oio/stream/into_stream.rs @@ -44,7 +44,7 @@ impl<S> oio::Stream for IntoStream<S> where S: futures::Stream<Item = Result<Bytes>> + Send + Sync + Unpin, { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.size } diff --git a/core/src/raw/oio/stream/into_stream_from_reader.rs b/core/src/raw/oio/stream/into_stream_from_reader.rs index a038695ef..958d4b91c 100644 --- a/core/src/raw/oio/stream/into_stream_from_reader.rs +++ b/core/src/raw/oio/stream/into_stream_from_reader.rs @@ -54,7 +54,7 @@ impl<S> oio::Stream for FromReaderStream<S> where S: AsyncRead + Send + Sync + Unpin, { - fn size(&mut self) -> u64 { + fn size(&self) -> u64 { self.size } diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index ba4705679..12d6ea3c4 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -88,14 +88,7 @@ pub type Writer = Box<dyn Write>; #[async_trait] pub trait Write: Unpin + Send + Sync { /// Sink given stream into writer. - /// - /// # Notes - /// - /// It's possible that the given bs length is less than the total - /// content length. And users will call write multiple times. - /// - /// Please make sure `write` is safe to re-enter. - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()>; + async fn write(&mut self, s: oio::Streamer) -> Result<()>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -106,7 +99,7 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn write(&mut self, _: u64, _: oio::Streamer) -> Result<()> { + async fn write(&mut self, _: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -133,8 +126,8 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn write(&mut self, n: u64, s: oio::Streamer) -> Result<()> { - (**self).write(n, s).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + (**self).write(s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index d1380c584..f4fffc704 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; -use crate::raw::oio::Streamer; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; @@ -78,11 +78,12 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, s: Streamer) -> Result<()> { let offset = self.offset().await?; + let size = s.size(); self.inner - .append(offset, size, AsyncBody::Stream(s)) + .append(offset, s.size(), AsyncBody::Stream(s)) .await .map(|_| self.offset = Some(offset + size)) } diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 2c269cc25..87c5a523f 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -17,8 +17,8 @@ use async_trait::async_trait; -use crate::raw::oio::StreamExt; use crate::raw::oio::Streamer; +use crate::raw::oio::{Stream, StreamExt}; use crate::raw::*; use crate::*; @@ -63,16 +63,16 @@ impl<W: oio::Write> AtLeastBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, s: Streamer) -> Result<()> { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { - if total_size == size { - return self.inner.write(size, s).await; + if total_size == s.size() { + return self.inner.write(s).await; } } // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() as u64 + size < self.buffer_size as u64 { + if self.buffer.len() as u64 + s.size() < self.buffer_size as u64 { self.buffer.push(s.collect().await?); return Ok(()); } @@ -82,7 +82,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { let stream = buf.chain(s); self.inner - .write(buffer_size + size, Box::new(stream)) + .write(Box::new(stream)) .await // Clear buffer if the write is successful. .map(|_| self.buffer.clear()) @@ -95,9 +95,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { async fn close(&mut self) -> Result<()> { if !self.buffer.is_empty() { - self.inner - .write(self.buffer.len() as u64, Box::new(self.buffer.clone())) - .await?; + self.inner.write(Box::new(self.buffer.clone())).await?; self.buffer.clear(); } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 1cd7c6f72..bd6cc9b77 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -56,10 +56,10 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, s: Streamer) -> Result<()> { match self { - Self::One(one) => one.write(size, s).await, - Self::Two(two) => two.write(size, s).await, + Self::One(one) => one.write(s).await, + Self::Two(two) => two.write(s).await, } } @@ -94,11 +94,11 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, s: Streamer) -> Result<()> { match self { - Self::One(one) => one.write(size, s).await, - Self::Two(two) => two.write(size, s).await, - Self::Three(three) => three.write(size, s).await, + Self::One(one) => one.write(s).await, + Self::Two(two) => two.write(s).await, + Self::Three(three) => three.write(s).await, } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 1d25751a6..0fa358c91 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -87,13 +87,13 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn write(&mut self, _: u64, mut s: Streamer) -> Result<()> { + async fn write(&mut self, mut s: Streamer) -> Result<()> { if self.buffer.len() >= self.buffer_size { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); return self .inner - .write(to_write.len() as u64, Box::new(to_write)) + .write(Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -121,7 +121,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(self.buffer_size); self.inner - .write(to_write.len() as u64, Box::new(to_write)) + .write(Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -153,7 +153,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let mut buf = self.buffer.clone(); let to_write = buf.split_to(self.buffer_size); self.inner - .write(to_write.len() as u64, Box::new(to_write)) + .write(Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| { @@ -167,7 +167,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { let to_write = buf.split_to(min(self.buffer_size, buf.len())); self.inner - .write(to_write.len() as u64, Box::new(to_write)) + .write(Box::new(to_write)) .await // Replace buffer with remaining if the write is successful. .map(|_| self.buffer = buf)?; @@ -197,9 +197,8 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, size: u64, s: Streamer) -> Result<()> { + async fn write(&mut self, s: Streamer) -> Result<()> { let bs = s.collect().await?; - assert_eq!(bs.len() as u64, size); self.buf.extend_from_slice(&bs); Ok(()) @@ -228,11 +227,8 @@ mod tests { let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10); - w.write( - expected.len() as u64, - Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), - ) - .await?; + w.write(Box::new(oio::Cursor::from(Bytes::from(expected.clone())))) + .await?; w.close().await?; assert_eq!(w.inner.buf.len(), expected.len()); @@ -266,10 +262,7 @@ mod tests { expected.extend_from_slice(&content); writer - .write( - expected.len() as u64, - Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), - ) + .write(Box::new(oio::Cursor::from(Bytes::from(expected.clone())))) .await?; } writer.close().await?; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index da8f01695..a0e3b3017 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -51,8 +51,7 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin { &self, upload_id: &str, part_number: usize, - size: u64, - body: AsyncBody, + content: oio::Streamer, ) -> Result<MultipartUploadPart>; /// complete_part will complete the multipart upload to build the final @@ -119,11 +118,11 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let upload_id = self.upload_id().await?; self.inner - .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) + .write_part(&upload_id, self.parts.len(), s) .await .map(|v| self.parts.push(v)) } diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 971c59878..b9caae866 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -31,7 +31,7 @@ pub trait OneShotWrite: Send + Sync + Unpin { /// write_once write all data at once. /// /// Implementations should make sure that the data is written correctly at once. - async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>; + async fn write_once(&self, stream: oio::Streamer) -> Result<()>; } /// OneShotWrite is used to implement [`Write`] based on one shot. @@ -48,8 +48,8 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.write_once(size, s).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.inner.write_once(s).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 0b243072f..c2519294a 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -22,6 +22,7 @@ use http::StatusCode; use super::core::AzblobCore; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -160,9 +161,9 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { if self.op.append() { - self.append_oneshot(size, AsyncBody::Stream(s)).await + self.append_oneshot(s.size(), AsyncBody::Stream(s)).await } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -171,7 +172,7 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(size, AsyncBody::Stream(s)).await + self.write_oneshot(s.size(), AsyncBody::Stream(s)).await } } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index cf1c68cf4..e6ae3f84a 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -86,7 +86,7 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index af2a6ebd0..29f63f78f 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -23,7 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; @@ -52,10 +52,10 @@ impl CosWriter { #[async_trait] impl oio::OneShotWrite for CosWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.cos_put_object_request( &self.path, - Some(size), + Some(stream.size()), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), @@ -110,15 +110,20 @@ impl oio::MultipartUploadWrite for CosWriter { &self, upload_id: &str, part_number: usize, - size: u64, - body: AsyncBody, + s: Streamer, ) -> Result<oio::MultipartUploadPart> { // COS requires part number must between [1..=10000] let part_number = part_number + 1; let resp = self .core - .cos_upload_part_request(&self.path, upload_id, part_number, size, body) + .cos_upload_part_request( + &self.path, + upload_id, + part_number, + s.size(), + AsyncBody::Stream(s), + ) .await?; let status = resp.status(); diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index cea9cb199..72309ff0d 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -60,7 +60,7 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 567e63bd9..395261c63 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -50,7 +50,7 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - async fn write(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { while let Some(bs) = s.next().await { let bs = bs?; self.f diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 8c56f8e5d..6ac3f38c8 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,7 +53,7 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index e7e68f320..9191d70fe 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::GcsCore; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -164,8 +165,8 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.write_oneshot(s.size(), AsyncBody::Stream(s)).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index f4bc19b12..12dd1cc43 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -103,7 +103,7 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 2bdd44507..83bde0bb3 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,7 +62,7 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 6c9c679dc..a3928ed13 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -58,7 +58,7 @@ impl HdfsWriter<hdrs::AsyncFile> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index a876aac3a..2e2d4c18e 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -52,7 +52,7 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index d3b1e119f..3b0e02e7c 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -23,8 +23,8 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::MultipartUploadPart; use crate::raw::oio::Streamer; +use crate::raw::oio::{MultipartUploadPart, Stream}; use crate::raw::*; use crate::*; @@ -53,10 +53,10 @@ impl ObsWriter { #[async_trait] impl oio::OneShotWrite for ObsWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.obs_put_object_request( &self.path, - Some(size), + Some(stream.size()), self.op.content_type(), self.op.cache_control(), AsyncBody::Stream(stream), @@ -105,15 +105,20 @@ impl oio::MultipartUploadWrite for ObsWriter { &self, upload_id: &str, part_number: usize, - size: u64, - body: AsyncBody, + stream: Streamer, ) -> Result<MultipartUploadPart> { // Obs service requires part number must between [1..=10000] let part_number = part_number + 1; let resp = self .core - .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body) + .obs_upload_part_request( + &self.path, + upload_id, + part_number, + Some(stream.size()), + AsyncBody::Stream(stream), + ) .await?; let status = resp.status(); diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index a320e3b87..219855dd3 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -56,7 +56,7 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 27aa09011..2d965a7cf 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -23,7 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; @@ -52,10 +52,10 @@ impl OssWriter { #[async_trait] impl oio::OneShotWrite for OssWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.oss_put_object_request( &self.path, - Some(size), + Some(stream.size()), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), @@ -112,15 +112,21 @@ impl oio::MultipartUploadWrite for OssWriter { &self, upload_id: &str, part_number: usize, - size: u64, - body: AsyncBody, + stream: Streamer, ) -> Result<oio::MultipartUploadPart> { // OSS requires part number must between [1..=10000] let part_number = part_number + 1; let resp = self .core - .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body) + .oss_upload_part_request( + &self.path, + upload_id, + part_number, + false, + stream.size(), + AsyncBody::Stream(stream), + ) .await?; let status = resp.status(); diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index a27341d71..4f96fece7 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -23,7 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; @@ -49,10 +49,10 @@ impl S3Writer { #[async_trait] impl oio::OneShotWrite for S3Writer { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.s3_put_object_request( &self.path, - Some(size), + Some(stream.size()), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), @@ -107,15 +107,18 @@ impl oio::MultipartUploadWrite for S3Writer { &self, upload_id: &str, part_number: usize, - size: u64, - body: AsyncBody, + stream: Streamer, ) -> Result<oio::MultipartUploadPart> { // AWS S3 requires part number must between [1..=10000] let part_number = part_number + 1; - let mut req = - self.core - .s3_upload_part_request(&self.path, upload_id, part_number, size, body)?; + let mut req = self.core.s3_upload_part_request( + &self.path, + upload_id, + part_number, + stream.size(), + AsyncBody::Stream(stream), + )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index c36b02c3b..ad1447d10 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -42,7 +42,7 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 5c364ab8d..9eb736c73 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -75,7 +75,7 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 6cb65dca6..71f3e2606 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -60,7 +60,7 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 1cf7b740f..04f9bcb80 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -63,7 +63,7 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 2deb0bfec..5050b70cf 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -21,6 +21,7 @@ use http::StatusCode; use super::backend::WebdavBackend; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -67,8 +68,8 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn write(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.write_oneshot(s.size(), AsyncBody::Stream(s)).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 03cbfdf87..d27a84480 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -62,7 +62,7 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn write(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + async fn write(&mut self, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 442bc2188..fb7735b9f 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -730,8 +730,7 @@ impl Operator { let (_, mut w) = inner.write(&path, args).await?; // FIXME: we should bench here to measure the perf. - w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs))) - .await?; + w.write(Box::new(oio::Cursor::from(bs))).await?; w.close().await?; Ok(()) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index f3011e69e..3ce085dd8 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -83,8 +83,7 @@ impl Writer { pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { let bs = bs.into(); - w.write(bs.len() as u64, Box::new(oio::Cursor::from(bs))) - .await + w.write(Box::new(oio::Cursor::from(bs))).await } else { unreachable!( "writer state invalid while write, expect Idle, actual {}", @@ -132,7 +131,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream(size, sink_from.map_ok(|v| v.into()))); - w.write(size, s).await + w.write(s).await } else { unreachable!( "writer state invalid while sink, expect Idle, actual {}", @@ -177,7 +176,7 @@ impl Writer { { if let State::Idle(Some(w)) = &mut self.state { let s = Box::new(oio::into_stream_from_reader(size, read_from)); - w.write(size, s).await + w.write(s).await } else { unreachable!( "writer state invalid while copy, expect Idle, actual {}", @@ -253,8 +252,7 @@ impl AsyncWrite for Writer { let size = bs.len(); let fut = async move { // FIXME: we should bench here to measure the perf. - w.write(size as u64, Box::new(oio::Cursor::from(bs))) - .await?; + w.write(Box::new(oio::Cursor::from(bs))).await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut)); @@ -322,8 +320,7 @@ impl tokio::io::AsyncWrite for Writer { let size = bs.len(); let fut = async move { // FIXME: we should bench here to measure the perf. - w.write(size as u64, Box::new(oio::Cursor::from(bs))) - .await?; + w.write(Box::new(oio::Cursor::from(bs))).await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut));
