This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 04c1689f0ded8cf6a232d6aa553b80bdd3930a69
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 17:23:55 2023 +0800

    Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/cursor.rs      | 63 ++++++++++++++++++++++++++++++++---------
 core/src/services/gcs/writer.rs |  4 +--
 core/src/services/oss/writer.rs | 20 +++++++++++--
 core/src/services/s3/writer.rs  | 20 +++++++++++--
 core/tests/behavior/write.rs    |  2 +-
 5 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 0889e45e..b0a4258e 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -165,7 +165,7 @@ impl VectorCursor {
 
     /// Returns `true` if current vector is empty.
     pub fn is_empty(&self) -> bool {
-        self.inner.is_empty()
+        self.size == 0
     }
 
     /// Return current bytes size of current vector.
@@ -191,8 +191,11 @@ impl VectorCursor {
         self.size = 0;
     }
 
-    /// Peak will read and copy n bytes from current cursor without
-    /// change it's content.
+    /// Peak will read and copy exactly n bytes from the current cursor
+    /// without changing its content.
+    ///
+    /// This function is useful if you want to read fixed-size
+    /// content to make sure it is aligned.
     ///
     /// # Panics
     ///
@@ -201,8 +204,8 @@ impl VectorCursor {
     /// # TODO
     ///
     /// Optimize to avoid data copy.
-    pub fn peak(&self, n: usize) -> Bytes {
-        assert!(n <= self.size, "peak size must smamller than current size");
+    pub fn peak_exact(&self, n: usize) -> Bytes {
+        assert!(n <= self.size, "peak size must smaller than current size");
 
         // Avoid data copy if n is smaller than first chunk.
         if self.inner[0].len() >= n {
@@ -222,6 +225,40 @@ impl VectorCursor {
         bs.freeze()
     }
 
+    /// peak_at_least will read and copy at least n bytes from the current
+    /// cursor without changing its content.
+    ///
+    /// This function is useful if you only need the returned bytes
+    /// to be at least n bytes long.
+    ///
+    /// # Panics
+    ///
+    /// Panics if n is larger than current size.
+    ///
+    /// # TODO
+    ///
+    /// Optimize to avoid data copy.
+    pub fn peak_at_least(&self, n: usize) -> Bytes {
+        assert!(n <= self.size, "peak size must smaller than current size");
+
+        // Avoid data copy if n is smaller than first chunk.
+        if self.inner[0].len() >= n {
+            return self.inner[0].clone();
+        }
+
+        let mut bs = BytesMut::with_capacity(n);
+        let mut n = n;
+        for b in &self.inner {
+            if n == 0 {
+                break;
+            }
+            let len = b.len().min(n);
+            bs.extend_from_slice(&b[..len]);
+            n -= len;
+        }
+        bs.freeze()
+    }
+
     /// Take will consume n bytes from current cursor.
     ///
     /// # Panics
@@ -259,17 +296,17 @@ mod tests {
         vc.push(Bytes::from("hello"));
         vc.push(Bytes::from("world"));
 
-        assert_eq!(vc.peak(1), Bytes::from("h"));
-        assert_eq!(vc.peak(1), Bytes::from("h"));
-        assert_eq!(vc.peak(4), Bytes::from("hell"));
-        assert_eq!(vc.peak(6), Bytes::from("hellow"));
-        assert_eq!(vc.peak(10), Bytes::from("helloworld"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("h"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("h"));
+        assert_eq!(vc.peak_exact(4), Bytes::from("hell"));
+        assert_eq!(vc.peak_exact(6), Bytes::from("hellow"));
+        assert_eq!(vc.peak_exact(10), Bytes::from("helloworld"));
 
         vc.take(1);
-        assert_eq!(vc.peak(1), Bytes::from("e"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("e"));
         vc.take(1);
-        assert_eq!(vc.peak(1), Bytes::from("l"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("l"));
         vc.take(5);
-        assert_eq!(vc.peak(1), Bytes::from("r"));
+        assert_eq!(vc.peak_exact(1), Bytes::from("r"));
     }
 }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 609b34ef..c4ed1f6e 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -155,7 +155,7 @@ impl oio::Write for GcsWriter {
             return Ok(());
         }
 
-        let bs = self.buffer.peak(self.buffer_size);
+        let bs = self.buffer.peak_exact(self.buffer_size);
 
         match self.write_part(location, bs).await {
             Ok(_) => {
@@ -184,7 +184,7 @@ impl oio::Write for GcsWriter {
             return Ok(());
         };
 
-        let bs = self.buffer.peak(self.buffer.len());
+        let bs = self.buffer.peak_exact(self.buffer.len());
 
         let resp = self
             .core
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index d4cbe721..1f730cfa 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -161,11 +161,12 @@ impl oio::Write for OssWriter {
             return Ok(());
         }
 
-        let bs = self.buffer.peak(self.buffer_size);
+        let bs = self.buffer.peak_at_least(self.buffer_size);
+        let size = bs.len();
 
         match self.write_part(upload_id, bs).await {
             Ok(part) => {
-                self.buffer.take(self.buffer_size);
+                self.buffer.take(size);
                 self.parts.push(part);
                 Ok(())
             }
@@ -193,6 +194,21 @@ impl oio::Write for OssWriter {
             return Ok(());
         };
 
+        // Make sure internal buffer has been flushed.
+        if !self.buffer.is_empty() {
+            let bs = self.buffer.peak_exact(self.buffer.len());
+
+            match self.write_part(upload_id, bs).await {
+                Ok(part) => {
+                    self.buffer.clear();
+                    self.parts.push(part);
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+
         let resp = self
             .core
             .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts)
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 39248cde..9e80bad2 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -179,11 +179,12 @@ impl oio::Write for S3Writer {
             return Ok(());
         }
 
-        let bs = self.buffer.peak(self.buffer_size);
+        let bs = self.buffer.peak_at_least(self.buffer_size);
+        let size = bs.len();
 
         match self.write_part(upload_id, bs).await {
             Ok(part) => {
-                self.buffer.take(self.buffer_size);
+                self.buffer.take(size);
                 self.parts.push(part);
                 Ok(())
             }
@@ -224,6 +225,21 @@ impl oio::Write for S3Writer {
             return Ok(());
         };
 
+        // Make sure internal buffer has been flushed.
+        if !self.buffer.is_empty() {
+            let bs = self.buffer.peak_exact(self.buffer.len());
+
+            match self.write_part(upload_id, bs).await {
+                Ok(part) => {
+                    self.buffer.clear();
+                    self.parts.push(part);
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+
         let resp = self
             .core
             .s3_complete_multipart_upload(&self.path, upload_id, &self.parts)
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index baedb883..10082b57 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -744,7 +744,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");
-    assert_eq!(meta.content_length(), (size * 2) as u64);
+    assert_eq!(meta.content_length(), size as u64);
 
     let bs = op.read(&path).await?;
     assert_eq!(bs.len(), size, "read size");

Reply via email to