This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch add-docs-and-checks
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 303328803b2db13cc4e34dca0ec845ef137ae1dd
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 19:53:38 2023 +0800

    feat: Add size check for sized writer
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs    | 125 +++++++++++++++++++++++++++++++++++++++--
 core/src/raw/http_util/body.rs |   4 +-
 core/src/types/error.rs        |  20 +++++++
 core/src/types/writer.rs       |  17 ++++--
 4 files changed, 154 insertions(+), 12 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 0fbd7178..e4625101 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -23,6 +23,7 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 
 use crate::ops::*;
 use crate::raw::oio::into_reader::RangeReader;
@@ -329,8 +330,8 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
     type Inner = A;
     type Reader = CompleteReader<A, A::Reader>;
     type BlockingReader = CompleteReader<A, A::BlockingReader>;
-    type Writer = A::Writer;
-    type BlockingWriter = A::BlockingWriter;
+    type Writer = CompleteWriter<A::Writer>;
+    type BlockingWriter = CompleteWriter<A::BlockingWriter>;
     type Pager = CompletePager<A, A::Pager>;
     type BlockingPager = CompletePager<A, A::BlockingPager>;
 
@@ -365,11 +366,18 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        self.inner.write(path, args).await
+        let size = args.content_length();
+        self.inner
+            .write(path, args)
+            .await
+            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
-        self.inner.blocking_write(path, args)
+        let size = args.content_length();
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
@@ -423,7 +431,7 @@ where
         }
     }
 
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<bytes::Bytes>>> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         use CompleteReader::*;
 
         match self {
@@ -460,7 +468,7 @@ where
         }
     }
 
-    fn next(&mut self) -> Option<Result<bytes::Bytes>> {
+    fn next(&mut self) -> Option<Result<Bytes>> {
         use CompleteReader::*;
 
         match self {
@@ -509,3 +517,108 @@ where
         }
     }
 }
+
+pub struct CompleteWriter<W> {
+    inner: W,
+    size: Option<u64>,
+    written: u64,
+}
+
+impl<W> CompleteWriter<W> {
+    pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
+        CompleteWriter {
+            inner,
+            size,
+            written: 0,
+        }
+    }
+}
+
+#[async_trait]
+impl<W> oio::Write for CompleteWriter<W>
+where
+    W: oio::Write,
+{
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let n = bs.len();
+
+        if let Some(size) = self.size {
+            if self.written + n as u64 > size {
+                return Err(Error::new(
+                    ErrorKind::ContentTruncated,
+                    &format!(
+                        "writer got too much data, expect: {size}, actual: {}",
+                        self.written + n as u64
+                    ),
+                ));
+            }
+        }
+
+        self.inner.write(bs).await?;
+        self.written += n as u64;
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.abort().await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        if let Some(size) = self.size {
+            if self.written < size {
+                return Err(Error::new(
+                    ErrorKind::ContentIncomplete,
+                    &format!(
+                        "writer got too less data, expect: {size}, actual: {}",
+                        self.written
+                    ),
+                ));
+            }
+        }
+
+        self.inner.close().await?;
+        Ok(())
+    }
+}
+
+impl<W> oio::BlockingWrite for CompleteWriter<W>
+where
+    W: oio::BlockingWrite,
+{
+    fn write(&mut self, bs: Bytes) -> Result<()> {
+        let n = bs.len();
+
+        if let Some(size) = self.size {
+            if self.written + n as u64 > size {
+                return Err(Error::new(
+                    ErrorKind::ContentTruncated,
+                    &format!(
+                        "writer got too much data, expect: {size}, actual: {}",
+                        self.written + n as u64
+                    ),
+                ));
+            }
+        }
+
+        self.inner.write(bs)?;
+        self.written += n as u64;
+        Ok(())
+    }
+
+    fn close(&mut self) -> Result<()> {
+        if let Some(size) = self.size {
+            if self.written < size {
+                return Err(Error::new(
+                    ErrorKind::ContentIncomplete,
+                    &format!(
+                        "writer got too less data, expect: {size}, actual: {}",
+                        self.written
+                    ),
+                ));
+            }
+        }
+
+        self.inner.close()?;
+        Ok(())
+    }
+}
diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs
index b4d43247..4d8eec3f 100644
--- a/core/src/raw/http_util/body.rs
+++ b/core/src/raw/http_util/body.rs
@@ -144,11 +144,11 @@ impl IncomingAsyncBody {
         match actual.cmp(&expect) {
             Ordering::Equal => Ok(()),
             Ordering::Less => Err(Error::new(
-                ErrorKind::Unexpected,
+                ErrorKind::ContentIncomplete,
                 &format!("reader got too less data, expect: {expect}, actual: 
{actual}"),
             )),
             Ordering::Greater => Err(Error::new(
-                ErrorKind::Unexpected,
+                ErrorKind::ContentTruncated,
                 &format!("reader got too much data, expect: {expect}, actual: 
{actual}"),
             )),
         }
diff --git a/core/src/types/error.rs b/core/src/types/error.rs
index 79475db9..6317b5d7 100644
--- a/core/src/types/error.rs
+++ b/core/src/types/error.rs
@@ -75,6 +75,24 @@ pub enum ErrorKind {
     /// For example, reading a file with If-Match header but the file's ETag
     /// is not match.
     PreconditionFailed,
+    /// The content is truncated.
+    ///
+    /// This error kind means there is more content to come, but it has been truncated.
+    ///
+    /// For example:
+    ///
+    /// - Users expected to read 1024 bytes, but service returned more bytes.
+    /// - Service expected to write 1024 bytes, but users write more bytes.
+    ContentTruncated,
+    /// The content is incomplete.
+    ///
+    /// This error kind means the expected content length was not reached.
+    ///
+    /// For example:
+    ///
+    /// - Users expected to read 1024 bytes, but service returned fewer bytes.
+    /// - Service expected to write 1024 bytes, but users write fewer bytes.
+    ContentIncomplete,
 }
 
 impl ErrorKind {
@@ -104,6 +122,8 @@ impl From<ErrorKind> for &'static str {
             ErrorKind::RateLimited => "RateLimited",
             ErrorKind::IsSameFile => "IsSameFile",
             ErrorKind::PreconditionFailed => "PreconditionFailed",
+            ErrorKind::ContentTruncated => "ContentTruncated",
+            ErrorKind::ContentIncomplete => "ContentIncomplete",
         }
     }
 }
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 321988b9..859dcfe5 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -35,11 +35,20 @@ use crate::*;
 /// Writer is designed to write data into given path in an asynchronous
 /// manner.
 ///
-/// # Notes
+/// ## Notes
 ///
-/// Writer is designed for appending multiple blocks which could
-/// lead to much requests. If only want to send all data in single chunk,
-/// please use [`Operator::write`] instead.
+/// Writer can be used in two ways:
+///
+/// - Sized: write data with a known size by specifying the content length.
+/// - Unsized: write data with an unknown size, also known as streaming.
+///
+/// All services will support `sized` writer and provide special optimization 
if
+/// the given data size is the same as the content length, allowing them to
+/// be written in one request.
+///
+/// Some services also support `unsized` writer. They MAY buffer part of the
+/// data and flush it into storage as needed. Finally, the file will be
+/// available after `close` has been called.
 pub struct Writer {
     state: State,
 }

Reply via email to