This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 4d71d9c777929b11b1b4b6eae2551b3a9f470f59
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 15:58:17 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/cursor.rs                   | 129 +++++++++++++++++++
 core/src/raw/oio/mod.rs                      |   1 +
 core/src/raw/oio/write.rs                    |  28 +++--
 core/src/services/azblob/backend.rs          |   4 +-
 core/src/services/azdfs/backend.rs           |   4 +-
 core/src/services/ftp/backend.rs             |   4 +-
 core/src/services/gcs/backend.rs             |  25 +---
 core/src/services/gcs/core.rs                |  10 +-
 core/src/services/gcs/writer.rs              | 180 +++++++++++++++++----------
 core/src/services/ghac/backend.rs            |   4 +-
 core/src/services/ipmfs/backend.rs           |   4 +-
 core/src/services/obs/backend.rs             |  10 +-
 core/src/services/obs/core.rs                |   5 -
 core/src/services/obs/writer.rs              |   1 -
 core/src/services/oss/backend.rs             |  18 +--
 core/src/services/oss/writer.rs              |  96 +++++++++++---
 core/src/services/s3/backend.rs              |  32 +----
 core/src/services/s3/core.rs                 |   5 -
 core/src/services/s3/writer.rs               | 119 ++++++++++++++----
 core/src/services/webdav/backend.rs          |   4 +-
 core/src/services/webhdfs/backend.rs         |   4 +-
 core/src/types/operator/blocking_operator.rs |  18 ++-
 core/src/types/operator/operator.rs          |  18 ++-
 core/src/types/ops.rs                        |  32 ++---
 core/src/types/writer.rs                     |   4 +-
 25 files changed, 510 insertions(+), 249 deletions(-)

diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c0282ffd..f8db0539 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::VecDeque;
 use std::io::Read;
 use std::io::SeekFrom;
 use std::task::Context;
 use std::task::Poll;
 
+use bytes::Buf;
 use bytes::Bytes;
+use bytes::BytesMut;
 
 use crate::raw::*;
 use crate::*;
