This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch make-write-all in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 153385eafafe235266a3f80e3aaf79d44c140b1f Author: Xuanwo <[email protected]> AuthorDate: Thu Jul 11 16:11:31 2024 +0800 Remove returning n in write Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 7 +- core/src/layers/concurrent_limit.rs | 4 +- core/src/layers/error_context.rs | 14 ++-- core/src/layers/logging.rs | 25 +++---- core/src/layers/retry.rs | 4 +- core/src/layers/timeout.rs | 2 +- core/src/raw/adapters/kv/backend.rs | 10 ++- core/src/raw/adapters/typed_kv/backend.rs | 10 ++- core/src/raw/enum_utils.rs | 4 +- core/src/raw/oio/write/api.rs | 30 +++----- core/src/raw/oio/write/append_write.rs | 8 +-- core/src/raw/oio/write/block_write.rs | 10 +-- core/src/raw/oio/write/multipart_write.rs | 10 +-- core/src/raw/oio/write/one_shot_write.rs | 5 +- core/src/raw/oio/write/position_write.rs | 10 +-- core/src/raw/oio/write/range_write.rs | 10 +-- core/src/types/blocking_write/blocking_writer.rs | 8 +-- core/src/types/blocking_write/std_writer.rs | 10 ++- core/src/types/context/write.rs | 89 +++++++++--------------- core/src/types/write/buffer_sink.rs | 12 ++-- core/src/types/write/writer.rs | 14 +--- 21 files changed, 114 insertions(+), 182 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index c2156a0418..68b0340aa7 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W> where W: oio::Write, { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W> where W: oio::BlockingWrite, { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - let n = w.write(bs)?; - Ok(n) + w.write(bs) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index a1a61ad01d..87ad19b50c 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> { } impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 86ae9dba80..cabe84b053 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> { } impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::Write) @@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { } impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 1e0d80d264..507745c6d1 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> { } impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { - match self.inner.write(bs.clone()).await { - Ok(n) => { - self.written += n as u64; + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + match self.inner.write(bs).await { + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::Write, self.path, self.written, - bs.len(), - n, + size, ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { match self.inner.write(bs.clone()) { - Ok(n) => { - self.written += n as u64; + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::BlockingWrite, self.path, self.written, bs.len(), - n ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index c48307fdbc..5bc37e6145 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -626,7 +626,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp } impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; @@ -694,7 +694,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { } impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { { || self.inner.as_mut().unwrap().write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 246049dfbf..1cbc0c5ac1 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -350,7 +350,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { } impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let fut = self.inner.write(bs); Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), fut).await } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index bb08e4cb19..625e7ea982 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -242,10 +242,9 @@ impl<S> KvWriter<S> { unsafe impl<S: Adapter> Sync for KvWriter<S> {} impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { - let ret = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -260,10 +259,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Buffer) -> Result<usize> { - let ret = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index ecce2eb879..fd6271691b 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -275,12 +275,11 @@ impl<S> KvWriter<S> { } impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { - let size = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -303,12 +302,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> { } impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { - fn write(&mut self, bs: Buffer) -> Result<usize> { - let size = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index c22411904d..111da78be0 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -70,7 +70,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for TwoWa } impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, @@ -129,7 +129,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: oio::BlockingRead> o impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write for ThreeWays<ONE, TWO, THREE> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index e6c7c05918..4ec53adab6 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync { /// /// # Behavior /// - /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Ok(())` means all bytes has been written successfully. /// - `Err(err)` means error happens and no bytes has been written. - /// - /// It's possible that `n < bs.len()`, caller should pass the remaining bytes - /// repeatedly until all bytes has been written. - #[cfg(not(target_arch = "wasm32"))] - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>>; + fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend; /// Close the writer and make sure all data has been flushed. - #[cfg(not(target_arch = "wasm32"))] fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn close(&mut self) -> impl Future<Output = Result<()>>; /// Abort the pending writer. - #[cfg(not(target_arch = "wasm32"))] fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn abort(&mut self) -> impl Future<Output = Result<()>>; } impl Write for () { - async fn write(&mut self, _: Buffer) -> Result<usize> { + async fn write(&mut self, _: Buffer) -> Result<()> { unimplemented!("write is required to be implemented for oio::Write") } @@ -121,7 +109,7 @@ impl Write for () { } pub trait WriteDyn: Unpin + Send + Sync { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>>; + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>>; fn close_dyn(&mut self) -> BoxedFuture<Result<()>>; @@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync { } impl<T: Write + ?Sized> WriteDyn for T { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>> { + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>> { Box::pin(self.write(bs)) } @@ -143,7 +131,7 @@ impl<T: Write + ?Sized> WriteDyn for T { } impl<T: WriteDyn + ?Sized> Write for Box<T> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.deref_mut().write_dyn(bs).await } @@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static { /// /// It's possible that `n < bs.len()`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - fn write(&mut self, bs: Buffer) -> Result<usize>; + fn write(&mut self, bs: Buffer) -> Result<()>; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -195,7 +183,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_write.rs b/core/src/raw/oio/write/append_write.rs index 2f48b68307..06c72cc5e2 100644 --- a/core/src/raw/oio/write/append_write.rs +++ b/core/src/raw/oio/write/append_write.rs @@ -80,7 +80,7 @@ impl<W> oio::Write for AppendWriter<W> where W: AppendWrite, { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let offset = match self.offset { Some(offset) => offset, None => { @@ -91,12 +91,10 @@ where }; let size = bs.len(); - self.inner - .append(offset, size as u64, Buffer::from(bs.to_bytes())) - .await?; + self.inner.append(offset, size as u64, bs).await?; // Update offset after succeed. self.offset = Some(offset + size as u64); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 99c76562ca..cd0ec43b45 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -162,10 +162,10 @@ impl<W> oio::Write for BlockWriter<W> where W: BlockWrite, { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { if !self.started && self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } // The block upload process has been started. @@ -181,8 +181,8 @@ where }) .await?; self.cache = None; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 0d893d7cb3..44a33c7a4b 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -203,14 +203,14 @@ impl<W> oio::Write for MultipartWriter<W> where W: MultipartWrite, { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let upload_id = match self.upload_id.clone() { Some(v) => v, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let upload_id = self.w.initiate_part().await?; @@ -234,8 +234,8 @@ where .await?; self.cache = None; self.next_part_number += 1; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index cd056c1461..938973c33a 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -50,16 +50,15 @@ impl<W: OneShotWrite> OneShotWriter<W> { } impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { match &self.buffer { Some(_) => Err(Error::new( ErrorKind::Unsupported, "OneShotWriter doesn't support multiple write", )), None => { - let size = bs.len(); self.buffer = Some(bs); - Ok(size) + Ok(()) } } } diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index 3dbf5c93ef..5aa5ff3294 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -124,10 +124,10 @@ impl<W: PositionWrite> PositionWriter<W> { } impl<W: PositionWrite> oio::Write for PositionWriter<W> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + let _ = self.fill_cache(bs); + return Ok(()); } let bytes = self.cache.clone().expect("pending write must exist"); @@ -144,8 +144,8 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + let _ = self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 67ae619dd9..f44f06ad9c 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -155,14 +155,14 @@ impl<W: RangeWrite> RangeWriter<W> { } impl<W: RangeWrite> oio::Write for RangeWriter<W> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let location = match self.location.clone() { Some(location) => location, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let location = self.w.initiate_range().await?; @@ -187,8 +187,8 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/types/blocking_write/blocking_writer.rs b/core/src/types/blocking_write/blocking_writer.rs index 489cae502a..d97cabb147 100644 --- a/core/src/types/blocking_write/blocking_writer.rs +++ b/core/src/types/blocking_write/blocking_writer.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use std::sync::Arc; use crate::raw::*; @@ -68,12 +67,7 @@ impl BlockingWriter { /// } /// ``` pub fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> { - let mut bs = bs.into(); - while !bs.is_empty() { - let n = self.inner.write(bs.clone())?; - bs.advance(n); - } - Ok(()) + self.inner.write(bs.into()) } /// Close the writer and make sure all data have been committed. diff --git a/core/src/types/blocking_write/std_writer.rs b/core/src/types/blocking_write/std_writer.rs index 5b18467e36..fe918b43bf 100644 --- a/core/src/types/blocking_write/std_writer.rs +++ b/core/src/types/blocking_write/std_writer.rs @@ -81,10 +81,9 @@ impl Write for StdWriter { } let bs = self.buf.get().expect("frozen buffer must be valid"); - let n = w - .write(Buffer::from(bs)) + w.write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.advance(n); + self.buf.clean(); } } @@ -103,10 +102,9 @@ impl Write for StdWriter { return Ok(()); }; - let n = w - .write(Buffer::from(bs)) + w.write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.advance(n); + self.buf.clean(); } } } diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index 557341da3d..92248762cf 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -18,7 +18,6 @@ use crate::raw::oio::Write; use crate::raw::*; use crate::*; -use bytes::Buf; use std::sync::Arc; /// WriteContext holds the immutable context for give write operation. @@ -134,15 +133,14 @@ impl WriteGenerator<oio::Writer> { impl WriteGenerator<oio::Writer> { /// Write the entire buffer into writer. - pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> { + pub async fn write(&mut self, mut bs: Buffer) -> Result<()> { let Some(chunk_size) = self.chunk_size else { return self.w.write_dyn(bs).await; }; if self.buffer.len() + bs.len() < chunk_size { - let size = bs.len(); self.buffer.push(bs); - return Ok(size); + return Ok(()); } // Condition: @@ -151,13 +149,10 @@ impl WriteGenerator<oio::Writer> { // Action: // - write buffer + bs directly. if !self.exact { - let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); - return Ok(fill_size); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; + return Ok(()); } // Condition: @@ -167,10 +162,8 @@ impl WriteGenerator<oio::Writer> { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } // Condition @@ -180,9 +173,8 @@ impl WriteGenerator<oio::Writer> { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); - let n = bs.len(); self.buffer.push(bs); - Ok(n) + Ok(()) } /// Finish the write process. @@ -192,8 +184,8 @@ impl WriteGenerator<oio::Writer> { break; } - let written = self.w.write_dyn(self.buffer.clone().collect()).await?; - self.buffer.advance(written); + self.w.write_dyn(self.buffer.clone().collect()).await?; + self.buffer.clear(); } self.w.close().await @@ -223,15 +215,14 @@ impl WriteGenerator<oio::BlockingWriter> { impl WriteGenerator<oio::BlockingWriter> { /// Write the entire buffer into writer. - pub fn write(&mut self, mut bs: Buffer) -> Result<usize> { + pub fn write(&mut self, mut bs: Buffer) -> Result<()> { let Some(chunk_size) = self.chunk_size else { return self.w.write(bs); }; if self.buffer.len() + bs.len() < chunk_size { - let size = bs.len(); self.buffer.push(bs); - return Ok(size); + return Ok(()); } // Condition: @@ -240,13 +231,10 @@ impl WriteGenerator<oio::BlockingWriter> { // Action: // - write buffer + bs directly. if !self.exact { - let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); - return Ok(fill_size); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; + return Ok(()); } // Condition: @@ -256,10 +244,8 @@ impl WriteGenerator<oio::BlockingWriter> { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } // Condition @@ -269,9 +255,8 @@ impl WriteGenerator<oio::BlockingWriter> { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); - let n = bs.len(); self.buffer.push(bs); - Ok(n) + Ok(()) } /// Finish the write process. @@ -281,8 +266,8 @@ impl WriteGenerator<oio::BlockingWriter> { break; } - let written = self.w.write(self.buffer.clone().collect())?; - self.buffer.advance(written); + self.w.write(self.buffer.clone().collect())?; + self.buffer.clear(); } self.w.close() @@ -343,10 +328,7 @@ mod tests { let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); let mut bs = Bytes::from(expected.clone()); - while !bs.is_empty() { - let n = w.write(bs.clone().into()).await?; - bs.advance(n); - } + w.write(bs.clone().into()).await?; w.close().await?; @@ -375,10 +357,7 @@ mod tests { rng.fill_bytes(&mut expected); let bs = Bytes::from(expected.clone()); - // The MockWriter always returns the first chunk size. - let n = w.write(bs.into()).await?; - assert_eq!(expected.len(), n); - + w.write(bs.into()).await?; w.close().await?; let buf = buf.lock().await; @@ -413,14 +392,13 @@ mod tests { // content > chunk size. let content = new_content(15); - assert_eq!(15, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // content > chunk size, but 5 bytes in queue. let content = new_content(15); - // The MockWriter can send all 15 bytes together, so we can only advance 5 bytes. - assert_eq!(15, w.write(content.clone().into()).await?); + w.write(content.clone().into()).await?; w.close().await?; @@ -456,16 +434,16 @@ mod tests { // content > chunk size. let content = new_content(15); - assert_eq!(15, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(3); - assert_eq!(3, w.write(content.into()).await?); + w.write(content.into()).await?; // content > chunk size, but can send all chunks in the queue. let content = new_content(15); - assert_eq!(15, w.write(content.clone().into()).await?); + w.write(content.clone().into()).await?; w.close().await?; @@ -539,10 +517,10 @@ mod tests { // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // Non-contiguous buffer. let content = Buffer::from(vec![new_content(3), new_content(2)]); - assert_eq!(5, w.write(content).await?); + w.write(content).await?; w.close().await?; @@ -584,10 +562,7 @@ mod tests { expected.extend_from_slice(&content); let mut bs = Bytes::from(content.clone()); - while !bs.is_empty() { - let n = writer.write(bs.clone().into()).await?; - bs.advance(n); - } + writer.write(bs.clone().into()).await?; } writer.close().await?; diff --git a/core/src/types/write/buffer_sink.rs b/core/src/types/write/buffer_sink.rs index 46d9112530..cc42e09090 100644 --- a/core/src/types/write/buffer_sink.rs +++ b/core/src/types/write/buffer_sink.rs @@ -20,8 +20,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use bytes::Buf; - use crate::raw::*; use crate::*; @@ -35,7 +33,7 @@ pub struct BufferSink { enum State { Idle(Option<WriteGenerator<oio::Writer>>), - Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>), + Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>), Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<()>)>), } @@ -92,8 +90,8 @@ impl futures::Sink<Buffer> for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(n) => { - this.buf.advance(n); + Ok(_) => { + this.buf = Buffer::new(); } Err(err) => return Poll::Ready(Err(err)), } @@ -139,8 +137,8 @@ impl futures::Sink<Buffer> for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(n) => { - this.buf.advance(n); + Ok(_) => { + this.buf = Buffer::new(); } Err(err) => return Poll::Ready(Err(err)), } diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index dc81d3d716..44755ed37c 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -136,12 +136,7 @@ impl Writer { /// } /// ``` pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> { - let mut bs = bs.into(); - while !bs.is_empty() { - let n = self.inner.write(bs.clone()).await?; - bs.advance(n); - } - Ok(()) + self.inner.write(bs.into()).await } /// Write [`bytes::Buf`] into inner writer. @@ -153,11 +148,8 @@ impl Writer { /// Optimize this function to avoid unnecessary copy. pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> { let mut bs = bs; - let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); - while !bs.is_empty() { - let n = self.inner.write(bs.clone()).await?; - bs.advance(n); - } + let bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); + self.inner.write(bs).await?; Ok(()) }
