This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch make-write-all in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 911ef20b98f9a49040991e3d40399efac4deac36 Author: Xuanwo <[email protected]> AuthorDate: Thu Jul 11 16:47:23 2024 +0800 Fix build Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/async_backtrace.rs | 4 +- core/src/layers/await_tree.rs | 4 +- core/src/layers/blocking.rs | 2 +- core/src/layers/dtrace.rs | 14 ++--- core/src/layers/metrics.rs | 17 +++--- core/src/layers/minitrace.rs | 4 +- core/src/layers/oteltrace.rs | 4 +- core/src/layers/prometheus.rs | 20 ++++--- core/src/layers/prometheus_client.rs | 16 +++--- core/src/layers/retry.rs | 4 +- core/src/layers/throttle.rs | 4 +- core/src/layers/tracing.rs | 4 +- core/src/services/aliyun_drive/writer.rs | 6 +- core/src/services/alluxio/writer.rs | 7 +-- core/src/services/compfs/writer.rs | 24 ++++---- core/src/services/fs/writer.rs | 19 +++++-- core/src/services/ftp/writer.rs | 25 +++++---- core/src/services/ghac/writer.rs | 4 +- core/src/services/hdfs/writer.rs | 18 ++++-- core/src/services/hdfs_native/writer.rs | 2 +- core/src/services/sftp/writer.rs | 13 ++++- core/src/types/context/write.rs | 94 +++----------------------------- 22 files changed, 133 insertions(+), 176 deletions(-) diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 6d77591ca0..290171c29c 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AsyncBacktraceWrapper<R> { impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> { #[async_backtrace::framed] - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 7bb20d42fb..58fd73e8fe 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> { } impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> { - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend { self.inner .write(bs) .instrument_await(format!("opendal::{}", WriteOperation::Write.into_static())) @@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 70830981c8..7293ced965 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> { } impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index c71277ed9a..51b001313b 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> { } impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, writer_write_start, c_path.as_ptr()); self.inner .write(bs) .await - .map(|n| { - probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, writer_write_error, c_path.as_ptr()); @@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr()); self.inner .write(bs) - .map(|n| { - probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr()); diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 47d7a8a936..decbad81a8 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> { } impl<R: oio::Write> oio::Write for MetricWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.bytes_counter.increment(n as u64); + .map(|_| { + self.bytes_counter.increment(size as u64); self.requests_duration_seconds .record(start.elapsed().as_secs_f64()); - n }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); @@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + self.inner .write(bs) - .map(|n| { - self.bytes_counter.increment(n as u64); - n + .map(|_| { + self.bytes_counter.increment(size as u64); }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 983d18b6b8..bca4a2e584 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> { } impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static()); self.inner.write(bs) @@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 6fb5d582ba..b04ad22fdd 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> { } impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend { self.inner.write(bs) } @@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index ee740d55c4..a4359ead72 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { } impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), WriteOperation::Write.into_static(), @@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); @@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::BlockingWrite.into_static(), @@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 463e409275..58fef7e894 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -627,24 +627,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { } impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::Write.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::Write.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( @@ -704,23 +704,23 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::BlockingWrite.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::BlockingWrite.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 5bc37e6145..4d42098394 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -938,8 +938,8 @@ mod tests { struct MockWriter {} impl oio::Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { - Ok(bs.len()) + async fn write(&mut self, _: Buffer) -> Result<()> { + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3b66e327f3..f73f33d3ca 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> { } impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { @@ -226,7 +226,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 4ba829a6ec..4a2dc4bc00 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -286,7 +286,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend { self.inner.write(bs) } @@ -312,7 +312,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index 30ca2ef94e..461764541f 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -51,7 +51,7 @@ impl AliyunDriveWriter { } impl oio::Write for AliyunDriveWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) { (Some(upload_id), Some(file_id)) => (upload_id, file_id), _ => { @@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); }; - let size = bs.len(); - if let Err(err) = self.core.upload(upload_url, bs).await { if err.kind() != ErrorKind::AlreadyExists { return Err(err); @@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter { self.part_number += 1; - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index e5ca807b60..f452b6b10b 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -43,7 +43,7 @@ impl AlluxioWriter { } impl oio::Write for AlluxioWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let stream_id = match self.stream_id { Some(stream_id) => stream_id, None => { @@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter { stream_id } }; - self.core - .write(stream_id, Buffer::from(bs.to_bytes())) - .await + self.core.write(stream_id, bs).await?; + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index 2e12ea2dc7..749cab10ff 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{io::Cursor, sync::Arc}; - -use compio::{buf::buf_try, fs::File, io::AsyncWrite}; - use super::core::CompfsCore; use crate::raw::*; use crate::*; +use compio::io::AsyncWriteExt; +use compio::{buf::buf_try, fs::File}; +use std::{io::Cursor, sync::Arc}; #[derive(Debug)] pub struct CompfsWriter { @@ -36,17 +35,22 @@ impl CompfsWriter { } impl oio::Write for CompfsWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + /// FIXME + /// + /// the write_all doesn't work correctly if `bs` is non-contiguous. + /// + /// The IoBuf::buf_len() only returns the length of the current buffer. + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut file = self.file.clone(); - let n = self - .core + self.core .exec(move || async move { - let (n, _) = buf_try!(@try file.write(bs).await); - Ok(n) + buf_try!(@try file.write_all(bs).await); + Ok(()) }) .await?; - Ok(n) + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index b9e84d736c..d0d1bc6674 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -53,11 +53,15 @@ impl<F> FsWriter<F> { unsafe impl<F> Sync for FsWriter<F> {} impl oio::Write for FsWriter<tokio::fs::File> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - // TODO: use write_vectored instead. - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -88,10 +92,15 @@ impl oio::Write for FsWriter<tokio::fs::File> { } impl oio::BlockingWrite for FsWriter<std::fs::File> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index ec21609c18..f7f6864dd9 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,7 +53,7 @@ impl FtpWriter { } impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let path = if let Some(tmp_path) = &self.tmp_path { tmp_path } else { @@ -69,17 +69,20 @@ impl oio::Write for FtpWriter { )); } - let size = self - .data_stream - .as_mut() - .unwrap() - .write(bs.chunk()) - .await - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) - })?; + while bs.has_remaining() { + let n = self + .data_stream + .as_mut() + .unwrap() + .write(bs.chunk()) + .await + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) + })?; + bs.advance(n); + } - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 00bce7fac3..3b58dd0fc2 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -38,7 +38,7 @@ impl GhacWriter { } impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); let offset = self.size; @@ -61,7 +61,7 @@ impl oio::Write for GhacWriter { } self.size += size as u64; - Ok(size) + Ok(()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 96708c1ab1..0f60014f41 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -53,10 +53,15 @@ impl<F> HdfsWriter<F> { } impl oio::Write for HdfsWriter<hdrs::AsyncFile> { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { } impl oio::BlockingWrite for HdfsWriter<hdrs::File> { - fn write(&mut self, bs: Buffer) -> Result<usize> { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs index e6fb0205e4..4cab45b3be 100644 --- a/core/src/services/hdfs_native/writer.rs +++ b/core/src/services/hdfs_native/writer.rs @@ -31,7 +31,7 @@ impl HdfsNativeWriter { } impl oio::Write for HdfsNativeWriter { - async fn write(&mut self, _bs: Buffer) -> Result<usize> { + async fn write(&mut self, _bs: Buffer) -> Result<()> { todo!() } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 69bff86245..9b86c78957 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -39,8 +39,17 @@ impl SftpWriter { } impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { - self.file.write(bs.chunk()).await.map_err(new_std_io_error) + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + while bs.has_remaining() { + let n = self + .file + .write(bs.chunk()) + .await + .map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index 92248762cf..af875c14ed 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -278,7 +278,7 @@ impl WriteGenerator<oio::BlockingWriter> { mod tests { use super::*; use crate::raw::oio::Write; - use bytes::Buf; + use bytes::BufMut; use bytes::Bytes; use log::debug; use pretty_assertions::assert_eq; @@ -294,13 +294,12 @@ mod tests { } impl Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result<usize> { + async fn write(&mut self, bs: Buffer) -> Result<()> { debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len()); - let chunk = bs.chunk(); let mut buf = self.buf.lock().await; - buf.extend_from_slice(chunk); - Ok(chunk.len()) + buf.put(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -327,8 +326,8 @@ mod tests { let buf = Arc::new(Mutex::new(vec![])); let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); - let mut bs = Bytes::from(expected.clone()); - w.write(bs.clone().into()).await?; + let bs = Bytes::from(expected.clone()); + w.write(bs.into()).await?; w.close().await?; @@ -456,83 +455,6 @@ mod tests { Ok(()) } - struct PartialWriter { - buf: Arc<Mutex<Vec<u8>>>, - } - - impl Write for PartialWriter { - async fn write(&mut self, mut bs: Buffer) -> Result<usize> { - let mut buf = self.buf.lock().await; - - if Buffer::count(&bs) > 1 { - // Always leaves last buffer for non-contiguous buffer. - let mut written = 0; - while Buffer::count(&bs) > 1 { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - written += chunk.len(); - bs.advance(chunk.len()); - } - Ok(written) - } else { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - Ok(chunk.len()) - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_inexact_buf_writer_partial_send() -> Result<()> { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let buf = Arc::new(Mutex::new(vec![])); - let mut w = WriteGenerator::new( - Box::new(PartialWriter { buf: buf.clone() }), - Some(10), - false, - ); - - let mut rng = thread_rng(); - let mut expected = vec![]; - - let mut new_content = |size| { - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - expected.extend_from_slice(&content); - Bytes::from(content) - }; - - // content < chunk size. - let content = new_content(5); - w.write(content.into()).await?; - // Non-contiguous buffer. - let content = Buffer::from(vec![new_content(3), new_content(2)]); - w.write(content).await?; - - w.close().await?; - - let buf = buf.lock().await; - assert_eq!(buf.len(), expected.len()); - assert_eq!( - format!("{:x}", Sha256::digest(&*buf)), - format!("{:x}", Sha256::digest(&expected)) - ); - Ok(()) - } - #[tokio::test] async fn test_fuzz_exact_buf_writer() -> Result<()> { let _ = tracing_subscriber::fmt() @@ -561,8 +483,8 @@ mod tests { expected.extend_from_slice(&content); - let mut bs = Bytes::from(content.clone()); - writer.write(bs.clone().into()).await?; + let bs = Bytes::from(content.clone()); + writer.write(bs.into()).await?; } writer.close().await?;
