This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ce4ad52a9e1a3b68dd200129a6af5da39c67b83b
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 17:10:21 2023 +0800

    Add test for copy
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/writer.rs     | 66 ++++++++++++++++++++++++++++++++++++++++++++
 core/tests/behavior/utils.rs | 11 ++++++++
 core/tests/behavior/write.rs | 44 +++++++++++++++++++++++++----
 3 files changed, 116 insertions(+), 5 deletions(-)

diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 68fe446f..49cc397e 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -180,6 +180,72 @@ impl AsyncWrite for Writer {
     }
 }
 
+impl tokio::io::AsyncWrite for Writer { // each poll steps `self.state`: Idle -> Write/Close -> Idle
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let mut w = w
+                        .take()
+                        .expect("invalid state of writer: Idle state with empty write");
+                    let bs = Bytes::from(buf.to_vec()); // owned copy: the stored future must not borrow `buf`
+                    let size = bs.len();
+                    let fut = async move {
+                        w.write(bs).await?;
+                        Ok((size, w)) // hand the writer back together with the byte count
+                    };
+                    self.state = State::Write(Box::pin(fut)); // next loop turn polls this future
+                }
+                State::Write(fut) => match ready!(fut.poll_unpin(cx)) {
+                    Ok((size, w)) => {
+                        self.state = State::Idle(Some(w)); // writer is available again
+                        return Poll::Ready(Ok(size)); // the whole of `buf` is reported as written
+                    }
+                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                },
+                State::Close(_) => {
+                    unreachable!("invalid state of writer: poll_write with State::Close")
+                }
+            };
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(())) // no-op: always reports the flush as complete
+    }
+
+    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        loop {
+            match &mut self.state {
+                State::Idle(w) => {
+                    let mut w = w
+                        .take()
+                        .expect("invalid state of writer: Idle state with empty write");
+                    let fut = async move {
+                        w.close().await?; // shutdown is implemented by closing the inner writer
+                        Ok(w)
+                    };
+                    self.state = State::Close(Box::pin(fut)); // next loop turn polls this future
+                }
+                State::Write(_) => {
+                    unreachable!("invalid state of writer: poll_shutdown with State::Write")
+                }
+                State::Close(fut) => match ready!(fut.poll_unpin(cx)) {
+                    Ok(w) => {
+                        self.state = State::Idle(Some(w)); // writer is put back after closing
+                        return Poll::Ready(Ok(()));
+                    }
+                    Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+                },
+            }
+        }
+    }
+}
+
 /// BlockingWriter is designed to write data into given path in an blocking
 /// manner.
 pub struct BlockingWriter {
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 44190997..b909d436 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -25,6 +25,7 @@ use log::debug;
 use opendal::layers::LoggingLayer;
 use opendal::layers::RetryLayer;
 use opendal::*;
+use rand::distributions::uniform::SampleRange;
 use rand::prelude::*;
 use sha2::Digest;
 use sha2::Sha256;
@@ -88,6 +89,16 @@ pub fn gen_bytes() -> (Vec<u8>, usize) {
     (content, size)
 }
 
+/// Generate random bytes whose length is sampled from `range`; returns `(content, size)`.
+pub fn gen_bytes_with_range(range: impl SampleRange<usize>) -> (Vec<u8>, usize) {
+    let mut rng = thread_rng();
+    let size = rng.gen_range(range);
+    let mut content = vec![0; size];
+    rng.fill_bytes(&mut content);
+
+    (content, size)
+}
+
 pub fn gen_fixed_bytes(size: usize) -> Vec<u8> {
     let mut rng = thread_rng();
 
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index c0d24c4a..baedb883 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -97,8 +97,9 @@ macro_rules! behavior_write_tests {
                 test_delete_with_special_chars,
                 test_delete_not_existing,
                 test_delete_stream,
-                test_append,
-                test_abort_writer,
+                test_writer_write,
+                test_writer_abort,
+                test_writer_futures_copy,
             );
         )*
     };
@@ -580,7 +581,7 @@ pub async fn test_read_with_special_chars(op: Operator) -> Result<()> {
 }
 
 // Delete existing file should succeed.
-pub async fn test_abort_writer(op: Operator) -> Result<()> {
+pub async fn test_writer_abort(op: Operator) -> Result<()> {
     let path = uuid::Uuid::new_v4().to_string();
     let (content, _) = gen_bytes();
 
@@ -685,8 +686,8 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> {
     Ok(())
 }
 
-// Append write
-pub async fn test_append(op: Operator) -> Result<()> {
+// Append data into writer
+pub async fn test_writer_write(op: Operator) -> Result<()> {
     let path = uuid::Uuid::new_v4().to_string();
     let size = 5 * 1024 * 1024; // write file with 5 MiB
     let content_a = gen_fixed_bytes(size);
@@ -723,3 +724,36 @@ pub async fn test_append(op: Operator) -> Result<()> {
     op.delete(&path).await.expect("delete must succeed");
     Ok(())
 }
+
+/// Copy random bytes into a writer via `futures::io::copy`, then check stored size and digest.
+pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size): (Vec<u8>, usize) =
+        gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+    let mut w = match op.writer(&path).await {
+        Ok(w) => w,
+        Err(err) if err.kind() == ErrorKind::Unsupported => {
+            warn!("service doesn't support write with append");
+            return Ok(()); // skip rather than fail when the service reports Unsupported
+        }
+        Err(err) => return Err(err.into()),
+    };
+
+    futures::io::copy(&mut content.as_slice(), &mut w).await?;
+    w.close().await?;
+    // The payload was copied in exactly once, so the stored object must be `size` bytes long.
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), size as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content)),
+        "read content"
+    );
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}

Reply via email to