This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch make-write-all in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 75ff58d477750a421639debfdf8dd0cf637c93a3 Author: Xuanwo <[email protected]> AuthorDate: Thu Jul 11 17:15:30 2024 +0800 Fix tests Signed-off-by: Xuanwo <[email protected]> --- core/src/types/blocking_write/blocking_writer.rs | 8 ++- core/src/types/context/write.rs | 74 +++++++++++++++--------- core/src/types/write/buffer_sink.rs | 12 ++-- core/src/types/write/writer.rs | 8 ++- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/core/src/types/blocking_write/blocking_writer.rs b/core/src/types/blocking_write/blocking_writer.rs index d97cabb147..489cae502a 100644 --- a/core/src/types/blocking_write/blocking_writer.rs +++ b/core/src/types/blocking_write/blocking_writer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use bytes::Buf; use std::sync::Arc; use crate::raw::*; @@ -67,7 +68,12 @@ impl BlockingWriter { /// } /// ``` pub fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> { - self.inner.write(bs.into()) + let mut bs = bs.into(); + while !bs.is_empty() { + let n = self.inner.write(bs.clone())?; + bs.advance(n); + } + Ok(()) } /// Close the writer and make sure all data have been committed. diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index af875c14ed..6e2464cfce 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -133,14 +133,17 @@ impl WriteGenerator<oio::Writer> { impl WriteGenerator<oio::Writer> { /// Write the entire buffer into writer. - pub async fn write(&mut self, mut bs: Buffer) -> Result<()> { + pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> { let Some(chunk_size) = self.chunk_size else { - return self.w.write_dyn(bs).await; + let size = bs.len(); + self.w.write_dyn(bs).await?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size); } // Condition: @@ -149,10 +152,11 @@ impl WriteGenerator<oio::Writer> { // Action: // - write buffer + bs directly. if !self.exact { + let fill_size = bs.len(); self.buffer.push(bs); let buf = self.buffer.take().collect(); self.w.write_dyn(buf).await?; - return Ok(()); + return Ok(fill_size); } // Condition: @@ -173,8 +177,9 @@ impl WriteGenerator<oio::Writer> { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); + let n = bs.len(); self.buffer.push(bs); - Ok(()) + Ok(n) } /// Finish the write process. @@ -184,8 +189,8 @@ impl WriteGenerator<oio::Writer> { break; } - self.w.write_dyn(self.buffer.clone().collect()).await?; - self.buffer.clear(); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } self.w.close().await @@ -215,14 +220,17 @@ impl WriteGenerator<oio::BlockingWriter> { impl WriteGenerator<oio::BlockingWriter> { /// Write the entire buffer into writer. - pub fn write(&mut self, mut bs: Buffer) -> Result<()> { + pub fn write(&mut self, mut bs: Buffer) -> Result<usize> { let Some(chunk_size) = self.chunk_size else { - return self.w.write(bs); + let size = bs.len(); + self.w.write(bs)?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size); } // Condition: @@ -231,10 +239,11 @@ impl WriteGenerator<oio::BlockingWriter> { // Action: // - write buffer + bs directly. if !self.exact { + let fill_size = bs.len(); self.buffer.push(bs); let buf = self.buffer.take().collect(); self.w.write(buf)?; - return Ok(()); + return Ok(fill_size); } // Condition: @@ -255,8 +264,9 @@ impl WriteGenerator<oio::BlockingWriter> { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); + let n = bs.len(); self.buffer.push(bs); - Ok(()) + Ok(n) } /// Finish the write process. @@ -266,8 +276,8 @@ impl WriteGenerator<oio::BlockingWriter> { break; } - self.w.write(self.buffer.clone().collect())?; - self.buffer.clear(); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } self.w.close() @@ -278,8 +288,8 @@ impl WriteGenerator<oio::BlockingWriter> { mod tests { use super::*; use crate::raw::oio::Write; - use bytes::BufMut; use bytes::Bytes; + use bytes::{Buf, BufMut}; use log::debug; use pretty_assertions::assert_eq; use rand::thread_rng; @@ -326,8 +336,11 @@ mod tests { let buf = Arc::new(Mutex::new(vec![])); let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); - let bs = Bytes::from(expected.clone()); - w.write(bs.into()).await?; + let mut bs = Bytes::from(expected.clone()); + while !bs.is_empty() { + let n = w.write(bs.clone().into()).await?; + bs.advance(n); + } w.close().await?; @@ -356,7 +369,10 @@ mod tests { rng.fill_bytes(&mut expected); let bs = Bytes::from(expected.clone()); - w.write(bs.into()).await?; + // The MockWriter always returns the first chunk size. + let n = w.write(bs.into()).await?; + assert_eq!(expected.len(), n); + w.close().await?; let buf = buf.lock().await; @@ -391,13 +407,14 @@ mod tests { // content > chunk size. let content = new_content(15); - w.write(content.into()).await?; + assert_eq!(15, w.write(content.into()).await?); // content < chunk size. let content = new_content(5); - w.write(content.into()).await?; + assert_eq!(5, w.write(content.into()).await?); // content > chunk size, but 5 bytes in queue. let content = new_content(15); - w.write(content.clone().into()).await?; + // The MockWriter can send all 15 bytes together, so we can only advance 5 bytes. + assert_eq!(15, w.write(content.clone().into()).await?); w.close().await?; @@ -433,16 +450,16 @@ mod tests { // content > chunk size. let content = new_content(15); - w.write(content.into()).await?; + assert_eq!(15, w.write(content.into()).await?); // content < chunk size. let content = new_content(5); - w.write(content.into()).await?; + assert_eq!(5, w.write(content.into()).await?); // content < chunk size. let content = new_content(3); - w.write(content.into()).await?; + assert_eq!(3, w.write(content.into()).await?); // content > chunk size, but can send all chunks in the queue. let content = new_content(15); - w.write(content.clone().into()).await?; + assert_eq!(15, w.write(content.clone().into()).await?); w.close().await?; @@ -483,8 +500,11 @@ mod tests { expected.extend_from_slice(&content); - let bs = Bytes::from(content.clone()); - writer.write(bs.into()).await?; + let mut bs = Bytes::from(content.clone()); + while !bs.is_empty() { + let n = writer.write(bs.clone().into()).await?; + bs.advance(n); + } } writer.close().await?; diff --git a/core/src/types/write/buffer_sink.rs b/core/src/types/write/buffer_sink.rs index cc42e09090..46d9112530 100644 --- a/core/src/types/write/buffer_sink.rs +++ b/core/src/types/write/buffer_sink.rs @@ -20,6 +20,8 @@ use std::task::ready; use std::task::Context; use std::task::Poll; +use bytes::Buf; + use crate::raw::*; use crate::*; @@ -33,7 +35,7 @@ pub struct BufferSink { enum State { Idle(Option<WriteGenerator<oio::Writer>>), - Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>), + Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>), Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>), } @@ -90,8 +92,8 @@ impl futures::Sink<Buffer> for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(_) => { - this.buf = Buffer::new(); + Ok(n) => { + this.buf.advance(n); } Err(err) => return Poll::Ready(Err(err)), } @@ -137,8 +139,8 @@ impl futures::Sink<Buffer> for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(_) => { - this.buf = Buffer::new(); + Ok(n) => { + this.buf.advance(n); } Err(err) => return Poll::Ready(Err(err)), } diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index 44755ed37c..eaf8b72517 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -136,7 +136,13 @@ impl Writer { /// } /// ``` pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> { - self.inner.write(bs.into()).await + let mut bs = bs.into(); + while !bs.is_empty() { + let n = self.inner.write(bs.clone()).await?; + bs.advance(n); + } + + Ok(()) } /// Write [`bytes::Buf`] into inner writer.
