This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b0379a2466fa7597745f2d67336badf7fbdbd2dc Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 29 17:51:39 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 4 --- core/benches/oio/write.rs | 16 +++++++++-- core/src/layers/blocking.rs | 5 +++- core/src/layers/complete.rs | 23 ---------------- core/src/layers/concurrent_limit.rs | 8 ++---- core/src/layers/error_context.rs | 8 ------ core/src/layers/logging.rs | 34 ------------------------ core/src/layers/madsim.rs | 16 ----------- core/src/layers/metrics.rs | 12 --------- core/src/layers/minitrace.rs | 10 ------- core/src/layers/oteltrace.rs | 4 --- core/src/layers/prometheus.rs | 17 ------------ core/src/layers/retry.rs | 26 ------------------ core/src/layers/throttle.rs | 25 ----------------- core/src/layers/timeout.rs | 13 --------- core/src/layers/tracing.rs | 8 ------ core/src/raw/adapters/kv/backend.rs | 7 ----- core/src/raw/adapters/typed_kv/backend.rs | 7 ----- core/src/raw/oio/write/api.rs | 15 +---------- core/src/raw/oio/write/append_object_write.rs | 12 --------- core/src/raw/oio/write/at_least_buf_write.rs | 25 ----------------- core/src/raw/oio/write/compose_write.rs | 16 ----------- core/src/raw/oio/write/exact_buf_write.rs | 29 ++++++++++---------- core/src/raw/oio/write/multipart_upload_write.rs | 17 ------------ core/src/raw/oio/write/one_shot_write.rs | 8 ------ core/src/services/azblob/writer.rs | 18 ------------- core/src/services/azdfs/writer.rs | 6 ++--- core/src/services/dropbox/writer.rs | 6 ++--- core/src/services/fs/writer.rs | 15 ----------- core/src/services/ftp/writer.rs | 6 ++--- core/src/services/gcs/writer.rs | 6 ++--- core/src/services/gdrive/writer.rs | 6 ++--- core/src/services/ghac/writer.rs | 6 ++--- core/src/services/hdfs/writer.rs | 6 +++-- core/src/services/ipmfs/writer.rs | 6 ++--- core/src/services/onedrive/writer.rs | 6 ++--- core/src/services/sftp/writer.rs | 6 ++--- core/src/services/supabase/writer.rs | 6 ++--- core/src/services/vercel_artifacts/writer.rs | 6 ++--- core/src/services/wasabi/writer.rs | 6 ++--- core/src/services/webdav/writer.rs | 6 ++--- core/src/services/webhdfs/writer.rs | 6 ++--- core/src/types/operator/operator.rs | 4 ++- core/src/types/writer.rs | 10 ++++--- 44 files changed, 91 insertions(+), 411 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 0e70bcfc7..e704f8d52 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -27,10 +27,6 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, _: Bytes) -> opendal::Result<()> { - Ok(()) - } - async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> { Ok(()) } diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 6e26ce7e0..befc6c587 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -17,6 +17,7 @@ use criterion::Criterion; use once_cell::sync::Lazy; +use opendal::raw::oio; use opendal::raw::oio::AtLeastBufWriter; use opendal::raw::oio::ExactBufWriter; use opendal::raw::oio::Write; @@ -45,7 +46,13 @@ pub fn bench_at_least_buf_write(c: &mut Criterion) { group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write(content.clone()).await.unwrap(); + + w.sink( + content.len() as u64, + Box::new(oio::Cursor::from(content.clone())), + ) + .await + .unwrap(); w.close().await.unwrap(); }) }); @@ -71,7 +78,12 @@ 
pub fn bench_exact_buf_write(c: &mut Criterion) { group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024); - w.write(content.clone()).await.unwrap(); + w.sink( + content.len() as u64, + Box::new(oio::Cursor::from(content.clone())), + ) + .await + .unwrap(); w.close().await.unwrap(); }) }); diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 7b80b5956..2dab3ea45 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -197,7 +197,10 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { fn write(&mut self, bs: Bytes) -> Result<()> { - self.handle.block_on(self.inner.write(bs)) + self.handle.block_on( + self.inner + .sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))), + ) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 4c58b6e30..13e009746 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,29 +711,6 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let n = bs.len(); - - if let Some(size) = self.size { - if self.written + n as u64 > size { - return Err(Error::new( - ErrorKind::ContentTruncated, - &format!( - "writer got too much data, expect: {size}, actual: {}", - self.written + n as u64 - ), - )); - } - } - - let w = self.inner.as_mut().ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") - })?; - w.write(bs).await?; - self.written += n as u64; - Ok(()) - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { if let Some(total_size) = self.size { if self.written + size > total_size { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 9cef0fb9b..781fb7610 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,18 +285,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.inner.write(bs).await + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await } async fn abort(&mut self) -> Result<()> { self.inner.abort().await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.inner.sink(size, s).await - } - async fn close(&mut self) -> Result<()> { self.inner.close().await } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index bfe9be4df..a0da6a0a2 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,14 +403,6 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.inner.write(bs).await.map_err(|err| { - err.with_operation(WriteOperation::Write) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { err.with_operation(WriteOperation::Abort) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 07323cf8c..5b2f39e63 100644 --- 
a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,40 +1252,6 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - match self.inner.write(bs).await { - Ok(_) => { - self.written += size as u64; - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={} -> data write {}B", - self.ctx.scheme, - WriteOperation::Write, - self.path, - self.written, - size - ); - Ok(()) - } - Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={} -> data write failed: {}", - self.ctx.scheme, - WriteOperation::Write, - self.path, - self.written, - self.ctx.error_print(&err), - ) - } - Err(err) - } - } - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { match self.inner.sink(size, s).await { Ok(_) => { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index fdf0ec5de..e4b490c87 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,22 +302,6 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, bs: Bytes) -> crate::Result<()> { - #[cfg(madsim)] - { - let req = Request::Write(self.path.to_string(), bs); - let ep = Endpoint::bind(self.addr).await?; - let (tx, mut rx) = ep.connect1(self.addr).await?; - tx.send(Box::new(req)).await?; - rx.recv().await?; - Ok(()) - } - #[cfg(not(madsim))] - { - unreachable!("madsim is not enabled") - } - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1eade833b..40ad53693 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,18 +847,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .write(bs) - .await - .map(|_| self.bytes += size as u64) - .map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 1213d692e..da216cd61 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,16 +337,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.inner - .write(bs) - .in_span(Span::enter_with_parent( - WriteOperation::Write.into_static(), - &self.span, - )) - .await - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 2ae39b05c..39455cd54 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,10 +313,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.inner.write(bs).await - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, 
s).await } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 005d6aa97..7f6b5aecb 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,23 +662,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .write(bs) - .await - .map(|_| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) - }) - .map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 7517c2c21..5127ca6e3 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -873,32 +873,6 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut backoff = self.builder.build(); - - loop { - match self.inner.write(bs.clone()).await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), - Some(dur) => { - self.notify.intercept( - &e, - dur, - &[ - ("operation", WriteOperation::Write.into_static()), - ("path", &self.path), - ], - ); - tokio::time::sleep(dur).await; - continue; - } - }, - } - } - } - /// > Ooooooooooops, are you crazy!? Why we need to do `Arc<Mutex<S>>` here? Adding a lock has /// a lot overhead! /// diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index d929226df..89fabe71d 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -217,31 +217,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); - - loop { - match self.limiter.check_n(buf_length) { - Ok(_) => return self.inner.write(bs).await, - Err(negative) => match negative { - // the query is valid but the Decider can not accommodate them. - NegativeMultiDecision::BatchNonConforming(_, not_until) => { - let wait_time = not_until.wait_time_from(DefaultClock::default().now()); - // TODO: Should lock the limiter and wait for the wait_time, or should let other small requests go first? - tokio::time::sleep(wait_time).await; - } - // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for. 
- NegativeMultiDecision::InsufficientCapacity(_) => { - return Err(Error::new( - ErrorKind::RateLimited, - "InsufficientCapacity due to burst size being smaller than the request size", - )) - } - }, - } - } - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { self.inner.sink(size, s).await } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index c0fb739aa..8a894d17b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,19 +322,6 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let timeout = self.io_timeout(bs.len() as u64); - - tokio::time::timeout(timeout, self.inner.write(bs)) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(WriteOperation::Write) - .with_context("timeout", timeout.as_secs_f64().to_string()) - .set_temporary() - })? - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let timeout = self.io_timeout(size); diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 9042e6e35..e3812fd48 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -320,14 +320,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for TracingWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TracingWrapper<R> { - #[tracing::instrument( - parent = &self.span, - level = "trace", - skip_all)] - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.inner.write(bs).await - } - #[tracing::instrument( parent = &self.span, level = "trace", diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 41c4c6a7d..c1314b2de 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -389,13 +389,6 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buf = Some(bs.into()); - - Ok(()) - } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 9f6186a38..76aee64f9 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -402,13 +402,6 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { - // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buf.push(bs); - - Ok(()) - } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index f2bb025af..14bff1ec7 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -87,7 +87,7 @@ pub type Writer = Box<dyn Write>; /// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { - /// Write given bytes into writer. + /// Sink given stream into writer. /// /// # Notes /// @@ -95,9 +95,6 @@ pub trait Write: Unpin + Send + Sync { /// content length. And users will call write multiple times. /// /// Please make sure `write` is safe to re-enter. - async fn write(&mut self, bs: Bytes) -> Result<()>; - - /// Sink given stream into writer. 
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; /// Abort the pending writer. @@ -109,12 +106,6 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - unimplemented!("write is required to be implemented for oio::Write") - } - async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -142,10 +133,6 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - (**self).write(bs).await - } - async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { (**self).sink(n, s).await } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 07fa546cc..7e02fc0ea 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::oio::Streamer; use crate::raw::*; @@ -79,17 +78,6 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let offset = self.offset().await?; - - let size = bs.len() as u64; - - self.inner - .append(offset, size, AsyncBody::Bytes(bs)) - .await - .map(|_| self.offset = Some(offset + size)) - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { let offset = self.offset().await?; diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 91adddd30..94be240f4 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::oio::StreamExt; use crate::raw::oio::Streamer; @@ -64,30 +63,6 @@ impl<W: oio::Write> AtLeastBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - // If total size is known and equals to given bytes, we can write it directly. - if let Some(total_size) = self.total_size { - if total_size == bs.len() as u64 { - return self.inner.write(bs).await; - } - } - - // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() + bs.len() < self.buffer_size { - self.buffer.push(bs); - return Ok(()); - } - - let mut buf = self.buffer.clone(); - buf.push(bs); - - self.inner - .sink(buf.len() as u64, Box::new(buf)) - .await - // Clear buffer if the write is successful. - .map(|_| self.buffer.clear()) - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { // If total size is known and equals to given stream, we can write it directly. if let Some(total_size) = self.total_size { diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 043df2978..ea5f3bf25 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -39,7 +39,6 @@ //! type_alias_impl_trait has been stabilized. 
use async_trait::async_trait; -use bytes::Bytes; use crate::raw::oio::Streamer; use crate::raw::*; @@ -57,13 +56,6 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - match self { - Self::One(one) => one.write(bs).await, - Self::Two(two) => two.write(bs).await, - } - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { match self { Self::One(one) => one.sink(size, s).await, @@ -102,14 +94,6 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - match self { - Self::One(one) => one.write(bs).await, - Self::Two(two) => two.write(bs).await, - Self::Three(three) => three.write(bs).await, - } - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { match self { Self::One(one) => one.sink(size, s).await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 8561c1a4a..59533db6a 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -84,11 +84,6 @@ impl<W: oio::Write> ExactBufWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) - .await - } - /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. @@ -202,17 +197,12 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); - - self.buf.extend_from_slice(&bs); - Ok(()) - } - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { let bs = s.collect().await?; assert_eq!(bs.len() as u64, size); - self.write(bs).await + self.buf.extend_from_slice(&bs); + + Ok(()) } async fn abort(&mut self) -> Result<()> { @@ -238,7 +228,11 @@ mod tests { let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10); - w.write(Bytes::from(expected.clone())).await?; + w.sink( + expected.len() as u64, + Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), + ) + .await?; w.close().await?; assert_eq!(w.inner.buf.len(), expected.len()); @@ -271,7 +265,12 @@ mod tests { rng.fill_bytes(&mut content); expected.extend_from_slice(&content); - writer.write(Bytes::from(content)).await?; + writer + .sink( + expected.len() as u64, + Box::new(oio::Cursor::from(Bytes::from(expected.clone()))), + ) + .await?; } writer.close().await?; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index c013b24c3..0a0d49680 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -16,7 +16,6 @@ // under the License. 
use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; @@ -120,22 +119,6 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = self.upload_id().await?; - - let size = bs.len(); - - self.inner - .write_part( - &upload_id, - self.parts.len(), - size as u64, - AsyncBody::Bytes(bs), - ) - .await - .map(|v| self.parts.push(v)) - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let upload_id = self.upload_id().await?; diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 35b1883bf..a3fcd1b2c 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; @@ -49,13 +48,6 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let cursor = oio::Cursor::from(bs); - self.inner - .write_once(cursor.len() as u64, Box::new(cursor)) - .await - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.write_once(size, s).await } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 31a56f27a..2db716b11 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzblobCore; @@ -161,23 +160,6 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - if self.op.append() { - self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await - } else { - if self.op.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await - } - } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { if self.op.append() { self.append_oneshot(size, AsyncBody::Stream(s)).await diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 3c8db1ac1..2eda93e40 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -37,10 +37,7 @@ impl AzdfsWriter { pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self { AzdfsWriter { core, op, path } } -} -#[async_trait] -impl oio::Write for AzdfsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let mut req = self.core.azdfs_create_request( &self.path, @@ -85,7 +82,10 @@ impl oio::Write for AzdfsWriter { .with_operation("Backend::azdfs_update_request")), } } +} +#[async_trait] +impl oio::Write for AzdfsWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 2f5e97558..f4a4e225b 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -36,10 +36,7 @@ impl DropboxWriter { pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self { DropboxWriter { core, op, path } } -} -#[async_trait] -impl oio::Write for DropboxWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let resp = 
self .core @@ -59,7 +56,10 @@ impl oio::Write for DropboxWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::Write for DropboxWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 4d31444af..6b518b436 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -50,21 +50,6 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.f - .seek(SeekFrom::Start(self.pos)) - .await - .map_err(parse_io_error)?; - self.f.write_all(&bs).await.map_err(parse_io_error)?; - self.pos += bs.len() as u64; - - Ok(()) - } - async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { while let Some(bs) = s.next().await { let bs = bs?; diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index cd4ba0f6a..7816277c1 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -37,10 +37,7 @@ impl FtpWriter { pub fn new(backend: FtpBackend, path: String) -> Self { FtpWriter { backend, path } } -} -#[async_trait] -impl oio::Write for FtpWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; @@ -52,7 +49,10 @@ impl oio::Write for FtpWriter { Ok(()) } +} +#[async_trait] +impl oio::Write for FtpWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 236016294..70d79798a 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -114,10 +114,7 @@ impl GcsWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for GcsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let location = match &self.location { Some(location) => location, @@ -163,7 +160,10 @@ impl oio::Write for GcsWriter { } } } +} +#[async_trait] +impl oio::Write for GcsWriter { async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.write_oneshot(size, AsyncBody::Stream(s)).await } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 48d88f3ec..718307dc8 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -91,10 +91,7 @@ impl GdriveWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for GdriveWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { if self.file_id.is_none() { self.write_create(bs.len() as u64, bs).await @@ -102,7 +99,10 @@ impl oio::Write for GdriveWriter { self.write_overwrite(bs.len() as u64, bs).await } } +} +#[async_trait] +impl oio::Write for GdriveWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index b2f959947..9a088a0be 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -38,10 +38,7 @@ impl GhacWriter { size: 0, } } -} -#[async_trait] -impl 
oio::Write for GhacWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let size = bs.len() as u64; let req = self @@ -61,7 +58,10 @@ impl oio::Write for GhacWriter { .map(|err| err.with_operation("Backend::ghac_upload"))?) } } +} +#[async_trait] +impl oio::Write for GhacWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 23c5f1d68..8800ab241 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -39,8 +39,7 @@ impl<F> HdfsWriter<F> { } } -#[async_trait] -impl oio::Write for HdfsWriter<hdrs::AsyncFile> { +impl HdfsWriter<hdrs::AsyncFile> { async fn write(&mut self, bs: Bytes) -> Result<()> { while self.pos < bs.len() { let n = self @@ -55,7 +54,10 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { Ok(()) } +} +#[async_trait] +impl oio::Write for HdfsWriter<hdrs::AsyncFile> { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 528478142..011624c23 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -34,10 +34,7 @@ impl IpmfsWriter { pub fn new(backend: IpmfsBackend, path: String) -> Self { IpmfsWriter { backend, path } } -} -#[async_trait] -impl oio::Write for IpmfsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let resp = self.backend.ipmfs_write(&self.path, bs).await?; @@ -51,7 +48,10 @@ impl oio::Write for IpmfsWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::Write for IpmfsWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 620130847..8b38e30ee 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -42,10 +42,7 @@ impl OneDriveWriter { pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self { OneDriveWriter { backend, op, path } } -} -#[async_trait] -impl oio::Write for OneDriveWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let size = bs.len(); @@ -55,7 +52,10 @@ impl oio::Write for OneDriveWriter { self.write_chunked(bs).await } } +} +#[async_trait] +impl oio::Write for OneDriveWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 76da70da3..cad5c2cca 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -32,16 +32,16 @@ impl SftpWriter { pub fn new(file: File) -> Self { SftpWriter { file } } -} -#[async_trait] -impl oio::Write for SftpWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { self.file.write_all(&bs).await?; Ok(()) } +} +#[async_trait] +impl oio::Write for SftpWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index f4c271313..9eb661c10 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -63,10 +63,7 @@ impl SupabaseWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for SupabaseWriter { async 
fn write(&mut self, bs: Bytes) -> Result<()> { if bs.is_empty() { return Ok(()); @@ -74,7 +71,10 @@ impl oio::Write for SupabaseWriter { self.upload(bs).await } +} +#[async_trait] +impl oio::Write for SupabaseWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 6f32d67ba..0d55ca277 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -35,10 +35,7 @@ impl VercelArtifactsWriter { pub fn new(backend: VercelArtifactsBackend, op: OpWrite, path: String) -> Self { VercelArtifactsWriter { backend, op, path } } -} -#[async_trait] -impl oio::Write for VercelArtifactsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let resp = self .backend @@ -59,7 +56,10 @@ impl oio::Write for VercelArtifactsWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::Write for VercelArtifactsWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 689c334dc..4f3f1697a 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -37,10 +37,7 @@ impl WasabiWriter { pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self { WasabiWriter { core, op, path } } -} -#[async_trait] -impl oio::Write for WasabiWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let resp = self .core @@ -62,7 +59,10 @@ impl oio::Write for WasabiWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::Write for WasabiWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index a3c17bafa..aeaf0d781 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -58,15 +58,15 @@ impl WebdavWriter { _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for WebdavWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) .await } +} +#[async_trait] +impl oio::Write for WebdavWriter { async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.write_oneshot(size, AsyncBody::Stream(s)).await } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 97eef2e3d..85e6834f4 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -35,10 +35,7 @@ impl WebhdfsWriter { pub fn new(backend: WebhdfsBackend, op: OpWrite, path: String) -> Self { WebhdfsWriter { backend, op, path } } -} -#[async_trait] -impl oio::Write for WebhdfsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let req = self .backend @@ -61,7 +58,10 @@ impl oio::Write for WebhdfsWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::Write for WebhdfsWriter { async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index ab97a2df2..ab6bca5ac 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -729,7 +729,9 @@ impl Operator { } let (_, mut w) = 
inner.write(&path, args).await?; - w.write(bs).await?; + // FIXME: we should bench here to measure the perf. + w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + .await?; w.close().await?; Ok(()) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 37a9fe72c..8517a2f04 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -82,7 +82,9 @@ impl Writer { /// Write into inner writer. pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { - w.write(bs.into()).await + let bs = bs.into(); + w.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + .await } else { unreachable!( "writer state invalid while write, expect Idle, actual {}", @@ -250,7 +252,8 @@ impl AsyncWrite for Writer { let bs = Bytes::from(buf.to_vec()); let size = bs.len(); let fut = async move { - w.write(bs).await?; + // FIXME: we should bench here to measure the perf. + w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut)); @@ -317,7 +320,8 @@ impl tokio::io::AsyncWrite for Writer { let bs = Bytes::from(buf.to_vec()); let size = bs.len(); let fut = async move { - w.write(bs).await?; + // FIXME: we should bench here to measure the perf. + w.sink(size as u64, Box::new(oio::Cursor::from(bs))).await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut));
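
For readers tracking the stream-based-write refactor recorded above: the commit drops the chunk-oriented `write(&mut self, bs: Bytes)` method from `oio::Write` and routes every call site through the stream-oriented `sink(&mut self, size: u64, s: oio::Streamer)`, with owned bytes wrapped in an `oio::Cursor` before being sunk (see the hunks for `core/src/raw/oio/write/api.rs` and `core/src/types/writer.rs`). The sketch below is a self-contained model of that calling convention; `ByteSink`, `VecSink`, `ByteStream`, and `bytes_stream` are hypothetical names standing in for opendal's real `oio::Write`, `oio::Streamer`, and `oio::Cursor`, so treat it as an illustration of the pattern rather than the crate's actual internals.

// Minimal, self-contained model of the sink-based write pattern in this
// commit. Requires async-trait, bytes, futures, and tokio as dependencies.
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream, StreamExt};

// Stand-in for opendal's `oio::Streamer`: a boxed stream of byte chunks.
type ByteStream = BoxStream<'static, std::io::Result<Bytes>>;

#[async_trait]
trait ByteSink {
    /// Sink a stream carrying `size` bytes into the writer.
    async fn sink(&mut self, size: u64, s: ByteStream) -> std::io::Result<()>;
    /// Finish the write.
    async fn close(&mut self) -> std::io::Result<()>;
}

/// Collects everything it receives; stands in for a real backend writer.
#[derive(Default)]
struct VecSink {
    buf: Vec<u8>,
}

#[async_trait]
impl ByteSink for VecSink {
    async fn sink(&mut self, size: u64, mut s: ByteStream) -> std::io::Result<()> {
        let mut received = 0u64;
        while let Some(bs) = s.next().await {
            let bs = bs?;
            received += bs.len() as u64;
            self.buf.extend_from_slice(&bs);
        }
        // The caller promised `size` bytes up front.
        debug_assert_eq!(received, size);
        Ok(())
    }

    async fn close(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Adapt an owned buffer into a one-shot stream, mirroring how the commit
/// wraps `Bytes` in `oio::Cursor` before calling `sink`.
fn bytes_stream(bs: Bytes) -> ByteStream {
    stream::once(async move { Ok(bs) }).boxed()
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let bs = Bytes::from_static(b"hello, stream-based write");
    let mut w = VecSink::default();
    // The calling convention the refactor standardizes on: pass the total
    // size plus a stream, instead of a single `Bytes` chunk.
    w.sink(bs.len() as u64, bytes_stream(bs)).await?;
    w.close().await?;
    assert_eq!(w.buf.len(), 25);
    Ok(())
}

Collapsing the two entry points into one is also why most hunks in this diff are pure deletions: buffering layers such as `ExactBufWriter` and `AtLeastBufWriter`, and the various instrumentation layers, now only have to reason about a single streaming path.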
