This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit abc22f82c88f67f3443d56f49c71cc8f4526e46b Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 19:47:46 2023 +0800 Fix test Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 6 +++--- core/src/raw/oio/read/into_read_from_file.rs | 2 +- core/src/raw/oio/write/exact_buf_write.rs | 6 +++--- core/src/types/writer.rs | 18 +++++++++--------- core/tests/behavior/write.rs | 11 +++++++++-- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 6ebd5c23c..db470334c 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -729,9 +729,9 @@ where let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.write(bs).await?; - self.written += n as u64; - Ok(n as u64) + let n = w.write(bs).await?; + self.written += n; + Ok(n) } async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs index 6cc28f2d8..f005ac737 100644 --- a/core/src/raw/oio/read/into_read_from_file.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -126,7 +126,7 @@ where Poll::Ready(Some(Err(Error::new( ErrorKind::Unsupported, - "output reader doesn't support seeking", + "output reader doesn't support next", )))) } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 57118fce8..b2209ed73 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -66,7 +66,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { loop { match &mut self.buffer { Buffer::Filling(fill) => { - if fill.len() == self.buffer_size { + if fill.len() >= self.buffer_size { self.buffer = Buffer::Consuming(fill.split().freeze()); continue; } @@ -94,7 +94,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { loop { match &mut self.buffer { Buffer::Filling(fill) => { - if fill.len() == self.buffer_size { + if fill.len() >= self.buffer_size { self.buffer = Buffer::Consuming(fill.split().freeze()); continue; } @@ -108,7 +108,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { // Safety: the input buffer is created with_capacity(length). unsafe { buf.assume_init(dst_len) }; - let n = s.read(buf.filled_mut()).await?; + let n = s.read(buf.initialize_unfilled()).await?; // Safety: read makes sure this buffer has been filled. unsafe { fill.advance_mut(n) }; diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 212aba528..9479b7c73 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -183,7 +183,10 @@ impl Writer { R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static, { if let State::Idle(Some(w)) = &mut self.state { - let r = Box::new(oio::into_read_from_file(read_from, 0, size)); + let r = Box::new(oio::into_streamable_read( + oio::into_read_from_file(read_from, 0, size), + 64 * 1024, + )); w.pipe(size, r).await } else { unreachable!( @@ -257,10 +260,9 @@ impl AsyncWrite for Writer { .take() .expect("invalid state of writer: Idle state with empty write"); let bs = Bytes::from(buf.to_vec()); - let size = bs.len(); let fut = async move { - w.write(bs).await?; - Ok((size, w)) + let n = w.write(bs).await?; + Ok((n as usize, w)) }; self.state = State::Write(Box::pin(fut)); } @@ -324,10 +326,9 @@ impl tokio::io::AsyncWrite for Writer { .take() .expect("invalid state of writer: Idle state with empty write"); let bs = Bytes::from(buf.to_vec()); - let size = bs.len(); let fut = async move { - w.write(bs).await?; - Ok((size, w)) + let n = w.write(bs).await?; + Ok((n as usize, w)) }; self.state = State::Write(Box::pin(fut)); } @@ -417,10 +418,9 @@ impl BlockingWriter { impl io::Write for BlockingWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - let size = buf.len(); self.inner .write(Bytes::from(buf.to_vec())) - .map(|_| size) + .map(|n| n as usize) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 0ad042295..fe4166d78 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use std::time::Duration; use anyhow::Result; +use bytes::{Buf, Bytes}; use futures::io::BufReader; use futures::io::Cursor; use futures::stream; @@ -1194,13 +1195,19 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> { let size = 5 * 1024 * 1024; // write file with 5 MiB let content_a = gen_fixed_bytes(size); let content_b = gen_fixed_bytes(size); - let reader = Cursor::new([content_a.clone(), content_b.clone()].concat()); let mut w = op .writer_with(&path) .content_length(2 * size as u64) .await?; - w.copy(2 * size as u64, reader).await?; + + let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat()); + while !content.is_empty() { + let reader = Cursor::new(content.clone()); + let n = w.copy(2 * size as u64, reader).await?; + content.advance(n as usize); + } + w.close().await?; let meta = op.stat(&path).await.expect("stat must succeed");
