This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 527fc8d34a8cf37bb24019df9710ea72cbafe394 Author: Xuanwo <[email protected]> AuthorDate: Thu Sep 7 16:12:28 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/blocking.rs | 2 +- core/src/layers/complete.rs | 33 +++++++-------- core/src/layers/concurrent_limit.rs | 4 +- core/src/layers/error_context.rs | 4 +- core/src/layers/logging.rs | 11 +++-- core/src/layers/madsim.rs | 2 +- core/src/layers/metrics.rs | 8 ++-- core/src/layers/minitrace.rs | 4 +- core/src/layers/oteltrace.rs | 4 +- core/src/layers/prometheus.rs | 4 +- core/src/layers/retry.rs | 4 +- core/src/layers/throttle.rs | 8 ++-- core/src/layers/timeout.rs | 4 +- core/src/layers/tracing.rs | 4 +- core/src/raw/adapters/kv/backend.rs | 24 +++++++---- core/src/raw/adapters/typed_kv/backend.rs | 40 ++++++++++-------- core/src/raw/oio/mod.rs | 3 -- core/src/raw/oio/write/api.rs | 13 +++--- core/src/raw/oio/write/append_object_write.rs | 12 ++++-- core/src/raw/oio/write/compose_write.rs | 4 +- core/src/raw/oio/write/exact_buf_write.rs | 53 ++++++++++++------------ core/src/raw/oio/write/multipart_upload_write.rs | 8 ++-- core/src/raw/oio/write/one_shot_write.rs | 11 ++--- core/src/services/azblob/writer.rs | 10 +++-- core/src/services/azdfs/writer.rs | 14 ++++--- core/src/services/cos/writer.rs | 12 +++--- core/src/services/dropbox/writer.rs | 8 ++-- core/src/services/fs/writer.rs | 31 ++------------ core/src/services/ftp/writer.rs | 7 ++-- core/src/services/gcs/writer.rs | 12 +++--- core/src/services/gdrive/writer.rs | 10 +++-- core/src/services/ghac/writer.rs | 13 ++++-- core/src/services/hdfs/writer.rs | 30 ++------------ core/src/services/ipmfs/writer.rs | 9 ++-- core/src/services/obs/writer.rs | 12 +++--- core/src/services/onedrive/writer.rs | 10 ++--- core/src/services/oss/writer.rs | 12 +++--- core/src/services/s3/writer.rs | 13 +++--- core/src/services/sftp/writer.rs | 9 ++-- core/src/services/supabase/writer.rs | 12 ++---- core/src/services/vercel_artifacts/writer.rs | 8 ++-- core/src/services/wasabi/writer.rs | 8 ++-- core/src/services/webdav/writer.rs | 7 ++-- core/src/services/webhdfs/writer.rs | 8 ++-- core/src/{raw/oio => types}/buf.rs | 25 ++++++----- core/src/types/mod.rs | 3 ++ core/src/types/operator/blocking_operator.rs | 7 +++- core/src/types/operator/operator.rs | 10 +++-- core/src/types/reader.rs | 4 +- core/src/types/writer.rs | 41 ++++++++---------- 50 files changed, 294 insertions(+), 315 deletions(-) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 6e530023c..504111199 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { } impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 83bf12d6a..f1e76d5a5 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -711,11 +711,15 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let n = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let w = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") + })?; + let n = w.write(bs).await?; + self.written += n as u64; if let Some(size) = self.size { - if self.written + n as u64 > size { + if self.written > size { return Err(Error::new( ErrorKind::ContentTruncated, &format!( @@ -726,11 +730,6 @@ where } } - let w = self.inner.as_mut().ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") - })?; - let n = w.write(bs).await?; - self.written += n; Ok(n) } @@ -773,11 +772,15 @@ impl<W> oio::BlockingWrite for CompleteWriter<W> where W: oio::BlockingWrite, { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let n = bs.len(); + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let w = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") + })?; + let n = w.write(bs)?; + self.written += n as u64; if let Some(size) = self.size { - if self.written + n as u64 > size { + if self.written > size { return Err(Error::new( ErrorKind::ContentTruncated, &format!( @@ -788,13 +791,7 @@ where } } - let w = self.inner.as_mut().ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") - })?; - - w.write(bs)?; - self.written += n as u64; - Ok(n as u64) + Ok(n) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 99fa4e413..ec965a95a 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs).await } @@ -299,7 +299,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs) } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index e325cd0f7..1e01f68d3 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs).await.map_err(|err| { err.with_operation(WriteOperation::Write) .with_context("service", self.scheme) @@ -429,7 +429,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { } impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs).map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 1f6d7456a..c69372123 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,10 +1252,10 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { match self.inner.write(bs).await { Ok(n) => { - self.written += n; + self.written += n as u64; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1349,11 +1349,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { match self.inner.write(bs) { Ok(n) => { - self.written += n; + self.written += n as u64; trace!( target: LOGGING_TARGET, "service={} operation={} path={} written={} -> data write {}B", @@ -1361,7 +1360,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { WriteOperation::BlockingWrite, self.path, self.written, - size + n ); Ok(n) } diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 432e4bcf4..e6047bb42 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -302,7 +302,7 @@ pub struct MadsimWriter { #[async_trait] impl oio::Write for MadsimWriter { - async fn write(&mut self, bs: Bytes) -> crate::Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> crate::Result<usize> { #[cfg(madsim)] { let req = Request::Write(self.path.to_string(), bs); diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 36e4957aa..dcba90bc2 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -847,12 +847,12 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner .write(bs) .await .map(|n| { - self.bytes += n; + self.bytes += n as u64; n }) .map_err(|err| { @@ -877,11 +877,11 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner .write(bs) .map(|n| { - self.bytes += n; + self.bytes += n as u64; n }) .map_err(|err| { diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 149410ac8..9d1f397be 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner .write(bs) .in_span(Span::enter_with_parent( @@ -369,7 +369,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 0722a11a7..a0e5a1fe1 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -313,7 +313,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs).await } @@ -327,7 +327,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 0d8f99c22..2c7d7d7cf 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner .write(bs) .await @@ -695,7 +695,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner .write(bs) .map(|n| { diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 3e207df9a..04495df8d 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -872,7 +872,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp #[async_trait] impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let mut backoff = self.builder.build(); loop { @@ -952,7 +952,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { { || self.inner.write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 021d1cad9..96bd9d00b 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -216,8 +216,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap(); loop { match self.limiter.check_n(buf_length) { @@ -251,8 +251,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap(); loop { match self.limiter.check_n(buf_length) { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 68ac5a41d..a6179010a 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,8 +322,8 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let timeout = self.io_timeout(bs.len() as u64); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let timeout = self.io_timeout(bs.remaining() as u64); tokio::time::timeout(timeout, self.inner.write(bs)) .await diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index c5d006980..a512f74ba 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs).await } @@ -350,7 +350,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { self.inner.write(bs) } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index c0823d59e..9b8740f10 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -390,11 +390,15 @@ impl<S> KvWriter<S> { #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - self.buf = Some(bs.into()); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.chunk().len(); - Ok(size as u64) + let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size)); + buf.extend_from_slice(bs.chunk()); + + self.buf = Some(buf); + + Ok(size) } async fn abort(&mut self) -> Result<()> { @@ -414,11 +418,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - self.buf = Some(bs.into()); + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.chunk().len(); + + let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size)); + buf.extend_from_slice(bs.chunk()); + + self.buf = Some(buf); - Ok(size as u64) + Ok(size) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 948c2b5ad..336b04a50 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -363,7 +363,7 @@ pub struct KvWriter<S> { path: String, op: OpWrite, - buf: VectorCursor, + buf: Option<Vec<u8>>, } impl<S> KvWriter<S> { @@ -372,11 +372,13 @@ impl<S> KvWriter<S> { kv, path, op, - buf: VectorCursor::new(), + buf: None, } } fn build(&self) -> Value { + let value = self.buf.map(Bytes::from).unwrap_or_default(); + let mut metadata = Metadata::new(EntryMode::FILE); if let Some(v) = self.op.cache_control() { metadata.set_cache_control(v); @@ -390,29 +392,29 @@ impl<S> KvWriter<S> { if let Some(v) = self.op.content_length() { metadata.set_content_length(v); } else { - metadata.set_content_length(self.buf.len() as u64); + metadata.set_content_length(value.len() as u64); } - Value { - metadata, - value: self.buf.peak_all(), - } + Value { metadata, value } } } #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { // TODO: we need to support append in the future. - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - self.buf.push(bs); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.chunk().len(); + + let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size)); + buf.extend_from_slice(bs.chunk()); + + self.buf = Some(buf); - Ok(size as u64) + Ok(size) } async fn abort(&mut self) -> Result<()> { - self.buf.clear(); - + self.buf = None; Ok(()) } @@ -423,11 +425,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - self.buf.push(bs); + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.chunk().len(); + + let mut buf = self.buf.unwrap_or_else(|| Vec::with_capacity(size)); + buf.extend_from_slice(bs.chunk()); + + self.buf = Some(buf); - Ok(size as u64) + Ok(size) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index b08a0d222..1b24bec9c 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -41,6 +41,3 @@ pub use cursor::VectorCursor; mod entry; pub use entry::Entry; - -mod buf; -pub use buf::Buf; diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index eaf3ba0de..606f7e9bc 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -21,6 +21,7 @@ use std::fmt::Formatter; use async_trait::async_trait; use bytes::Bytes; +use crate::raw::oio; use crate::*; /// WriteOperation is the name for APIs of Writer. @@ -81,7 +82,7 @@ pub trait Write: Unpin + Send + Sync { /// /// It's possible that `n < bs.len()`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - async fn write(&mut self, bs: Bytes) -> Result<u64>; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -92,7 +93,7 @@ pub trait Write: Unpin + Send + Sync { #[async_trait] impl Write for () { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let _ = bs; unimplemented!("write is required to be implemented for oio::Write") @@ -118,7 +119,7 @@ impl Write for () { /// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { (**self).write(bs).await } @@ -137,14 +138,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>; /// BlockingWrite is the trait that OpenDAL returns to callers. pub trait BlockingWrite: Send + Sync + 'static { /// Write whole content at once. - fn write(&mut self, bs: Bytes) -> Result<u64>; + fn write(&mut self, bs: &dyn Buf) -> Result<usize>; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -162,7 +163,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { - fn write(&mut self, bs: Bytes) -> Result<u64> { + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index d6bb819df..7f9e8c043 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -78,15 +78,19 @@ impl<W> oio::Write for AppendObjectWriter<W> where W: AppendObjectWrite, { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let offset = self.offset().await?; - let size = bs.len() as u64; + let size = bs.remaining(); self.inner - .append(offset, size, AsyncBody::Bytes(bs)) + .append( + offset, + size as u64, + AsyncBody::Bytes(bs.copy_to_bytes(size)), + ) .await - .map(|_| self.offset = Some(offset + size))?; + .map(|_| self.offset = Some(offset + size as u64))?; Ok(size) } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index dac833fcf..44c2289bf 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -56,7 +56,7 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { #[async_trait] impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, @@ -94,7 +94,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWaysWriter<ONE, TWO, THREE> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { match self { Self::One(one) => one.write(bs).await, Self::Two(two) => two.write(bs).await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 7ba788e3d..4b3fe6eee 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -18,7 +18,7 @@ use std::cmp::min; use async_trait::async_trait; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use crate::raw::*; use crate::*; @@ -61,21 +61,20 @@ enum Buffer { #[async_trait] impl<W: oio::Write> oio::Write for ExactBufWriter<W> { - async fn write(&mut self, mut bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { loop { match &mut self.buffer { Buffer::Empty => { - if bs.len() >= self.buffer_size { - bs.truncate(self.buffer_size); - self.buffer = Buffer::Consuming(bs); - return Ok(self.buffer_size as u64); + if bs.remaining() >= self.buffer_size { + self.buffer = Buffer::Consuming(bs.copy_to_bytes(self.buffer_size)); + return Ok(self.buffer_size); } - let size = bs.len() as u64; - let mut fill = BytesMut::with_capacity(bs.len()); - fill.extend_from_slice(&bs); + let chunk = bs.chunk(); + let mut fill = BytesMut::with_capacity(chunk.len()); + fill.extend_from_slice(chunk); self.buffer = Buffer::Filling(fill); - return Ok(size); + return Ok(chunk.len()); } Buffer::Filling(fill) => { if fill.len() >= self.buffer_size { @@ -83,18 +82,17 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { continue; } - let size = min(self.buffer_size - fill.len(), bs.len()); - fill.extend_from_slice(&bs[..size]); - bs.advance(size); - return Ok(size as u64); + let size = min(self.buffer_size - fill.len(), bs.chunk().len()); + fill.extend_from_slice(&bs.chunk()[..size]); + return Ok(size); } Buffer::Consuming(consume) => { // Make sure filled buffer has been flushed. // // TODO: maybe we can re-fill it after a successful write. while !consume.is_empty() { - let n = self.inner.write(consume.clone()).await?; - consume.advance(n as usize); + let n = self.inner.write(consume).await?; + consume.advance(n); } self.buffer = Buffer::Empty; } @@ -120,8 +118,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { // // TODO: maybe we can re-fill it after a successful write. while !consume.is_empty() { - let n = self.inner.write(consume.clone()).await?; - consume.advance(n as usize); + let n = self.inner.write(&consume).await?; + consume.advance(n); } self.buffer = Buffer::Empty; break; @@ -152,11 +150,14 @@ mod tests { #[async_trait] impl Write for MockWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len()); - - self.buf.extend_from_slice(&bs); - Ok(bs.len() as u64) + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + debug!( + "test_fuzz_exact_buf_writer: flush size: {}", + bs.chunk().len() + ); + + self.buf.extend_from_slice(bs.chunk()); + Ok(bs.chunk().len()) } async fn abort(&mut self) -> Result<()> { @@ -184,8 +185,8 @@ mod tests { let mut bs = Bytes::from(expected.clone()); while !bs.is_empty() { - let n = w.write(bs.clone()).await?; - bs.advance(n as usize); + let n = w.write(&bs).await?; + bs.advance(n); } w.close().await?; @@ -223,7 +224,7 @@ mod tests { let mut bs = Bytes::from(content.clone()); while !bs.is_empty() { - let n = writer.write(bs.clone()).await?; + let n = writer.write(&bs).await?; bs.advance(n as usize); } } diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 13d1b4aa2..e5a9aedba 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -120,22 +120,22 @@ impl<W> oio::Write for MultipartUploadWriter<W> where W: MultipartUploadWrite, { - async fn write(&mut self, bs: Bytes) -> Result<u64> { + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let upload_id = self.upload_id().await?; - let size = bs.len(); + let size = bs.remaining(); self.inner .write_part( &upload_id, self.parts.len(), size as u64, - AsyncBody::Bytes(bs), + AsyncBody::Bytes(bs.copy_to_bytes(size)), ) .await .map(|v| self.parts.push(v))?; - Ok(size as u64) + Ok(size) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 3681242ce..5845784d1 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -32,7 +32,7 @@ pub trait OneShotWrite: Send + Sync + Unpin { /// write_once write all data at once. /// /// Implementations should make sure that the data is written correctly at once. - async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>; + async fn write_once(&self, body: &dyn Buf) -> Result<()>; } /// OneShotWrite is used to implement [`Write`] based on one shot. @@ -49,12 +49,9 @@ impl<W: OneShotWrite> OneShotWriter<W> { #[async_trait] impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let cursor = oio::Cursor::from(bs); - - let size = cursor.len() as u64; - self.inner.write_once(size, Box::new(cursor)).await?; - + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + self.inner.write_once(bs).await?; Ok(size) } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index b088aa57a..423449014 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -161,11 +161,12 @@ impl AzblobWriter { #[async_trait] impl oio::Write for AzblobWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); if self.op.append() { - self.append_oneshot(size, AsyncBody::Bytes(bs)).await?; + self.append_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) + .await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -174,7 +175,8 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) + .await?; } Ok(size) diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index e56f9eca9..7d1e22c65 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -41,9 +41,7 @@ impl AzdfsWriter { #[async_trait] impl oio::Write for AzdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; - + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -68,9 +66,13 @@ impl oio::Write for AzdfsWriter { } } - let mut req = - self.core - .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?; + let size = bs.remaining(); + + let mut req = self.core.azdfs_update_request( + &self.path, + Some(size), + AsyncBody::Bytes(bs.copy_to_bytes(size)), + )?; self.core.sign(&mut req).await?; diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index af2a6ebd0..2c187fc31 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -18,12 +18,10 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Buf; use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -52,14 +50,15 @@ impl CosWriter { #[async_trait] impl oio::OneShotWrite for CosWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, buf: &dyn Buf) -> Result<()> { + let size = buf.remaining(); let mut req = self.core.cos_put_object_request( &self.path, - Some(size), + Some(size as u64), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Stream(stream), + AsyncBody::Bytes(buf.copy_to_bytes(size)), )?; self.core.sign(&mut req).await?; @@ -98,7 +97,8 @@ impl oio::MultipartUploadWrite for CosWriter { let bs = resp.into_body().bytes().await?; let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + quick_xml::de::from_reader(bytes::Buf::reader(bs)) + .map_err(new_xml_deserialize_error)?; Ok(result.upload_id) } diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 00998c77d..2c51efc41 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -40,8 +40,8 @@ impl DropboxWriter { #[async_trait] impl oio::Write for DropboxWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); let resp = self .core @@ -49,14 +49,14 @@ impl oio::Write for DropboxWriter { &self.path, Some(size), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::Bytes(bs.copy_to_bytes(size)), ) .await?; let status = resp.status(); match status { StatusCode::OK => { resp.into_body().consume().await?; - Ok(size as u64) + Ok(size) } _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 4a9a0471d..b317bf6ab 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -49,21 +49,8 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; - - self.f - .seek(SeekFrom::Start(self.pos)) - .await - .map_err(parse_io_error)?; - self.f.write_all(&bs).await.map_err(parse_io_error)?; - self.pos += size; - - Ok(size) + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + self.f.write(&bs.chunk()).await.map_err(parse_io_error) } async fn abort(&mut self) -> Result<()> { @@ -87,18 +74,8 @@ impl oio::Write for FsWriter<tokio::fs::File> { } impl oio::BlockingWrite for FsWriter<std::fs::File> { - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - fn write(&mut self, bs: Bytes) -> Result<u64> { - self.f - .seek(SeekFrom::Start(self.pos)) - .map_err(parse_io_error)?; - self.f.write_all(&bs).map_err(parse_io_error)?; - self.pos += bs.len() as u64; - - Ok(bs.len() as u64) + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + self.f.write(&bs.chunk()).map_err(parse_io_error) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 3488255f9..d77703c57 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -41,8 +41,9 @@ impl FtpWriter { #[async_trait] impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + let bs = bs.copy_to_bytes(size); let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; @@ -52,7 +53,7 @@ impl oio::Write for FtpWriter { ftp_stream.finalize_put_stream(data_stream).await?; - Ok(size as u64) + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 68e13b27f..317c6b1a0 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -118,16 +118,16 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); let location = match &self.location { Some(location) => location, None => { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 - && self.written == 0 + if self.op.content_length().unwrap_or_default() == size as u64 && self.written == 0 { - self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) + .await?; return Ok(size); } else { @@ -138,7 +138,7 @@ impl oio::Write for GcsWriter { } }; - self.buffer.push(bs); + self.buffer.push(bs.copy_to_bytes(size)); // Return directly if the buffer is not full if self.buffer.len() <= self.write_fixed_size { return Ok(size); diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 8d69bf8e2..606b3bf10 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -95,12 +95,14 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); if self.file_id.is_none() { - self.write_create(size, bs).await?; + self.write_create(size as u64, bs.copy_to_bytes(size)) + .await?; } else { - self.write_overwrite(size, bs).await?; + self.write_overwrite(size as u64, bs.copy_to_bytes(size)) + .await?; } Ok(size) diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index a7db6acab..faad830c2 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -42,18 +42,23 @@ impl GhacWriter { #[async_trait] impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + let req = self .backend - .ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs)) + .ghac_upload( + self.cache_id, + size as u64, + AsyncBody::Bytes(bs.copy_to_bytes(size)), + ) .await?; let resp = self.backend.client.send(req).await?; if resp.status().is_success() { resp.into_body().consume().await?; - self.size += size; + self.size += size as u64; Ok(size) } else { Err(parse_error(resp) diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 90499e47c..12e4eb557 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -41,21 +41,8 @@ impl<F> HdfsWriter<F> { #[async_trait] impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - - while self.pos < bs.len() { - let n = self - .f - .write(&bs[self.pos..]) - .await - .map_err(parse_io_error)?; - self.pos += n; - } - // Reset pos to 0 for next write. - self.pos = 0; - - Ok(size as u64) + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + self.f.write(bs.chunk()).await.map_err(parse_io_error) } async fn abort(&mut self) -> Result<()> { @@ -73,17 +60,8 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { } impl oio::BlockingWrite for HdfsWriter<hdrs::File> { - fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); - - while self.pos < bs.len() { - let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?; - self.pos += n; - } - // Reset pos to 0 for next write. - self.pos = 0; - - Ok(size as u64) + fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + self.f.write(bs.chunk()).map_err(parse_io_error) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index d3f98e77b..7e5199b2a 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -38,9 +38,12 @@ impl IpmfsWriter { #[async_trait] impl oio::Write for IpmfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; - let resp = self.backend.ipmfs_write(&self.path, bs).await?; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + let resp = self + .backend + .ipmfs_write(&self.path, bs.copy_to_bytes(size)) + .await?; let status = resp.status(); diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index d3b1e119f..e37c51a45 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -18,13 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Buf; use http::StatusCode; use super::core::*; use super::error::parse_error; use crate::raw::oio::MultipartUploadPart; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -53,13 +51,14 @@ impl ObsWriter { #[async_trait] impl oio::OneShotWrite for ObsWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, bs: &dyn Buf) -> Result<()> { + let size = bs.remaining(); let mut req = self.core.obs_put_object_request( &self.path, - Some(size), + Some(size as u64), self.op.content_type(), self.op.cache_control(), - AsyncBody::Stream(stream), + AsyncBody::Bytes(bs.copy_to_bytes(size)), )?; self.core.sign(&mut req).await?; @@ -93,7 +92,8 @@ impl oio::MultipartUploadWrite for ObsWriter { let bs = resp.into_body().bytes().await?; let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + quick_xml::de::from_reader(bytes::Buf::reader(bs)) + .map_err(new_xml_deserialize_error)?; Ok(result.upload_id) } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index abb846871..49337718a 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Buf; use bytes::Bytes; use http::StatusCode; @@ -46,8 +45,9 @@ impl OneDriveWriter { #[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + let bs = bs.copy_to_bytes(size); if size <= Self::MAX_SIMPLE_SIZE { self.write_simple(bs).await?; @@ -55,7 +55,7 @@ impl oio::Write for OneDriveWriter { self.write_chunked(bs).await?; } - Ok(size as u64) + Ok(size) } async fn abort(&mut self) -> Result<()> { @@ -167,7 +167,7 @@ impl OneDriveWriter { StatusCode::OK => { let bs = resp.into_body().bytes().await?; let result: OneDriveUploadSessionCreationResponseBody = - serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; + serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; Ok(result) } _ => Err(parse_error(resp).await?), diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 27aa09011..95935ba96 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -18,12 +18,10 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Buf; use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -52,14 +50,15 @@ impl OssWriter { #[async_trait] impl oio::OneShotWrite for OssWriter { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, bs: &dyn Buf) -> Result<()> { + let size = bs.remaining(); let mut req = self.core.oss_put_object_request( &self.path, - Some(size), + Some(size as u64), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Stream(stream), + AsyncBody::Bytes(bs.copy_to_bytes(size)), false, )?; @@ -100,7 +99,8 @@ impl oio::MultipartUploadWrite for OssWriter { let bs = resp.into_body().bytes().await?; let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + quick_xml::de::from_reader(bytes::Buf::reader(bs)) + .map_err(new_xml_deserialize_error)?; Ok(result.upload_id) } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index a27341d71..4b8df2246 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -18,12 +18,10 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Buf; use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -49,14 +47,16 @@ impl S3Writer { #[async_trait] impl oio::OneShotWrite for S3Writer { - async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { + async fn write_once(&self, bs: &dyn Buf) -> Result<()> { + let size = bs.remaining(); + let mut req = self.core.s3_put_object_request( &self.path, - Some(size), + Some(size as u64), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Stream(stream), + AsyncBody::Bytes(bs.copy_to_bytes(size)), )?; self.core.sign(&mut req).await?; @@ -95,7 +95,8 @@ impl oio::MultipartUploadWrite for S3Writer { let bs = resp.into_body().bytes().await?; let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + quick_xml::de::from_reader(bytes::Buf::reader(bs)) + .map_err(new_xml_deserialize_error)?; Ok(result.upload_id) } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 2df725ab2..5ddbd5ea0 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -20,9 +20,7 @@ use bytes::Bytes; use openssh_sftp_client::file::File; use crate::raw::oio; -use crate::Error; -use crate::ErrorKind; -use crate::Result; +use crate::*; pub struct SftpWriter { file: File, @@ -36,9 +34,8 @@ impl SftpWriter { #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; - self.file.write_all(&bs).await?; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = self.file.write(bs.chunk()).await?; Ok(size) } diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 5c57d7a95..6d592512f 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -67,14 +67,10 @@ impl SupabaseWriter { #[async_trait] impl oio::Write for SupabaseWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - if bs.is_empty() { - return Ok(9); - } - - let size = bs.len(); - self.upload(bs).await?; - Ok(size as u64) + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); + self.upload(bs.copy_to_bytes(size)).await?; + Ok(size) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index efb223df6..a6d9b24b9 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -39,15 +39,15 @@ impl VercelArtifactsWriter { #[async_trait] impl oio::Write for VercelArtifactsWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); let resp = self .backend .vercel_artifacts_put( self.path.as_str(), self.op.content_length().unwrap(), - AsyncBody::Bytes(bs), + AsyncBody::Bytes(bs.copy_to_bytes(size)), ) .await?; @@ -56,7 +56,7 @@ impl oio::Write for VercelArtifactsWriter { match status { StatusCode::OK | StatusCode::ACCEPTED => { resp.into_body().consume().await?; - Ok(size as u64) + Ok(size) } _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 55d898ccc..b2825291b 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -41,8 +41,8 @@ impl WasabiWriter { #[async_trait] impl oio::Write for WasabiWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); let resp = self .core @@ -52,14 +52,14 @@ impl oio::Write for WasabiWriter { self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(bs), + AsyncBody::Bytes(bs.copy_to_bytes(size)), ) .await?; match resp.status() { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(size as u64) + Ok(size) } _ => Err(parse_error(resp).await?), } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 30b9827f2..ad9744ea0 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,10 +62,11 @@ impl WebdavWriter { #[async_trait] impl oio::Write for WebdavWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len() as u64; + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); - self.write_oneshot(size, AsyncBody::Bytes(bs)).await?; + self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) + .await?; Ok(size) } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 8dcbc9dc3..6357a9e3a 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -39,8 +39,8 @@ impl WebhdfsWriter { #[async_trait] impl oio::Write for WebhdfsWriter { - async fn write(&mut self, bs: Bytes) -> Result<u64> { - let size = bs.len(); + async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { + let size = bs.remaining(); let req = self .backend @@ -48,7 +48,7 @@ impl oio::Write for WebhdfsWriter { &self.path, Some(size), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::Bytes(bs.copy_to_bytes(size)), ) .await?; @@ -58,7 +58,7 @@ impl oio::Write for WebhdfsWriter { match status { StatusCode::CREATED | StatusCode::OK => { resp.into_body().consume().await?; - Ok(size as u64) + Ok(size) } _ => Err(parse_error(resp).await?), } diff --git a/core/src/raw/oio/buf.rs b/core/src/types/buf.rs similarity index 89% rename from core/src/raw/oio/buf.rs rename to core/src/types/buf.rs index 9b7aa7135..18024641a 100644 --- a/core/src/raw/oio/buf.rs +++ b/core/src/types/buf.rs @@ -22,7 +22,7 @@ use std::{cmp, ptr}; /// /// The biggest difference is that `Buf`'s `copy_to_slice` and `copy_to_bytes` only needs `&self` /// instead of `&mut self`. -pub trait Buf { +pub trait Buf: Send + Sync { /// Returns the number of bytes between the current position and the end of the buffer. /// /// This value is greater than or equal to the length of the slice returned by chunk(). @@ -81,11 +81,12 @@ pub trait Buf { /// # Notes /// /// Users should not assume the returned bytes is the same as the Buf::remaining(). - fn copy_to_bytes(&self) -> Bytes { + fn copy_to_bytes(&self, len: usize) -> Bytes { let src = self.chunk(); + let size = cmp::min(src.len(), len); - let mut ret = BytesMut::with_capacity(src.len()); - ret.extend_from_slice(src); + let mut ret = BytesMut::with_capacity(size); + ret.extend_from_slice(&src[..size]); ret.freeze() } } @@ -108,8 +109,8 @@ macro_rules! deref_forward_buf { (**self).copy_to_slice(dst) } - fn copy_to_bytes(&self) -> Bytes { - (**self).copy_to_bytes() + fn copy_to_bytes(&self, len: usize) -> Bytes { + (**self).copy_to_bytes(len) } }; } @@ -139,7 +140,7 @@ impl Buf for &[u8] { } } -impl<T: AsRef<[u8]>> Buf for std::io::Cursor<T> { +impl<T: AsRef<[u8]> + Send + Sync> Buf for std::io::Cursor<T> { fn remaining(&self) -> usize { let len = self.get_ref().as_ref().len(); let pos = self.position(); @@ -189,8 +190,9 @@ impl Buf for Bytes { } #[inline] - fn copy_to_bytes(&self) -> Bytes { - self.clone() + fn copy_to_bytes(&self, len: usize) -> Bytes { + let size = cmp::min(self.len(), len); + self.slice(..size) } } @@ -211,7 +213,8 @@ impl Buf for BytesMut { } #[inline] - fn copy_to_bytes(&self) -> Bytes { - self.clone().freeze() + fn copy_to_bytes(&self, len: usize) -> Bytes { + let size = cmp::min(self.len(), len); + Bytes::from(&self[..size]) } } diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index 0f75fa50c..dff3ea2cb 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -58,3 +58,6 @@ pub use scheme::Scheme; mod capability; pub use capability::Capability; + +mod buf; +pub use buf::Buf; diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index c1d34f87d..c17054b67 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -550,7 +550,7 @@ impl BlockingOperator { self.inner().clone(), path, (OpWrite::default().with_content_length(bs.len() as u64), bs), - |inner, path, (args, bs)| { + |inner, path, (args, mut bs)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "write path is a directory") @@ -561,7 +561,10 @@ impl BlockingOperator { } let (_, mut w) = inner.blocking_write(&path, args)?; - w.write(bs)?; + while bs.remaining() > 0 { + let n = w.write(&bs)?; + bs.advance(n); + } w.close()?; Ok(()) diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index ab97a2df2..b51c2bc60 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -17,7 +17,7 @@ use std::time::Duration; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::stream; use futures::AsyncReadExt; use futures::Stream; @@ -716,7 +716,7 @@ impl Operator { self.inner().clone(), path, (OpWrite::default().with_content_length(bs.len() as u64), bs), - |inner, path, (args, bs)| { + |inner, path, (args, mut bs)| { let fut = async move { if !validate_path(&path, EntryMode::FILE) { return Err(Error::new( @@ -729,7 +729,11 @@ impl Operator { } let (_, mut w) = inner.write(&path, args).await?; - w.write(bs).await?; + while bs.remaining() > 0 { + let n = w.write(&bs).await?; + bs.advance(n); + } + w.close().await?; Ok(()) diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 6efc8cdad..1688700d5 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -266,9 +266,7 @@ mod tests { let path = "test_file"; let content = gen_random_bytes(); - op.write(path, content.clone()) - .await - .expect("write must succeed"); + op.write(path, content).await.expect("write must succeed"); let mut reader = op.reader(path).await.unwrap(); let mut buf = Vec::new(); diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 3ae7f6f4c..e78c83212 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -22,7 +22,7 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::future::BoxFuture; use futures::AsyncWrite; use futures::FutureExt; @@ -80,7 +80,7 @@ impl Writer { } /// Write into inner writer. - pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { + pub async fn write(&mut self, mut bs: impl Buf) -> Result<()> { let w = if let State::Idle(Some(w)) = &mut self.state { w } else { @@ -90,11 +90,9 @@ impl Writer { ); }; - let mut bs = bs.into(); - - while !bs.is_empty() { - let n = w.write(bs.clone()).await?; - bs.advance(n as usize); + while bs.remaining() > 0 { + let n = w.write(&bs).await?; + bs.advance(n); } Ok(()) @@ -149,10 +147,10 @@ impl Writer { let mut written = 0; while let Some(bs) = sink_from.try_next().await? { let mut bs = bs.into(); - while bs.has_remaining() { - let n = w.write(bs.clone()).await?; - bs.advance(n as usize); - written += n; + while bs.remaining() > 0 { + let n = w.write(&bs).await?; + bs.advance(n); + written += n as u64; } } Ok(written) @@ -262,10 +260,9 @@ impl AsyncWrite for Writer { let mut w = w .take() .expect("invalid state of writer: Idle state with empty write"); - let bs = Bytes::from(buf.to_vec()); let fut = async move { - let n = w.write(bs).await?; - Ok((n as usize, w)) + let n = w.write(&buf).await?; + Ok((n, w)) }; self.state = State::Write(Box::pin(fut)); } @@ -334,9 +331,8 @@ impl tokio::io::AsyncWrite for Writer { let mut w = w .take() .expect("invalid state of writer: Idle state with empty write"); - let bs = Bytes::from(buf.to_vec()); let fut = async move { - let n = w.write(bs).await?; + let n = w.write(&buf).await?; Ok((n as usize, w)) }; self.state = State::Write(Box::pin(fut)); @@ -414,12 +410,10 @@ impl BlockingWriter { } /// Write into inner writer. - pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { - let mut bs = bs.into(); - - while !bs.is_empty() { - let n = self.inner.write(bs.clone())?; - bs.advance(n as usize); + pub fn write(&mut self, mut bs: impl Buf) -> Result<()> { + while bs.remaining() > 0 { + let n = self.inner.write(&bs)?; + bs.advance(n); } Ok(()) @@ -434,8 +428,7 @@ impl BlockingWriter { impl io::Write for BlockingWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.inner - .write(Bytes::from(buf.to_vec())) - .map(|n| n as usize) + .write(&buf) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) }
