This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit abc22f82c88f67f3443d56f49c71cc8f4526e46b
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 19:47:46 2023 +0800

    Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                  |  6 +++---
 core/src/raw/oio/read/into_read_from_file.rs |  2 +-
 core/src/raw/oio/write/exact_buf_write.rs    |  6 +++---
 core/src/types/writer.rs                     | 18 +++++++++---------
 core/tests/behavior/write.rs                 | 11 +++++++++--
 5 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 6ebd5c23c..db470334c 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -729,9 +729,9 @@ where
         let w = self.inner.as_mut().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
         })?;
-        w.write(bs).await?;
-        self.written += n as u64;
-        Ok(n as u64)
+        let n = w.write(bs).await?;
+        self.written += n;
+        Ok(n)
     }
 
     async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs
index 6cc28f2d8..f005ac737 100644
--- a/core/src/raw/oio/read/into_read_from_file.rs
+++ b/core/src/raw/oio/read/into_read_from_file.rs
@@ -126,7 +126,7 @@ where
 
         Poll::Ready(Some(Err(Error::new(
             ErrorKind::Unsupported,
-            "output reader doesn't support seeking",
+            "output reader doesn't support next",
         ))))
     }
 }
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index 57118fce8..b2209ed73 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -66,7 +66,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         loop {
             match &mut self.buffer {
                 Buffer::Filling(fill) => {
-                    if fill.len() == self.buffer_size {
+                    if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
                         continue;
                     }
@@ -94,7 +94,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
         loop {
             match &mut self.buffer {
                 Buffer::Filling(fill) => {
-                    if fill.len() == self.buffer_size {
+                    if fill.len() >= self.buffer_size {
                         self.buffer = Buffer::Consuming(fill.split().freeze());
                         continue;
                     }
@@ -108,7 +108,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
                     // Safety: the input buffer is created with_capacity(length).
                     unsafe { buf.assume_init(dst_len) };
 
-                    let n = s.read(buf.filled_mut()).await?;
+                    let n = s.read(buf.initialize_unfilled()).await?;
 
                     // Safety: read makes sure this buffer has been filled.
                     unsafe { fill.advance_mut(n) };
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 212aba528..9479b7c73 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -183,7 +183,10 @@ impl Writer {
         R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static,
     {
         if let State::Idle(Some(w)) = &mut self.state {
-            let r = Box::new(oio::into_read_from_file(read_from, 0, size));
+            let r = Box::new(oio::into_streamable_read(
+                oio::into_read_from_file(read_from, 0, size),
+                64 * 1024,
+            ));
             w.pipe(size, r).await
         } else {
             unreachable!(
@@ -257,10 +260,9 @@ impl AsyncWrite for Writer {
                         .take()
                         .expect("invalid state of writer: Idle state with empty write");
                     let bs = Bytes::from(buf.to_vec());
-                    let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
-                        Ok((size, w))
+                        let n = w.write(bs).await?;
+                        Ok((n as usize, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -324,10 +326,9 @@ impl tokio::io::AsyncWrite for Writer {
                         .take()
                         .expect("invalid state of writer: Idle state with empty write");
                     let bs = Bytes::from(buf.to_vec());
-                    let size = bs.len();
                     let fut = async move {
-                        w.write(bs).await?;
-                        Ok((size, w))
+                        let n = w.write(bs).await?;
+                        Ok((n as usize, w))
                     };
                     self.state = State::Write(Box::pin(fut));
                 }
@@ -417,10 +418,9 @@ impl BlockingWriter {
 
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let size = buf.len();
         self.inner
             .write(Bytes::from(buf.to_vec()))
-            .map(|_| size)
+            .map(|n| n as usize)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
 
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 0ad042295..fe4166d78 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -19,6 +19,7 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use anyhow::Result;
+use bytes::{Buf, Bytes};
 use futures::io::BufReader;
 use futures::io::Cursor;
 use futures::stream;
@@ -1194,13 +1195,19 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let size = 5 * 1024 * 1024; // write file with 5 MiB
     let content_a = gen_fixed_bytes(size);
     let content_b = gen_fixed_bytes(size);
-    let reader = Cursor::new([content_a.clone(), content_b.clone()].concat());
 
     let mut w = op
         .writer_with(&path)
         .content_length(2 * size as u64)
         .await?;
-    w.copy(2 * size as u64, reader).await?;
+
+    let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat());
+    while !content.is_empty() {
+        let reader = Cursor::new(content.clone());
+        let n = w.copy(2 * size as u64, reader).await?;
+        content.advance(n as usize);
+    }
+
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");

Reply via email to