This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 04c1689f0ded8cf6a232d6aa553b80bdd3930a69 Author: Xuanwo <[email protected]> AuthorDate: Wed Apr 19 17:23:55 2023 +0800 Fix tests Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/cursor.rs | 63 ++++++++++++++++++++++++++++++++--------- core/src/services/gcs/writer.rs | 4 +-- core/src/services/oss/writer.rs | 20 +++++++++++-- core/src/services/s3/writer.rs | 20 +++++++++++-- core/tests/behavior/write.rs | 2 +- 5 files changed, 89 insertions(+), 20 deletions(-) diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 0889e45e..b0a4258e 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -165,7 +165,7 @@ impl VectorCursor { /// Returns `true` if current vector is empty. pub fn is_empty(&self) -> bool { - self.inner.is_empty() + self.size == 0 } /// Return current bytes size of current vector. @@ -191,8 +191,11 @@ impl VectorCursor { self.size = 0; } - /// Peak will read and copy n bytes from current cursor without - /// change it's content. + /// Peak will read and copy exactly n bytes from current cursor + /// without change it's content. + /// + /// This function is useful if you want to read a fixed size + /// content to make sure it aligned. /// /// # Panics /// @@ -201,8 +204,8 @@ impl VectorCursor { /// # TODO /// /// Optimize to avoid data copy. - pub fn peak(&self, n: usize) -> Bytes { - assert!(n <= self.size, "peak size must smamller than current size"); + pub fn peak_exact(&self, n: usize) -> Bytes { + assert!(n <= self.size, "peak size must smaller than current size"); // Avoid data copy if n is smaller than first chunk. if self.inner[0].len() >= n { @@ -222,6 +225,40 @@ impl VectorCursor { bs.freeze() } + /// peak_at_least will read and copy at least n bytes from current + /// cursor without change it's content. + /// + /// This function is useful if you only want to make sure the + /// returning bytes is larger. + /// + /// # Panics + /// + /// Panics if n is larger than current size. + /// + /// # TODO + /// + /// Optimize to avoid data copy. + pub fn peak_at_least(&self, n: usize) -> Bytes { + assert!(n <= self.size, "peak size must smaller than current size"); + + // Avoid data copy if n is smaller than first chunk. + if self.inner[0].len() >= n { + return self.inner[0].clone(); + } + + let mut bs = BytesMut::with_capacity(n); + let mut n = n; + for b in &self.inner { + if n == 0 { + break; + } + let len = b.len().min(n); + bs.extend_from_slice(&b[..len]); + n -= len; + } + bs.freeze() + } + /// Take will consume n bytes from current cursor. /// /// # Panics @@ -259,17 +296,17 @@ mod tests { vc.push(Bytes::from("hello")); vc.push(Bytes::from("world")); - assert_eq!(vc.peak(1), Bytes::from("h")); - assert_eq!(vc.peak(1), Bytes::from("h")); - assert_eq!(vc.peak(4), Bytes::from("hell")); - assert_eq!(vc.peak(6), Bytes::from("hellow")); - assert_eq!(vc.peak(10), Bytes::from("helloworld")); + assert_eq!(vc.peak_exact(1), Bytes::from("h")); + assert_eq!(vc.peak_exact(1), Bytes::from("h")); + assert_eq!(vc.peak_exact(4), Bytes::from("hell")); + assert_eq!(vc.peak_exact(6), Bytes::from("hellow")); + assert_eq!(vc.peak_exact(10), Bytes::from("helloworld")); vc.take(1); - assert_eq!(vc.peak(1), Bytes::from("e")); + assert_eq!(vc.peak_exact(1), Bytes::from("e")); vc.take(1); - assert_eq!(vc.peak(1), Bytes::from("l")); + assert_eq!(vc.peak_exact(1), Bytes::from("l")); vc.take(5); - assert_eq!(vc.peak(1), Bytes::from("r")); + assert_eq!(vc.peak_exact(1), Bytes::from("r")); } } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 609b34ef..c4ed1f6e 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -155,7 +155,7 @@ impl oio::Write for GcsWriter { return Ok(()); } - let bs = self.buffer.peak(self.buffer_size); + let bs = self.buffer.peak_exact(self.buffer_size); match self.write_part(location, bs).await { Ok(_) => { @@ -184,7 +184,7 @@ impl oio::Write for GcsWriter { return Ok(()); }; - let bs = self.buffer.peak(self.buffer.len()); + let bs = self.buffer.peak_exact(self.buffer.len()); let resp = self .core diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index d4cbe721..1f730cfa 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -161,11 +161,12 @@ impl oio::Write for OssWriter { return Ok(()); } - let bs = self.buffer.peak(self.buffer_size); + let bs = self.buffer.peak_at_least(self.buffer_size); + let size = bs.len(); match self.write_part(upload_id, bs).await { Ok(part) => { - self.buffer.take(self.buffer_size); + self.buffer.take(size); self.parts.push(part); Ok(()) } @@ -193,6 +194,21 @@ impl oio::Write for OssWriter { return Ok(()); }; + // Make sure internal buffer has been flushed. + if !self.buffer.is_empty() { + let bs = self.buffer.peak_exact(self.buffer.len()); + + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.clear(); + self.parts.push(part); + } + Err(e) => { + return Err(e); + } + } + } + let resp = self .core .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts) diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 39248cde..9e80bad2 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -179,11 +179,12 @@ impl oio::Write for S3Writer { return Ok(()); } - let bs = self.buffer.peak(self.buffer_size); + let bs = self.buffer.peak_at_least(self.buffer_size); + let size = bs.len(); match self.write_part(upload_id, bs).await { Ok(part) => { - self.buffer.take(self.buffer_size); + self.buffer.take(size); self.parts.push(part); Ok(()) } @@ -224,6 +225,21 @@ impl oio::Write for S3Writer { return Ok(()); }; + // Make sure internal buffer has been flushed. + if !self.buffer.is_empty() { + let bs = self.buffer.peak_exact(self.buffer.len()); + + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.clear(); + self.parts.push(part); + } + Err(e) => { + return Err(e); + } + } + } + let resp = self .core .s3_complete_multipart_upload(&self.path, upload_id, &self.parts) diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index baedb883..10082b57 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -744,7 +744,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { w.close().await?; let meta = op.stat(&path).await.expect("stat must succeed"); - assert_eq!(meta.content_length(), (size * 2) as u64); + assert_eq!(meta.content_length(), size as u64); let bs = op.read(&path).await?; assert_eq!(bs.len(), size, "read size");