@@ -138,3 +141,129 @@ impl oio::BlockingRead for Cursor {
         }
     }
 }
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+pub struct VectorCursor {
+    inner: VecDeque<Bytes>,
+    size: usize,
+}
+
+impl VectorCursor {
+    /// Create a new vector cursor.
+    pub fn new() -> Self {
+        Self {
+            inner: VecDeque::new(),
+            size: 0,
+        }
+    }
+
+    /// Returns `true` if current vector is empty.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Return current bytes size of current vector.
+    pub fn len(&self) -> usize {
+        self.size
+    }
+
+    /// Push a new bytes into vector cursor.
+    pub fn push(&mut self, bs: Bytes) {
+        self.size += bs.len();
+        self.inner.push_back(bs);
+    }
+
+    /// Pop a bytes from vector cursor.
+    pub fn pop(&mut self) {
+        let bs = self.inner.pop_back();
+        self.size -= bs.expect("poped bytes must exist").len()
+    }
+
+    /// Clear the entire vector.
+    pub fn clear(&mut self) {
+        self.inner.clear();
+        self.size = 0;
+    }
+
+    /// Peak will read and copy n bytes from current cursor without
+    /// changing its content.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    ///
+    /// # TODO
+    ///
+    /// Optimize to avoid data copy.
+    pub fn peak(&self, n: usize) -> Bytes {
+        assert!(n <= self.size, "peak size must smamller than current size");
+
+        // Avoid data copy if n is smaller than first chunk.
+        if self.inner[0].len() >= n {
+            return self.inner[0].slice(..n);
+        }
+
+        let mut bs = BytesMut::with_capacity(n);
+        let mut n = n;
+        for b in &self.inner {
+            if n == 0 {
+                break;
+            }
+            let len = b.len().min(n);
+            bs.extend_from_slice(&b[..len]);
+            n -= len;
+        }
+        bs.freeze()
+    }
+
+    /// Take will consume n bytes from current cursor.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    pub fn take(&mut self, n: usize) {
+        assert!(n <= self.size, "take size must smamller than current size");
+
+        // Update current size
+        self.size -= n;
+
+        let mut n = n;
+        while n > 0 {
+            assert!(!self.inner.is_empty(), "inner must not be empty");
+
+            if self.inner[0].len() <= n {
+                n -= self.inner[0].len();
+                self.inner.pop_front();
+            } else {
+                self.inner[0].advance(n);
+                n = 0;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_vector_cursor() {
+        let mut vc = VectorCursor::new();
+
+        vc.push(Bytes::from("hello"));
+        vc.push(Bytes::from("world"));
+
+        assert_eq!(vc.peak(1), Bytes::from("h"));
+        assert_eq!(vc.peak(1), Bytes::from("h"));
+        assert_eq!(vc.peak(4), Bytes::from("hell"));
+        assert_eq!(vc.peak(6), Bytes::from("hellow"));
+        assert_eq!(vc.peak(10), Bytes::from("helloworld"));
+
+        vc.take(1);
+        assert_eq!(vc.peak(1), Bytes::from("e"));
+        vc.take(1);
+        assert_eq!(vc.peak(1), Bytes::from("l"));
+        vc.take(5);
+        assert_eq!(vc.peak(1), Bytes::from("r"));
+    }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index f82af4ca..56b48dda 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -43,6 +43,7 @@ pub use write::Writer;
 
 mod cursor;
 pub use cursor::Cursor;
+pub use cursor::VectorCursor;
 
 mod into_streamable;
 pub use into_streamable::into_streamable_reader;
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index 05f93a93..c74e0d90 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -76,23 +76,33 @@ impl From<WriteOperation> for &'static str {
 pub type Writer = Box<dyn Write>;
 
 /// Write is the trait that OpenDAL returns to callers.
+///
+/// # Notes
+///
+/// There are two possible cases:
+///
+/// - Sized: The total size of the object is known in advance.
+/// - Unsized: The total size of the object is unknown in advance.
+///
+/// And it's possible that the given bs length is less than the total
+/// content length. Users will call write multiple times to write
+/// the whole data.
 #[async_trait]
 pub trait Write: Unpin + Send + Sync {
-    /// Write whole content at once.
+    /// Write given bytes into writer.
+    ///
+    /// # Notes
     ///
-    /// To append multiple bytes together, use `append` instead.
+    /// It's possible that the given bs length is less than the total
+    /// content length. And users will call write multiple times.
+    ///
+    /// Please make sure `write` is safe to re-enter.
     async fn write(&mut self, bs: Bytes) -> Result<()>;
 
     /// Append bytes to the writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
     async fn append(&mut self, bs: Bytes) -> Result<()>;
 
-    /// Abort the pending appendable writer.
-    /// #note
-    /// This method is only applicable to writers opened in append mode.
+    /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
 
     /// Close the writer and make sure all data has been flushed.
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index 5daa104e..4e5f89b4 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -497,10 +497,10 @@ impl Accessor for AzblobBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/azdfs/backend.rs 
b/core/src/services/azdfs/backend.rs
index 5c0b68db..1dbe605e 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -354,10 +354,10 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index f509eff2..5dd4b0ff 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -388,10 +388,10 @@ impl Accessor for FtpBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 7e865dcb..aba03063 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -410,32 +410,9 @@ impl Accessor for GcsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let upload_location = if args.append() {
-            let resp = self.core.gcs_initiate_resumable_upload(path).await?;
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK => {
-                    let bs = parse_location(resp.headers())
-                        .expect("Failed to retrieve location of resumable 
upload");
-                    if let Some(location) = bs {
-                        Some(String::from(location))
-                    } else {
-                        return Err(Error::new(
-                            ErrorKind::NotFound,
-                            "location is not in the response header",
-                        ));
-                    }
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            GcsWriter::new(self.core.clone(), args, path.to_string(), 
upload_location),
+            GcsWriter::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 18376acd..907d0ef4 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -322,7 +322,7 @@ impl GcsCore {
         &self,
         location: &str,
         size: u64,
-        written_bytes: u64,
+        written: u64,
         is_last_part: bool,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
@@ -331,12 +331,12 @@ impl GcsCore {
         let range_header = if is_last_part {
             format!(
                 "bytes {}-{}/{}",
-                written_bytes,
-                written_bytes + size - 1,
-                written_bytes + size
+                written,
+                written + size - 1,
+                written + size
             )
         } else {
-            format!("bytes {}-{}/*", written_bytes, written_bytes + size - 1)
+            format!("bytes {}-{}/*", written, written + size - 1)
         };
 
         req = req
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index b47b2051..19fbd6d5 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -29,37 +29,40 @@ use crate::*;
 
 pub struct GcsWriter {
     core: Arc<GcsCore>,
-
-    op: OpWrite,
     path: String,
+    op: OpWrite,
+
     location: Option<String>,
-    written_bytes: u64,
-    is_last_part_written: bool,
-    last: Option<Bytes>,
+    written: u64,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl GcsWriter {
-    pub fn new(
-        core: Arc<GcsCore>,
-        op: OpWrite,
-        path: String,
-        upload_location: Option<String>,
-    ) -> Self {
+    pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
         GcsWriter {
             core,
+            path: path.to_string(),
             op,
-            path,
-            location: upload_location,
-            written_bytes: 0,
-            is_last_part_written: false,
-            last: None,
+
+            location: None,
+            written: 0,
+            buffer: oio::VectorCursor::new(),
+            // The chunk size should be a multiple of 256 KiB
+            // (256 x 1024 bytes), unless it's the last chunk
+            // that completes the upload.
+            //
+            // Larger chunk sizes typically make uploads faster,
+            // but note that there's a tradeoff between speed and
+            // memory usage. It's recommended that you use at least
+            // 8 MiB for the chunk size.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for GcsWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.gcs_insert_object_request(
             &percent_encode_path(&self.path),
             Some(bs.len()),
@@ -82,80 +85,123 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let location = if let Some(location) = &self.location {
-            location
-        } else {
-            return Ok(());
-        };
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
+        let status = resp.status();
 
-        let result = if let Some(last) = &self.last {
-            let bytes_to_upload = last.slice(0..last.len());
-            let part_size = bytes_to_upload.len() as u64;
-            let is_last_part = part_size % (256 * 1024) != 0;
-            let mut req = self.core.gcs_upload_in_resumable_upload(
-                location,
-                part_size,
-                self.written_bytes,
-                is_last_part,
-                AsyncBody::Bytes(bytes_to_upload),
-            )?;
-
-            self.core.sign(&mut req).await?;
-
-            let resp = self.core.send(req).await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK | StatusCode::PERMANENT_REDIRECT => {
-                    if is_last_part {
-                        self.is_last_part_written = true
-                    } else {
-                        self.written_bytes += part_size;
-                    }
-                    Ok(())
+        match status {
+            StatusCode::OK => {
+                let bs = parse_location(resp.headers())?;
+                if let Some(location) = bs {
+                    Ok(location.to_string())
+                } else {
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "location is not in the response header",
+                    ))
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> {
+        let mut req = self.core.gcs_upload_in_resumable_upload(
+            location,
+            bs.len() as u64,
+            self.written,
+            false,
+            AsyncBody::Bytes(bs),
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for GcsWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let location = match &self.location {
+            Some(location) => location,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as 
u64
+                    && self.written == 0
+                {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let location = self.initiate_upload().await?;
+                    self.location = Some(location.clone());
+                    self.location.as_deref().unwrap()
                 }
-                _ => Err(parse_error(resp).await?),
             }
-        } else {
-            Ok(())
         };
 
-        self.last = Some(bs.slice(0..bs.len()));
-        return result;
+        // Ignore empty bytes
+        if bs.len() == 0 {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak(self.buffer_size);
+
+        match self.write_part(location, bs).await {
+            Ok(_) => {
+                self.buffer.take(self.buffer_size);
+                self.written += self.buffer_size as u64;
+                Ok(())
+            }
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-enter safe.
+                self.buffer.pop();
+                Err(e)
+            }
+        }
     }
 
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.write(bs).await
+    }
+
+    // TODO: we can cancel the upload by sending a DELETE request to the 
location
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
-        if self.is_last_part_written {
-            return Ok(());
-        }
-
         let location = if let Some(location) = &self.location {
             location
         } else {
             return Ok(());
         };
 
-        let bs = self
-            .last
-            .as_ref()
-            .expect("failed to get the previously uploaded part");
+        let bs = self.buffer.peak(self.buffer.len());
 
         let resp = self
             .core
-            .gcs_complete_resumable_upload(location, self.written_bytes, 
bs.slice(0..bs.len()))
+            .gcs_complete_resumable_upload(location, self.written, bs)
             .await?;
 
         let status = resp.status();
-
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
+
+                self.location = None;
+                self.buffer.clear();
                 Ok(())
             }
             _ => Err(parse_error(resp).await?),
diff --git a/core/src/services/ghac/backend.rs 
b/core/src/services/ghac/backend.rs
index 159b0022..c5afeb81 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -391,10 +391,10 @@ impl Accessor for GhacBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index c2e59f84..9e9377b8 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -110,10 +110,10 @@ impl Accessor for IpmfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 3412999e..c8223d1a 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -315,9 +315,9 @@ impl Accessor for ObsBackend {
     }
 
     async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
-        let mut req =
-            self.core
-                .obs_put_object_request(path, Some(0), None, None, 
AsyncBody::Empty)?;
+        let mut req = self
+            .core
+            .obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?;
 
         self.core.sign(&mut req).await?;
 
@@ -352,10 +352,10 @@ impl Accessor for ObsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index c61f1f02..191ec95b 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -116,7 +116,6 @@ impl ObsCore {
         path: &str,
         size: Option<usize>,
         content_type: Option<&str>,
-        if_match: Option<&str>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
@@ -125,10 +124,6 @@ impl ObsCore {
 
         let mut req = Request::put(&url);
 
-        if let Some(if_match) = if_match {
-            req = req.header(IF_MATCH, if_match);
-        }
-
         if let Some(size) = size {
             req = req.header(CONTENT_LENGTH, size)
         }
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 080e0919..21c685c7 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -47,7 +47,6 @@ impl oio::Write for ObsWriter {
             &self.path,
             Some(bs.len()),
             self.op.content_type(),
-            self.op.if_match(),
             AsyncBody::Bytes(bs),
         )?;
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index b3077c7b..4e298fb6 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -407,25 +407,9 @@ impl Accessor for OssBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let upload_id = if args.append() {
-            let resp = self.core.oss_initiate_upload(path, &args).await?;
-            match resp.status() {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-                    let result: InitiateMultipartUploadResult =
-                        quick_xml::de::from_reader(bs.reader())
-                            .map_err(new_xml_deserialize_error)?;
-                    Some(result.upload_id)
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            OssWriter::new(self.core.clone(), args, path.to_string(), 
upload_id),
+            OssWriter::new(self.core.clone(), &path, args),
         ))
     }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 3c56226c..095ba6e4 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use http::StatusCode;
 
 use super::core::*;
@@ -33,24 +33,33 @@ pub struct OssWriter {
     op: OpWrite,
     path: String,
     upload_id: Option<String>,
+
     parts: Vec<MultipartUploadPart>,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl OssWriter {
-    pub fn new(core: Arc<OssCore>, op: OpWrite, path: String, upload_id: 
Option<String>) -> Self {
+    pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
         OssWriter {
             core,
+            path: path.to_string(),
             op,
-            path,
-            upload_id,
+
+            upload_id: None,
             parts: vec![],
+            buffer: oio::VectorCursor::new(),
+            // The part size must be 5 MiB to 5 GiB. There is no minimum
+            // size limit on the last part of your multipart upload.
+            //
+            // We pick the default value as 8 MiB for better throughput.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for OssWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(bs.len()),
@@ -76,10 +85,20 @@ impl oio::Write for OssWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, 
must be buggy",
-        );
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
+        match resp.status() {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let result: InitiateMultipartUploadResult =
+                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+                Ok(result.upload_id)
+            }
+            _ => return Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(&self, upload_id: &str, bs: Bytes) -> 
Result<MultipartUploadPart> {
         // Aliyun OSS requires part number must between [1..=10000]
         let part_number = self.parts.len() + 1;
         let mut req = self
@@ -108,13 +127,62 @@ impl oio::Write for OssWriter {
                     })?
                     .to_string();
                 resp.into_body().consume().await?;
-                self.parts.push(MultipartUploadPart { part_number, etag });
-                Ok(())
+                Ok(MultipartUploadPart { part_number, etag })
             }
             _ => Err(parse_error(resp).await?),
         }
     }
+}
+
+#[async_trait]
+impl oio::Write for OssWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let upload_id = match &self.upload_id {
+            Some(upload_id) => upload_id,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as 
u64 {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let upload_id = self.initiate_upload().await?;
+                    self.upload_id = Some(upload_id.clone());
+                    self.upload_id.as_deref().unwrap()
+                }
+            }
+        };
+
+        // Ignore empty bytes
+        if bs.len() == 0 {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak(self.buffer_size);
+
+        match self.write_part(upload_id, bs).await {
+            Ok(part) => {
+                self.buffer.take(self.buffer_size);
+                self.parts.push(part);
+                Ok(())
+            }
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-enter safe.
+                self.buffer.pop();
+                Err(e)
+            }
+        }
+    }
+
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        todo!()
+    }
 
+    // TODO: we can cancel the upload by sending an abort request.
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index da2f1181..7eb7b1f9 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -959,39 +959,9 @@ impl Accessor for S3Backend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let upload_id = if args.append() {
-            let resp = self
-                .core
-                .s3_initiate_multipart_upload(
-                    path,
-                    args.content_type(),
-                    args.content_disposition(),
-                    args.cache_control(),
-                    args.if_match(),
-                )
-                .await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-
-                    let result: InitiateMultipartUploadResult =
-                        quick_xml::de::from_reader(bs.reader())
-                            .map_err(new_xml_deserialize_error)?;
-
-                    Some(result.upload_id)
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-        } else {
-            None
-        };
-
         Ok((
             RpWrite::default(),
-            S3Writer::new(self.core.clone(), args, path.to_string(), 
upload_id),
+            S3Writer::new(self.core.clone(), &path, args),
         ))
     }
 
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index c618b87d..4f1c36a2 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -489,7 +489,6 @@ impl S3Core {
         content_type: Option<&str>,
         content_disposition: Option<&str>,
         cache_control: Option<&str>,
-        if_match: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
@@ -509,10 +508,6 @@ impl S3Core {
             req = req.header(CACHE_CONTROL, cache_control)
         }
 
-        if let Some(if_match) = if_match {
-            req = req.header(IF_MATCH, if_match)
-        }
-
         // Set storage class header
         if let Some(v) = &self.default_storage_class {
             req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index d75906f9..5887cafc 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use http::StatusCode;
 
 use super::core::*;
@@ -32,32 +32,34 @@ pub struct S3Writer {
 
     op: OpWrite,
     path: String,
-
     upload_id: Option<String>,
+
     parts: Vec<CompleteMultipartUploadRequestPart>,
+    buffer: oio::VectorCursor,
+    buffer_size: usize,
 }
 
 impl S3Writer {
-    pub fn new(core: Arc<S3Core>, op: OpWrite, path: String, upload_id: 
Option<String>) -> Self {
+    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
         S3Writer {
             core,
-
+            path: path.to_string(),
             op,
-            path,
-            upload_id,
+
+            upload_id: None,
             parts: vec![],
+            buffer: oio::VectorCursor::new(),
+            // The part size must be 5 MiB to 5 GiB. There is no minimum
+            // size limit on the last part of your multipart upload.
+            //
+            // We pick the default value as 8 MiB for better throughput.
+            //
+            // TODO: allow this value to be configured.
+            buffer_size: 8 * 1024 * 1024,
         }
     }
-}
-
-#[async_trait]
-impl oio::Write for S3Writer {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        debug_assert!(
-            self.upload_id.is_none(),
-            "Writer initiated with upload id, but users trying to call write, 
must be buggy"
-        );
 
+    async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.s3_put_object_request(
             &self.path,
             Some(bs.len()),
@@ -82,10 +84,37 @@ impl oio::Write for S3Writer {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, 
must be buggy",
-        );
+    async fn initiate_upload(&self) -> Result<String> {
+        let resp = self
+            .core
+            .s3_initiate_multipart_upload(
+                &self.path,
+                self.op.content_type(),
+                self.op.content_disposition(),
+                self.op.cache_control(),
+            )
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let result: InitiateMultipartUploadResult =
+                    
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+                Ok(result.upload_id)
+            }
+            _ => return Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        bs: Bytes,
+    ) -> Result<CompleteMultipartUploadRequestPart> {
         // AWS S3 requires part number must between [1..=10000]
         let part_number = self.parts.len() + 1;
 
@@ -116,15 +145,61 @@ impl oio::Write for S3Writer {
 
                 resp.into_body().consume().await?;
 
-                self.parts
-                    .push(CompleteMultipartUploadRequestPart { part_number, 
etag });
+                Ok(CompleteMultipartUploadRequestPart { part_number, etag })
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for S3Writer {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let upload_id = match &self.upload_id {
+            Some(upload_id) => upload_id,
+            None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as 
u64 {
+                    return self.write_oneshot(bs).await;
+                } else {
+                    let upload_id = self.initiate_upload().await?;
+                    self.upload_id = Some(upload_id.clone());
+                    self.upload_id.as_deref().unwrap()
+                }
+            }
+        };
+
+        // Ignore empty bytes
+        if bs.len() == 0 {
+            return Ok(());
+        }
+
+        self.buffer.push(bs);
+        // Return directly if the buffer is not full
+        if self.buffer.len() <= self.buffer_size {
+            return Ok(());
+        }
+
+        let bs = self.buffer.peak(self.buffer_size);
 
+        match self.write_part(upload_id, bs).await {
+            Ok(part) => {
+                self.buffer.take(self.buffer_size);
+                self.parts.push(part);
                 Ok(())
             }
-            _ => Err(parse_error(resp).await?),
+            Err(e) => {
+                // If the upload fails, we should pop the given bs to make sure
+                // write is re-entrant.
+                self.buffer.pop();
+                Err(e)
+            }
         }
     }
 
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        todo!()
+    }
+
     async fn abort(&mut self) -> Result<()> {
         let upload_id = if let Some(upload_id) = &self.upload_id {
             upload_id
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 23032855..2f872229 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -300,10 +300,10 @@ impl Accessor for WebdavBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 094bef24..3a8bb7c2 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -519,10 +519,10 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.append() {
+        if args.content_length().is_none() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "append write is not supported",
+                "write without content length is not supported",
             ));
         }
 
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index c6bae2dd..a985474d 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -443,7 +443,12 @@ impl BlockingOperator {
     /// # }
     /// ```
     pub fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
-        self.write_with(path, OpWrite::new(), bs)
+        let bs = bs.into();
+        self.write_with(
+            path,
+            OpWrite::new().with_content_length(bs.len() as u64),
+            bs,
+        )
     }
 
     /// Copy a file from `from` to `to`.
@@ -594,8 +599,11 @@ impl BlockingOperator {
             );
         }
 
-        let (_, mut w) = self.inner().blocking_write(&path, args)?;
-        w.write(bs.into())?;
+        let bs = bs.into();
+        let (_, mut w) = self
+            .inner()
+            .blocking_write(&path, args.with_content_length(bs.len() as u64))?;
+        w.write(bs)?;
         w.close()?;
 
         Ok(())
@@ -636,8 +644,8 @@ impl BlockingOperator {
             );
         }
 
-        let op = OpWrite::default().with_append();
-        BlockingWriter::create_dir(self.inner().clone(), &path, op)
+        let op = OpWrite::default();
+        BlockingWriter::create(self.inner().clone(), &path, op)
     }
 
     /// Delete given path.
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index 8156dcc8..0d46ebad 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -631,7 +631,13 @@ impl Operator {
     /// # }
     /// ```
     pub async fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
-        self.write_with(path, OpWrite::new(), bs).await
+        let bs = bs.into();
+        self.write_with(
+            path,
+            OpWrite::new().with_content_length(bs.len() as u64),
+            bs,
+        )
+        .await
     }
 
     /// Copy a file from `from` to `to`.
@@ -817,7 +823,7 @@ impl Operator {
             );
         }
 
-        Writer::create_dir(self.inner().clone(), &path, 
args.with_append()).await
+        Writer::create(self.inner().clone(), &path, args).await
     }
 
     /// Write data with extra options.
@@ -854,8 +860,12 @@ impl Operator {
             );
         }
 
-        let (_, mut w) = self.inner().write(&path, args).await?;
-        w.write(bs.into()).await?;
+        let bs = bs.into();
+        let (_, mut w) = self
+            .inner()
+            .write(&path, args.with_content_length(bs.len() as u64))
+            .await?;
+        w.write(bs).await?;
         w.close().await?;
 
         Ok(())
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index 616316a5..63180d5a 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -320,12 +320,10 @@ impl OpStat {
 /// Args for `write` operation.
 #[derive(Debug, Clone, Default)]
 pub struct OpWrite {
-    append: bool,
-
+    content_length: Option<u64>,
     content_type: Option<String>,
     content_disposition: Option<String>,
     cache_control: Option<String>,
-    if_match: Option<String>,
 }
 
 impl OpWrite {
@@ -336,13 +334,20 @@ impl OpWrite {
         Self::default()
     }
 
-    pub(crate) fn with_append(mut self) -> Self {
-        self.append = true;
-        self
+    /// Get the content length from op.
+    ///
+    /// The content length is the total length of the data to be written.
+    pub fn content_length(&self) -> Option<u64> {
+        self.content_length
     }
 
-    pub(crate) fn append(&self) -> bool {
-        self.append
+    /// Set the content length of op.
+    ///
+    /// If the content length is not set, the content length will be
+    /// calculated automatically by buffering part of data.
+    pub fn with_content_length(mut self, content_length: u64) -> Self {
+        self.content_length = Some(content_length);
+        self
     }
 
     /// Get the content type from option
@@ -377,17 +382,6 @@ impl OpWrite {
         self.cache_control = Some(cache_control.to_string());
         self
     }
-
-    /// Set the If-Match of the option
-    pub fn with_if_match(mut self, if_match: &str) -> Self {
-        self.if_match = Some(if_match.to_string());
-        self
-    }
-
-    /// Get If-Match from option
-    pub fn if_match(&self) -> Option<&str> {
-        self.if_match.as_deref()
-    }
 }
 
 /// Args for `copy` operation.
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index dc4d2bfd..3705f4d6 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -52,7 +52,7 @@ impl Writer {
     ///
     /// We don't want to expose those details to users so keep this function
     /// in crate only.
-    pub(crate) async fn create_dir(acc: FusedAccessor, path: &str, op: 
OpWrite) -> Result<Self> {
+    pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> 
Result<Self> {
         let (_, w) = acc.write(path, op).await?;
 
         Ok(Writer {
@@ -198,7 +198,7 @@ impl BlockingWriter {
     ///
     /// We don't want to expose those details to users so keep this function
     /// in crate only.
-    pub(crate) fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> 
Result<Self> {
+    pub(crate) fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> 
Result<Self> {
         let (_, w) = acc.blocking_write(path, op)?;
 
         Ok(BlockingWriter { inner: w })


Reply via email to