This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch add-docs-and-checks in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 303328803b2db13cc4e34dca0ec845ef137ae1dd Author: Xuanwo <[email protected]> AuthorDate: Wed Apr 19 19:53:38 2023 +0800 feat: Add size check for sized writer Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 125 +++++++++++++++++++++++++++++++++++++++-- core/src/raw/http_util/body.rs | 4 +- core/src/types/error.rs | 20 +++++++ core/src/types/writer.rs | 17 ++++-- 4 files changed, 154 insertions(+), 12 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 0fbd7178..e4625101 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -23,6 +23,7 @@ use std::task::Context; use std::task::Poll; use async_trait::async_trait; +use bytes::Bytes; use crate::ops::*; use crate::raw::oio::into_reader::RangeReader; @@ -329,8 +330,8 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { type Inner = A; type Reader = CompleteReader<A, A::Reader>; type BlockingReader = CompleteReader<A, A::BlockingReader>; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; + type Writer = CompleteWriter<A::Writer>; + type BlockingWriter = CompleteWriter<A::BlockingWriter>; type Pager = CompletePager<A, A::Pager>; type BlockingPager = CompletePager<A, A::BlockingPager>; @@ -365,11 +366,18 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.inner.write(path, args).await + let size = args.content_length(); + self.inner + .write(path, args) + .await + .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) + let size = args.content_length(); + self.inner + .blocking_write(path, args) + .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { @@ -423,7 +431,7 @@ 
where } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<bytes::Bytes>>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { use CompleteReader::*; match self { @@ -460,7 +468,7 @@ where } } - fn next(&mut self) -> Option<Result<bytes::Bytes>> { + fn next(&mut self) -> Option<Result<Bytes>> { use CompleteReader::*; match self { @@ -509,3 +517,108 @@ where } } } + +pub struct CompleteWriter<W> { + inner: W, + size: Option<u64>, + written: u64, +} + +impl<W> CompleteWriter<W> { + pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> { + CompleteWriter { + inner, + size, + written: 0, + } + } +} + +#[async_trait] +impl<W> oio::Write for CompleteWriter<W> +where + W: oio::Write, +{ + async fn write(&mut self, bs: Bytes) -> Result<()> { + let n = bs.len(); + + if let Some(size) = self.size { + if self.written + n as u64 > size { + return Err(Error::new( + ErrorKind::ContentTruncated, + &format!( + "writer got too much data, expect: {size}, actual: {}", + self.written + n as u64 + ), + )); + } + } + + self.inner.write(bs).await?; + self.written += n as u64; + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + self.inner.abort().await + } + + async fn close(&mut self) -> Result<()> { + if let Some(size) = self.size { + if self.written < size { + return Err(Error::new( + ErrorKind::ContentIncomplete, + &format!( + "writer got too less data, expect: {size}, actual: {}", + self.written + ), + )); + } + } + + self.inner.close().await?; + Ok(()) + } +} + +impl<W> oio::BlockingWrite for CompleteWriter<W> +where + W: oio::BlockingWrite, +{ + fn write(&mut self, bs: Bytes) -> Result<()> { + let n = bs.len(); + + if let Some(size) = self.size { + if self.written + n as u64 > size { + return Err(Error::new( + ErrorKind::ContentTruncated, + &format!( + "writer got too much data, expect: {size}, actual: {}", + self.written + n as u64 + ), + )); + } + } + + self.inner.write(bs)?; + self.written += n as u64; + Ok(()) 
+ } + + fn close(&mut self) -> Result<()> { + if let Some(size) = self.size { + if self.written < size { + return Err(Error::new( + ErrorKind::ContentIncomplete, + &format!( + "writer got too less data, expect: {size}, actual: {}", + self.written + ), + )); + } + } + + self.inner.close()?; + Ok(()) + } +} diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index b4d43247..4d8eec3f 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -144,11 +144,11 @@ impl IncomingAsyncBody { match actual.cmp(&expect) { Ordering::Equal => Ok(()), Ordering::Less => Err(Error::new( - ErrorKind::Unexpected, + ErrorKind::ContentIncomplete, &format!("reader got too less data, expect: {expect}, actual: {actual}"), )), Ordering::Greater => Err(Error::new( - ErrorKind::Unexpected, + ErrorKind::ContentTruncated, &format!("reader got too much data, expect: {expect}, actual: {actual}"), )), } diff --git a/core/src/types/error.rs b/core/src/types/error.rs index 79475db9..6317b5d7 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -75,6 +75,24 @@ pub enum ErrorKind { /// For example, reading a file with If-Match header but the file's ETag /// is not match. PreconditionFailed, + /// The content is truncated. + /// + /// This error kind means there is more content to come but it has been truncated. + /// + /// For example: + /// + /// - Users expected to read 1024 bytes, but service returned more bytes. + /// - Service expected to write 1024 bytes, but users wrote more bytes. + ContentTruncated, + /// The content is incomplete. + /// + /// This error kind means the expected content length is not reached. + /// + /// For example: + /// + /// - Users expected to read 1024 bytes, but service returned fewer bytes. + /// - Service expected to write 1024 bytes, but users wrote fewer bytes.
+ ContentIncomplete, } impl ErrorKind { @@ -104,6 +122,8 @@ impl From<ErrorKind> for &'static str { ErrorKind::RateLimited => "RateLimited", ErrorKind::IsSameFile => "IsSameFile", ErrorKind::PreconditionFailed => "PreconditionFailed", + ErrorKind::ContentTruncated => "ContentTruncated", + ErrorKind::ContentIncomplete => "ContentIncomplete", } } } diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 321988b9..859dcfe5 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -35,11 +35,20 @@ use crate::*; /// Writer is designed to write data into given path in an asynchronous /// manner. /// -/// # Notes +/// ## Notes /// -/// Writer is designed for appending multiple blocks which could -/// lead to much requests. If only want to send all data in single chunk, -/// please use [`Operator::write`] instead. +/// Writer can be used in two ways: +/// +/// - Sized: write data with a known size by specifying the content length. +/// - Unsized: write data with an unknown size, also known as streaming. +/// +/// All services will support `sized` writer and provide special optimization if +/// the given data size is the same as the content length, allowing them to +/// be written in one request. +/// +/// Some services also support `unsized` writer. They MAY buffer part of the data +/// and flush them into storage as needed. And finally, the file will be available +/// after `close` has been called. pub struct Writer { state: State, }
