This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ce4ad52a9e1a3b68dd200129a6af5da39c67b83b Author: Xuanwo <[email protected]> AuthorDate: Wed Apr 19 17:10:21 2023 +0800 Add test for copy Signed-off-by: Xuanwo <[email protected]> --- core/src/types/writer.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++ core/tests/behavior/utils.rs | 11 ++++++++ core/tests/behavior/write.rs | 44 +++++++++++++++++++++++++---- 3 files changed, 116 insertions(+), 5 deletions(-) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 68fe446f..49cc397e 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -180,6 +180,72 @@ impl AsyncWrite for Writer { } } +impl tokio::io::AsyncWrite for Writer { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + loop { + match &mut self.state { + State::Idle(w) => { + let mut w = w + .take() + .expect("invalid state of writer: Idle state with empty write"); + let bs = Bytes::from(buf.to_vec()); + let size = bs.len(); + let fut = async move { + w.write(bs).await?; + Ok((size, w)) + }; + self.state = State::Write(Box::pin(fut)); + } + State::Write(fut) => match ready!(fut.poll_unpin(cx)) { + Ok((size, w)) => { + self.state = State::Idle(Some(w)); + return Poll::Ready(Ok(size)); + } + Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), + }, + State::Close(_) => { + unreachable!("invalid state of writer: poll_write with State::Close") + } + }; + } + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + loop { + match &mut self.state { + State::Idle(w) => { + let mut w = w + .take() + .expect("invalid state of writer: Idle state with empty write"); + let fut = async move { + w.close().await?; + Ok(w) + }; + self.state = State::Close(Box::pin(fut)); + } + State::Write(_) => { + unreachable!("invalid state of writer: poll_close with State::Write") + } + State::Close(fut) => match ready!(fut.poll_unpin(cx)) { + Ok(w) => { + self.state = State::Idle(Some(w)); + return Poll::Ready(Ok(())); + } + Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), + }, + } + } + } +} + /// BlockingWriter is designed to write data into given path in an blocking /// manner. pub struct BlockingWriter { diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs index 44190997..b909d436 100644 --- a/core/tests/behavior/utils.rs +++ b/core/tests/behavior/utils.rs @@ -25,6 +25,7 @@ use log::debug; use opendal::layers::LoggingLayer; use opendal::layers::RetryLayer; use opendal::*; +use rand::distributions::uniform::SampleRange; use rand::prelude::*; use sha2::Digest; use sha2::Sha256; @@ -88,6 +89,16 @@ pub fn gen_bytes() -> (Vec<u8>, usize) { (content, size) } +pub fn gen_bytes_with_range(range: impl SampleRange<usize>) -> (Vec<u8>, usize) { + let mut rng = thread_rng(); + + let size = rng.gen_range(range); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + + (content, size) +} + pub fn gen_fixed_bytes(size: usize) -> Vec<u8> { let mut rng = thread_rng(); diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index c0d24c4a..baedb883 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -97,8 +97,9 @@ macro_rules! behavior_write_tests { test_delete_with_special_chars, test_delete_not_existing, test_delete_stream, - test_append, - test_abort_writer, + test_writer_write, + test_writer_abort, + test_writer_futures_copy, ); )* }; @@ -580,7 +581,7 @@ pub async fn test_read_with_special_chars(op: Operator) -> Result<()> { } // Delete existing file should succeed. -pub async fn test_abort_writer(op: Operator) -> Result<()> { +pub async fn test_writer_abort(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); let (content, _) = gen_bytes(); @@ -685,8 +686,8 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> { Ok(()) } -// Append write -pub async fn test_append(op: Operator) -> Result<()> { +// Append data into writer +pub async fn test_writer_write(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); let size = 5 * 1024 * 1024; // write file with 5 MiB let content_a = gen_fixed_bytes(size); @@ -723,3 +724,36 @@ pub async fn test_append(op: Operator) -> Result<()> { op.delete(&path).await.expect("delete must succeed"); Ok(()) } + +// copy data from reader to writer +pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, size): (Vec<u8>, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut w = match op.writer(&path).await { + Ok(w) => w, + Err(err) if err.kind() == ErrorKind::Unsupported => { + warn!("service doesn't support write with append"); + return Ok(()); + } + Err(err) => return Err(err.into()), + }; + + futures::io::copy(&mut content.as_slice(), &mut w).await?; + w.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), (size * 2) as u64); + + let bs = op.read(&path).await?; + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) +}
