This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch exact-buf-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit e40c5c6d10620dbf00cde1eae143df9e400d98a9
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 23 23:13:39 2023 +0800

    Implement

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/exact_buf_write.rs | 100 +++++++++++++++++++++++++++---
 1 file changed, 90 insertions(+), 10 deletions(-)

diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index f1cba001b..3afffe1d9 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -86,16 +86,21 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
             .await
     }

+    /// # TODO
+    ///
+    /// We know every stream size, we can collect them into a buffer without chain them every time.
     async fn sink(&mut self, size: u64, mut s: Streamer) -> Result<()> {
         // Collect the stream into buffer directly if the buffet is not full.
-        if self.buffer.len() as u64 + size < self.buffer_size as u64 {
-            self.buffer.push(s.collect().await?);
-            return Ok(());
+        if self.buffer_stream.is_none() {
+            if self.buffer.len() as u64 + size <= self.buffer_size as u64 {
+                self.buffer.push(s.collect().await?);
+                return Ok(());
+            }
         }

-        if self.buffer.len() > self.buffer_size {
-            let buf = self.buffer.clone();
-            let to_write = self.buffer.split_to(self.buffer_size);
+        if self.buffer.len() >= self.buffer_size {
+            let mut buf = self.buffer.clone();
+            let to_write = buf.split_to(self.buffer_size);
             return self
                 .inner
                 .sink(to_write.len() as u64, Box::new(to_write))
@@ -151,12 +156,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                         self.buffer_stream = None;
                         break;
                     }
-                    Some(bs) => self.buffer.push(bs),
+                    Some(bs) => {
+                        self.buffer.push(bs);
+                    }
                 }
             }

-            let mut buf = self.buffer.clone();
-            if buf.len() >= self.buffer_size {
+            if self.buffer.len() >= self.buffer_size {
+                let mut buf = self.buffer.clone();
                 let to_write = buf.split_to(self.buffer_size);
                 self.inner
                     .sink(to_write.len() as u64, Box::new(to_write))
@@ -164,7 +171,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     // Replace buffer with remaining if the write is successful.
                     .map(|_| {
                         self.buffer = buf;
-                    })?
+                    })?;
             }
         }

@@ -182,3 +189,76 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         self.inner.close().await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::raw::oio::StreamExt;
+    use crate::raw::oio::Write;
+    use log::debug;
+    use pretty_assertions::assert_eq;
+    use rand::{thread_rng, Rng, RngCore};
+    use sha2::{Digest, Sha256};
+
+    struct MockWriter {
+        buf: Vec<u8>,
+    }
+
+    #[async_trait]
+    impl Write for MockWriter {
+        async fn write(&mut self, bs: Bytes) -> Result<()> {
+            debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
+
+            self.buf.extend_from_slice(&bs);
+            Ok(())
+        }
+
+        async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+            let bs = s.collect().await?;
+            assert_eq!(bs.len() as u64, size);
+            self.write(bs).await
+        }
+
+        async fn abort(&mut self) -> Result<()> {
+            Ok(())
+        }
+
+        async fn close(&mut self) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_fuzz_exact_buf_writer() -> Result<()> {
+        let _ = tracing_subscriber::fmt()
+            .pretty()
+            .with_test_writer()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .try_init();
+
+        let mut rng = thread_rng();
+        let mut expected = vec![];
+
+        let buffer_size = rng.gen_range(1..10);
+        let mut writer = ExactBufWriter::new(MockWriter { buf: vec![] }, buffer_size);
+        debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");
+
+        for _ in 0..1000 {
+            let size = rng.gen_range(1..20);
+            debug!("test_fuzz_exact_buf_writer: write size: {size}");
+            let mut content = vec![0; size];
+            rng.fill_bytes(&mut content);
+
+            expected.extend_from_slice(&content);
+            writer.write(Bytes::from(content)).await?;
+        }
+        writer.close().await?;
+
+        assert_eq!(writer.inner.buf.len(), expected.len());
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&writer.inner.buf)),
+            format!("{:x}", Sha256::digest(&expected))
+        );
+        Ok(())
+    }
+}
