This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9535d0222 refactor: Polish RangeWrite implementation to remove the extra buffer logic (#3038)
9535d0222 is described below

commit 9535d02227f50449073f2b5acd2f9169e71f5a23
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 20:29:22 2023 +0800

    refactor: Polish RangeWrite implementation to remove the extra buffer logic (#3038)
    
    * Build pass
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix panic
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix gcs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove chunked cursor
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/http_util/body.rs                   |   4 +
 core/src/raw/http_util/client.rs                 |   1 +
 core/src/raw/http_util/multipart.rs              |   1 +
 core/src/raw/oio/buf/chunked_bytes.rs            |  22 +-
 core/src/raw/oio/cursor.rs                       | 378 -----------------------
 core/src/raw/oio/mod.rs                          |   1 -
 core/src/raw/oio/write/multipart_upload_write.rs |  17 +-
 core/src/raw/oio/write/range_write.rs            | 151 +++++----
 core/src/services/gcs/backend.rs                 |  14 +-
 core/src/services/gcs/core.rs                    |   3 +
 core/src/services/gcs/writer.rs                  |  15 +-
 11 files changed, 126 insertions(+), 481 deletions(-)

diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs
index 7583f4437..474f74897 100644
--- a/core/src/raw/http_util/body.rs
+++ b/core/src/raw/http_util/body.rs
@@ -40,6 +40,10 @@ pub enum AsyncBody {
     Empty,
     /// Body with bytes.
     Bytes(Bytes),
+    /// Body with chunked bytes.
+    ///
+    /// This is nearly the same as a stream, but it saves an extra box.
+    ChunkedBytes(oio::ChunkedBytes),
     /// Body with stream.
     Stream(oio::Streamer),
 }
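
For orientation, the difference between the two body variants can be sketched as follows. This snippet is not part of the patch; it only uses types that already appear in this diff (AsyncBody, oio::ChunkedBytes, oio::Cursor) and assumes the crate-internal imports these files already use.

    use bytes::Bytes;

    use crate::raw::*;

    // New path: non-contiguous chunks are handed over as-is. The total length
    // stays known up front, so callers can still set Content-Length.
    fn chunked_body(chunks: Vec<Bytes>) -> AsyncBody {
        AsyncBody::ChunkedBytes(oio::ChunkedBytes::from_vec(chunks))
    }

    // Old path for the same data: it has to be wrapped in an extra
    // Box<dyn oio::Stream> before it can be sent.
    fn stream_body(bs: Bytes) -> AsyncBody {
        AsyncBody::Stream(Box::new(oio::Cursor::from(bs)))
    }
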
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index d0926d1cd..783eaf8fb 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -101,6 +101,7 @@ impl HttpClient {
         req_builder = match body {
             AsyncBody::Empty => req_builder.body(reqwest::Body::from("")),
             AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)),
+            AsyncBody::ChunkedBytes(bs) => req_builder.body(reqwest::Body::wrap_stream(bs)),
             AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)),
         };
 
diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index edc061c1f..a6c6f7652 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -410,6 +410,7 @@ impl MixedPart {
                 bs.len() as u64,
                 Some(Box::new(oio::Cursor::from(bs)) as Streamer),
             ),
+            AsyncBody::ChunkedBytes(bs) => (bs.len() as u64, Some(Box::new(bs) as Streamer)),
             AsyncBody::Stream(stream) => {
                 let len = parts
                     .headers
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs b/core/src/raw/oio/buf/chunked_bytes.rs
index 82447dc62..810c0dfda 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use bytes::{Bytes, BytesMut};
+use futures::Stream;
 use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::IoSlice;
+use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use crate::raw::*;
@@ -74,9 +76,9 @@ impl ChunkedBytes {
     /// Reference: <https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
     pub fn from_vec(bs: Vec<Bytes>) -> Self {
         Self {
+            size: bs.iter().map(|v| v.len()).sum(),
             frozen: bs.into(),
             active: BytesMut::new(),
-            size: 0,
 
             chunk_size: DEFAULT_CHUNK_SIZE,
         }
@@ -332,6 +334,24 @@ impl oio::Stream for ChunkedBytes {
     }
 }
 
+impl Stream for ChunkedBytes {
+    type Item = Result<Bytes>;
+
+    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.frozen.pop_front() {
+            Some(bs) => {
+                self.size -= bs.len();
+                Poll::Ready(Some(Ok(bs)))
+            }
+            None if !self.active.is_empty() => {
+                self.size -= self.active.len();
+                Poll::Ready(Some(Ok(self.active.split().freeze())))
+            }
+            None => Poll::Ready(None),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use log::debug;
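
Because ChunkedBytes now also implements futures::Stream, it can be passed straight to reqwest::Body::wrap_stream or consumed with futures::StreamExt. A small, hypothetical consumer (not part of the patch) to illustrate the yield order:

    use bytes::Bytes;
    use futures::StreamExt;

    use crate::raw::*;
    use crate::*;

    // Frozen chunks are yielded first, then whatever remains in the active
    // buffer, after which the stream is exhausted.
    async fn drain(mut cb: oio::ChunkedBytes) -> Result<Vec<Bytes>> {
        let mut out = Vec::new();
        while let Some(bs) = cb.next().await {
            out.push(bs?);
        }
        Ok(out)
    }
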
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index ddead52c9..0cc4e42a5 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::Ordering;
-use std::collections::VecDeque;
 use std::io::Read;
 use std::io::SeekFrom;
 use std::task::Context;
@@ -174,379 +172,3 @@ impl oio::Stream for Cursor {
         Poll::Ready(Ok(()))
     }
 }
-
-/// ChunkedCursor is used represents a non-contiguous bytes in memory.
-///
-/// This is useful when we buffer users' random writes without copy. ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::copy_from`] directly.
-///
-/// # TODO
-///
-/// we can do some compaction during runtime. For example, merge 4K data
-/// into the same bytes instead.
-#[derive(Clone)]
-pub struct ChunkedCursor {
-    inner: VecDeque<Bytes>,
-    idx: usize,
-}
-
-impl Default for ChunkedCursor {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl ChunkedCursor {
-    /// Create a new chunked cursor.
-    pub fn new() -> Self {
-        Self {
-            inner: VecDeque::new(),
-            idx: 0,
-        }
-    }
-
-    /// Returns `true` if current cursor is empty.
-    pub fn is_empty(&self) -> bool {
-        self.inner.len() <= self.idx
-    }
-
-    /// Return current bytes size of cursor.
-    pub fn len(&self) -> usize {
-        self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
-    }
-
-    /// Clear the entire cursor.
-    pub fn clear(&mut self) {
-        self.idx = 0;
-        self.inner.clear();
-    }
-
-    /// Push a new bytes into vector cursor.
-    pub fn push(&mut self, bs: Bytes) {
-        self.inner.push_back(bs);
-    }
-
-    /// split_off will split the cursor into two cursors at given size.
-    ///
-    /// After split, `self` will contains the `0..at` part and the returned cursor contains
-    /// `at..` parts.
-    ///
-    /// # Panics
-    ///
-    /// - Panics if `at > len`
-    /// - Panics if `idx != 0`, the cursor must be reset before split.
-    pub fn split_off(&mut self, at: usize) -> Self {
-        assert!(
-            at <= self.len(),
-            "split_off at must smaller than current size"
-        );
-        assert_eq!(self.idx, 0, "split_off must reset cursor first");
-
-        let mut chunks = VecDeque::new();
-        let mut size = self.len() - at;
-
-        while let Some(mut bs) = self.inner.pop_back() {
-            match size.cmp(&bs.len()) {
-                Ordering::Less => {
-                    let remaining = bs.split_off(bs.len() - size);
-                    chunks.push_front(remaining);
-                    self.inner.push_back(bs);
-                    break;
-                }
-                Ordering::Equal => {
-                    chunks.push_front(bs);
-                    break;
-                }
-                Ordering::Greater => {
-                    size -= bs.len();
-                    chunks.push_front(bs);
-                }
-            }
-        }
-
-        Self {
-            inner: chunks,
-            idx: 0,
-        }
-    }
-
-    /// split_to will split the cursor into two cursors at given size.
-    ///
-    /// After split, `self` will contains the `at..` part and the returned cursor contains
-    /// `0..at` parts.
-    ///
-    /// # Panics
-    ///
-    /// - Panics if `at > len`
-    /// - Panics if `idx != 0`, the cursor must be reset before split.
-    pub fn split_to(&mut self, at: usize) -> Self {
-        assert!(
-            at <= self.len(),
-            "split_to at must smaller than current size"
-        );
-        assert_eq!(self.idx, 0, "split_to must reset cursor first");
-
-        let mut chunks = VecDeque::new();
-        let mut size = at;
-
-        while let Some(mut bs) = self.inner.pop_front() {
-            match size.cmp(&bs.len()) {
-                Ordering::Less => {
-                    let remaining = bs.split_off(size);
-                    chunks.push_back(bs);
-                    self.inner.push_front(remaining);
-                    break;
-                }
-                Ordering::Equal => {
-                    chunks.push_back(bs);
-                    break;
-                }
-                Ordering::Greater => {
-                    size -= bs.len();
-                    chunks.push_back(bs);
-                }
-            }
-        }
-
-        Self {
-            inner: chunks,
-            idx: 0,
-        }
-    }
-
-    #[cfg(test)]
-    fn concat(&self) -> Bytes {
-        let mut bs = bytes::BytesMut::new();
-        for v in self.inner.iter().skip(self.idx) {
-            bs.extend_from_slice(v);
-        }
-        bs.freeze()
-    }
-}
-
-impl oio::Stream for ChunkedCursor {
-    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        if self.is_empty() {
-            return Poll::Ready(None);
-        }
-
-        let bs = self.inner[self.idx].clone();
-        self.idx += 1;
-        Poll::Ready(Some(Ok(bs)))
-    }
-
-    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        self.idx = 0;
-        Poll::Ready(Ok(()))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use pretty_assertions::assert_eq;
-    use rand::thread_rng;
-    use rand::Rng;
-    use rand::RngCore;
-    use sha2::Digest;
-    use sha2::Sha256;
-
-    use super::*;
-    use crate::raw::oio::StreamExt;
-
-    #[tokio::test]
-    async fn test_chunked_cursor() -> Result<()> {
-        let mut c = ChunkedCursor::new();
-
-        c.push(Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
-
-        c.push(Bytes::from("world"));
-        assert_eq!(c.len(), 10);
-        assert!(!c.is_empty());
-
-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("hello"));
-        assert_eq!(c.len(), 5);
-        assert!(!c.is_empty());
-
-        let bs = c.next().await.unwrap().unwrap();
-        assert_eq!(bs, Bytes::from("world"));
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
-
-        c.reset().await?;
-        assert_eq!(c.len(), 10);
-        assert!(!c.is_empty());
-
-        c.clear();
-        assert_eq!(c.len(), 0);
-        assert!(c.is_empty());
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_chunked_cursor_split_to() {
-        let mut base = ChunkedCursor::new();
-        base.push(Bytes::from("Hello"));
-        base.push(Bytes::from("Wor"));
-        base.push(Bytes::from("ld"));
-
-        // Case 1: split less than first chunk
-        let mut c1 = base.clone();
-        let c2 = c1.split_to(3);
-
-        assert_eq!(c1.len(), 7);
-        assert_eq!(
-            &c1.inner,
-            &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
-        );
-
-        assert_eq!(c2.len(), 3);
-        assert_eq!(&c2.inner, &[Bytes::from("Hel")]);
-
-        // Case 2: split larger than first chunk
-        let mut c1 = base.clone();
-        let c2 = c1.split_to(6);
-
-        assert_eq!(c1.len(), 4);
-        assert_eq!(&c1.inner, &[Bytes::from("or"), Bytes::from("ld")]);
-
-        assert_eq!(c2.len(), 6);
-        assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("W")]);
-
-        // Case 3: split at chunk edge
-        let mut c1 = base.clone();
-        let c2 = c1.split_to(8);
-
-        assert_eq!(c1.len(), 2);
-        assert_eq!(&c1.inner, &[Bytes::from("ld")]);
-
-        assert_eq!(c2.len(), 8);
-        assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);
-    }
-
-    #[test]
-    fn test_chunked_cursor_split_off() {
-        let mut base = ChunkedCursor::new();
-        base.push(Bytes::from("Hello"));
-        base.push(Bytes::from("Wor"));
-        base.push(Bytes::from("ld"));
-
-        // Case 1: split less than first chunk
-        let mut c1 = base.clone();
-        let c2 = c1.split_off(3);
-
-        assert_eq!(c1.len(), 3);
-        assert_eq!(&c1.inner, &[Bytes::from("Hel")]);
-
-        assert_eq!(c2.len(), 7);
-        assert_eq!(
-            &c2.inner,
-            &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
-        );
-
-        // Case 2: split larger than first chunk
-        let mut c1 = base.clone();
-        let c2 = c1.split_off(6);
-
-        assert_eq!(c1.len(), 6);
-        assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("W")]);
-
-        assert_eq!(c2.len(), 4);
-        assert_eq!(&c2.inner, &[Bytes::from("or"), Bytes::from("ld")]);
-
-        // Case 3: split at chunk edge
-        let mut c1 = base.clone();
-        let c2 = c1.split_off(8);
-
-        assert_eq!(c1.len(), 8);
-        assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);
-
-        assert_eq!(c2.len(), 2);
-        assert_eq!(&c2.inner, &[Bytes::from("ld")]);
-    }
-
-    #[test]
-    fn test_fuzz_chunked_cursor_split_to() {
-        let mut rng = thread_rng();
-        let mut expected = vec![];
-        let mut total_size = 0;
-
-        let mut cursor = ChunkedCursor::new();
-
-        // Build Cursor
-        let count = rng.gen_range(1..1000);
-        for _ in 0..count {
-            let size = rng.gen_range(1..100);
-            let mut content = vec![0; size];
-            rng.fill_bytes(&mut content);
-            total_size += size;
-
-            expected.extend_from_slice(&content);
-            cursor.push(Bytes::from(content));
-        }
-
-        // Test Cursor
-        for _ in 0..count {
-            let mut cursor = cursor.clone();
-
-            let at = rng.gen_range(0..total_size);
-            let to = cursor.split_to(at);
-
-            assert_eq!(cursor.len(), total_size - at);
-            assert_eq!(
-                format!("{:x}", Sha256::digest(&cursor.concat())),
-                format!("{:x}", Sha256::digest(&expected[at..])),
-            );
-
-            assert_eq!(to.len(), at);
-            assert_eq!(
-                format!("{:x}", Sha256::digest(&to.concat())),
-                format!("{:x}", Sha256::digest(&expected[0..at])),
-            );
-        }
-    }
-
-    #[test]
-    fn test_fuzz_chunked_cursor_split_off() {
-        let mut rng = thread_rng();
-        let mut expected = vec![];
-        let mut total_size = 0;
-
-        let mut cursor = ChunkedCursor::new();
-
-        // Build Cursor
-        let count = rng.gen_range(1..1000);
-        for _ in 0..count {
-            let size = rng.gen_range(1..100);
-            let mut content = vec![0; size];
-            rng.fill_bytes(&mut content);
-            total_size += size;
-
-            expected.extend_from_slice(&content);
-            cursor.push(Bytes::from(content));
-        }
-
-        // Test Cursor
-        for _ in 0..count {
-            let mut cursor = cursor.clone();
-
-            let at = rng.gen_range(0..total_size);
-            let off = cursor.split_off(at);
-
-            assert_eq!(cursor.len(), at);
-            assert_eq!(
-                format!("{:x}", Sha256::digest(&cursor.concat())),
-                format!("{:x}", Sha256::digest(&expected[..at])),
-            );
-
-            assert_eq!(off.len(), total_size - at);
-            assert_eq!(
-                format!("{:x}", Sha256::digest(&off.concat())),
-                format!("{:x}", Sha256::digest(&expected[at..])),
-            );
-        }
-    }
-}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 29cf8e474..9e52acfd2 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -35,7 +35,6 @@ mod page;
 pub use page::*;
 
 mod cursor;
-pub use cursor::ChunkedCursor;
 pub use cursor::Cursor;
 
 mod entry;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index ec42275f0..6edeb96a8 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -21,7 +21,6 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::future::BoxFuture;
 
 use crate::raw::*;
@@ -109,7 +108,7 @@ pub struct MultipartUploadPart {
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
     state: State<W>,
 
-    cache: Option<Bytes>,
+    cache: Option<oio::ChunkedBytes>,
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartUploadPart>,
 }
@@ -163,7 +162,7 @@ where
                                         &upload_id,
                                         part_number,
                                         size as u64,
-                                        AsyncBody::Bytes(bs),
+                                        AsyncBody::ChunkedBytes(bs),
                                     )
                                     .await;
 
@@ -174,7 +173,8 @@ where
                             // Fill cache with the first write.
                             if self.cache.is_none() {
                                 let size = bs.remaining();
-                                self.cache = Some(bs.bytes(size));
+                                let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+                                self.cache = Some(cb);
                                 return Poll::Ready(Ok(size));
                             }
 
@@ -197,7 +197,8 @@ where
                     self.parts.push(part?);
                     // Replace the cache when last write succeeded
                     let size = bs.remaining();
-                    self.cache = Some(bs.bytes(size));
+                    let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+                    self.cache = Some(cb);
                     return Poll::Ready(Ok(size));
                 }
                 State::Close(_) => {
@@ -232,7 +233,7 @@ where
                                                 &upload_id,
                                                 parts.len(),
                                                 size as u64,
-                                                AsyncBody::Bytes(bs),
+                                                AsyncBody::ChunkedBytes(bs),
                                             )
                                             .await;
                                         (w, part)
@@ -250,7 +251,9 @@ where
                             Some(bs) => {
                                 self.state = State::Close(Box::pin(async move {
                                     let size = bs.len();
-                                    let res = w.write_once(size as u64, AsyncBody::Bytes(bs)).await;
+                                    let res = w
+                                        .write_once(size as u64, AsyncBody::ChunkedBytes(bs))
+                                        .await;
                                     (w, res)
                                 }));
                             }
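
Both MultipartUploadWriter here and RangeWriter below now follow the same caching discipline: the first poll_write only fills the cache, each later poll_write uploads the previously cached chunk and then caches the incoming bytes, and close() flushes whatever is still cached. A stripped-down sketch of that bookkeeping (illustrative names only, not part of the patch; in the real writers the cache is only replaced after the upload of the old chunk succeeds):

    use bytes::Bytes;

    // Stays exactly one chunk behind the caller.
    struct ChunkCache {
        cache: Option<Bytes>,
    }

    impl ChunkCache {
        /// Cache the incoming bytes and return the previously cached chunk,
        /// which is the one to upload now (None on the very first write).
        fn swap(&mut self, incoming: Bytes) -> Option<Bytes> {
            self.cache.replace(incoming)
        }

        /// On close, whatever is still cached becomes the final part.
        fn take_last(&mut self) -> Option<Bytes> {
            self.cache.take()
        }
    }
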
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 9dfe73197..9ae017a5e 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -50,6 +50,13 @@ use crate::*;
 /// - Expose `RangeWriter` as `Accessor::Writer`
 #[async_trait]
 pub trait RangeWrite: Send + Sync + Unpin + 'static {
+    /// write_once is used to write the data to the underlying storage at once.
+    ///
+    /// RangeWriter will call this API when:
+    ///
+    /// - All the data has been written to the buffer and we can perform the upload at once.
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
+
     /// Initiate range the range write, the returning value is the location.
     async fn initiate_range(&self) -> Result<String>;
 
@@ -79,8 +86,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
 pub struct RangeWriter<W: RangeWrite> {
     location: Option<String>,
     written: u64,
-    align_size: usize,
-    align_buffer: oio::ChunkedCursor,
+    buffer: Option<oio::ChunkedBytes>,
 
     state: State<W>,
 }
@@ -88,8 +94,7 @@ pub struct RangeWriter<W: RangeWrite> {
 enum State<W> {
     Idle(Option<W>),
     Init(BoxFuture<'static, (W, Result<String>)>),
-    /// The returning value is (consume size, written size).
-    Write(BoxFuture<'static, (W, Result<(usize, u64)>)>),
+    Write(BoxFuture<'static, (W, Result<u64>)>),
     Complete(BoxFuture<'static, (W, Result<()>)>),
     Abort(BoxFuture<'static, (W, Result<()>)>),
 }
@@ -105,25 +110,11 @@ impl<W: RangeWrite> RangeWriter<W> {
         Self {
             state: State::Idle(Some(inner)),
 
+            buffer: None,
             location: None,
             written: 0,
-            align_size: 256 * 1024,
-            align_buffer: oio::ChunkedCursor::default(),
         }
     }
-
-    /// Set the align size.
-    ///
-    /// The size is default to 256 KiB.
-    ///
-    /// # Note
-    ///
-    /// Please don't mix this with the buffer size. Align size is usually the hard
-    /// limit for the service to accept the chunk.
-    pub fn with_align_size(mut self, size: usize) -> Self {
-        self.align_size = size;
-        self
-    }
 }
 
 impl<W: RangeWrite> oio::Write for RangeWriter<W> {
@@ -133,51 +124,38 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                 State::Idle(w) => {
                     match self.location.clone() {
                         Some(location) => {
-                            let remaining = bs.remaining();
-                            let current_size = self.align_buffer.len();
-                            let mut total_size = current_size + remaining;
-
-                            if total_size <= self.align_size {
-                                let bs = bs.bytes(remaining);
-                                self.align_buffer.push(bs);
-                                return Poll::Ready(Ok(remaining));
-                            }
-                            // If total_size is aligned, we need to write one less chunk to make sure
-                            // that the file has at least one chunk during complete stage.
-                            if total_size % self.align_size == 0 {
-                                total_size -= self.align_size;
-                            }
-
-                            let consume = total_size - total_size % self.align_size - current_size;
-                            let mut align_buffer = self.align_buffer.clone();
-                            let bs = bs.bytes(consume);
-                            align_buffer.push(bs);
-
                             let written = self.written;
-                            let w = w.take().unwrap();
-                            let fut = async move {
-                                let size = align_buffer.len() as u64;
+
+                            let buffer = self.buffer.clone().expect("cache must be valid").clone();
+                            let w = w.take().expect("writer must be valid");
+                            self.state = State::Write(Box::pin(async move {
+                                let size = buffer.len() as u64;
                                 let res = w
                                     .write_range(
                                         &location,
                                         written,
                                         size,
-                                        AsyncBody::Stream(Box::new(align_buffer)),
+                                        AsyncBody::ChunkedBytes(buffer),
                                     )
                                     .await;
 
-                                (w, res.map(|_| (consume, size)))
-                            };
-                            self.state = State::Write(Box::pin(fut));
+                                (w, res.map(|_| size))
+                            }));
                         }
                         None => {
-                            let w = w.take().unwrap();
-                            let fut = async move {
-                                let res = w.initiate_range().await;
+                            // Fill cache with the first write.
+                            if self.buffer.is_none() {
+                                let size = bs.remaining();
+                                let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+                                self.buffer = Some(cb);
+                                return Poll::Ready(Ok(size));
+                            }
 
-                                (w, res)
-                            };
-                            self.state = State::Init(Box::pin(fut));
+                            let w = w.take().expect("writer must be valid");
+                            self.state = State::Init(Box::pin(async move {
+                                let location = w.initiate_range().await;
+                                (w, location)
+                            }));
                         }
                     }
                 }
@@ -187,15 +165,16 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                     self.location = Some(res?);
                 }
                 State::Write(fut) => {
-                    let (w, res) = ready!(fut.poll_unpin(cx));
+                    let (w, size) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
-                    let (consume, written) = res?;
-                    self.written += written;
-                    self.align_buffer.clear();
-                    // It's possible that the buffer is already aligned, no bytes has been consumed.
-                    if consume != 0 {
-                        return Poll::Ready(Ok(consume));
-                    }
+                    // Update the written.
+                    self.written += size?;
+
+                    // Replace the cache when last write succeeded
+                    let size = bs.remaining();
+                    let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+                    self.buffer = Some(cb);
+                    return Poll::Ready(Ok(size));
                 }
                 State::Complete(_) => {
                     unreachable!("RangeWriter must not go into State::Complete during poll_write")
@@ -211,28 +190,41 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
         loop {
             match &mut self.state {
                 State::Idle(w) => {
-                    let w = w.take().unwrap();
+                    let w = w.take().expect("writer must be valid");
                     match self.location.clone() {
                         Some(location) => {
-                            let align_buffer = self.align_buffer.clone();
-
                             let written = self.written;
-                            let fut = async move {
-                                let size = align_buffer.len() as u64;
-                                let res = w
-                                    .complete_range(
-                                        &location,
-                                        written,
-                                        size,
-                                        AsyncBody::Stream(Box::new(align_buffer)),
-                                    )
-                                    .await;
-
-                                (w, res)
-                            };
-                            self.state = State::Complete(Box::pin(fut));
+                            match self.buffer.clone() {
+                                Some(bs) => {
+                                    self.state = State::Complete(Box::pin(async move {
+                                        let res = w
+                                            .complete_range(
+                                                &location,
+                                                written,
+                                                bs.len() as u64,
+                                                AsyncBody::ChunkedBytes(bs),
+                                            )
+                                            .await;
+                                        (w, res)
+                                    }));
+                                }
+                                None => {
+                                    unreachable!("it must be a bug that RangeWrite is in State::Idle with no cache but has a location")
+                                }
+                            }
                         }
-                        None => return Poll::Ready(Ok(())),
+                        None => match self.buffer.clone() {
+                            Some(bs) => {
+                                self.state = State::Complete(Box::pin(async move {
+                                    let size = bs.len();
+                                    let res = w
+                                        .write_once(size as u64, AsyncBody::ChunkedBytes(bs))
+                                        .await;
+                                    (w, res)
+                                }));
+                            }
+                            None => return Poll::Ready(Ok(())),
+                        },
                     }
                 }
                 State::Init(_) => {
@@ -244,7 +236,6 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                 State::Complete(fut) => {
                     let (w, res) = ready!(fut.poll_unpin(cx));
                     self.state = State::Idle(Some(w));
-                    self.align_buffer.clear();
                     return Poll::Ready(res);
                 }
                 State::Abort(_) => {
@@ -283,7 +274,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                 State::Abort(fut) => {
                     let (w, res) = ready!(fut.poll_unpin(cx));
                     self.state = State::Idle(Some(w));
-                    self.align_buffer.clear();
+                    self.buffer = None;
                     return Poll::Ready(res);
                 }
             }
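
Taken together, the reworked RangeWriter keeps only the most recent chunk cached and decides on close() whether a single write_once is enough or a resumable range upload has to be completed. The following linear driver is hypothetical and not part of the patch; the write_range and complete_range signatures are inferred from the calls in this hunk:

    use crate::raw::*;
    use crate::*;

    async fn upload_in_ranges<W: oio::RangeWrite>(w: &W, chunks: Vec<oio::ChunkedBytes>) -> Result<()> {
        let mut chunks = chunks.into_iter();
        // Mirror poll_write: the first chunk is only cached.
        let mut pending = match chunks.next() {
            Some(bs) => bs,
            None => return Ok(()),
        };
        let mut location: Option<String> = None;
        let mut written: u64 = 0;

        for next in chunks {
            // A second chunk arriving is what triggers initiate_range().
            let loc = match &location {
                Some(loc) => loc.clone(),
                None => {
                    let loc = w.initiate_range().await?;
                    location = Some(loc.clone());
                    loc
                }
            };
            let size = pending.len() as u64;
            w.write_range(&loc, written, size, AsyncBody::ChunkedBytes(pending))
                .await?;
            written += size;
            pending = next;
        }

        match location {
            // Only one chunk ever arrived: a single upload is enough.
            None => {
                w.write_once(pending.len() as u64, AsyncBody::ChunkedBytes(pending))
                    .await
            }
            // Otherwise the cached tail completes the resumable upload.
            Some(loc) => {
                w.complete_range(&loc, written, pending.len() as u64, AsyncBody::ChunkedBytes(pending))
                    .await
            }
        }
    }
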
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index f91c792a0..9e26cdce1 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp::max;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -337,7 +338,7 @@ pub struct GcsBackend {
 impl Accessor for GcsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = GcsWriters;
+    type Writer = oio::TwoWaysWriter<GcsWriters, oio::ExactBufWriter<GcsWriters>>;
     type BlockingWriter = ();
     type Pager = GcsPager;
     type BlockingPager = ();
@@ -420,10 +421,15 @@ impl Accessor for GcsBackend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let w = GcsWriter::new(self.core.clone(), path, args.clone());
-        let w = if args.content_length().is_some() {
-            GcsWriters::One(oio::OneShotWriter::new(w))
+        let w = oio::RangeWriter::new(w);
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            // FIXME: we should align with 256KiB instead.
+            let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
+
+            oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
         } else {
-            GcsWriters::Two(oio::RangeWriter::new(w))
+            oio::TwoWaysWriter::One(w)
         };
 
         Ok((RpWrite::default(), w))
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 06d772e06..40acf42bc 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -299,6 +299,9 @@ impl GcsCore {
                 AsyncBody::Bytes(bytes) => {
                     media_part = media_part.content(bytes);
                 }
+                AsyncBody::ChunkedBytes(bs) => {
+                    media_part = media_part.stream(bs.len() as u64, Box::new(bs));
+                }
                 AsyncBody::Stream(stream) => {
                     media_part = media_part.stream(size.unwrap(), stream);
                 }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 1b22acd7b..d55d6d7a9 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::GcsCore;
@@ -26,8 +25,7 @@ use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
-pub type GcsWriters =
-    oio::TwoWaysWriter<oio::OneShotWriter<GcsWriter>, oio::RangeWriter<GcsWriter>>;
+pub type GcsWriters = oio::RangeWriter<GcsWriter>;
 
 pub struct GcsWriter {
     core: Arc<GcsCore>,
@@ -46,13 +44,13 @@ impl GcsWriter {
 }
 
 #[async_trait]
-impl oio::OneShotWrite for GcsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+impl oio::RangeWrite for GcsWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
         let mut req = self.core.gcs_insert_object_request(
             &percent_encode_path(&self.path),
-            Some(bs.len() as u64),
+            Some(size),
             &self.op,
-            AsyncBody::Bytes(bs),
+            body,
         )?;
 
         self.core.sign(&mut req).await?;
@@ -69,10 +67,7 @@ impl oio::OneShotWrite for GcsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::RangeWrite for GcsWriter {
     async fn initiate_range(&self) -> Result<String> {
         let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
         let status = resp.status();

