This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch make-write-all
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 911ef20b98f9a49040991e3d40399efac4deac36
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 11 16:47:23 2024 +0800

    Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/async_backtrace.rs       |  4 +-
 core/src/layers/await_tree.rs            |  4 +-
 core/src/layers/blocking.rs              |  2 +-
 core/src/layers/dtrace.rs                | 14 ++---
 core/src/layers/metrics.rs               | 17 +++---
 core/src/layers/minitrace.rs             |  4 +-
 core/src/layers/oteltrace.rs             |  4 +-
 core/src/layers/prometheus.rs            | 20 ++++---
 core/src/layers/prometheus_client.rs     | 16 +++---
 core/src/layers/retry.rs                 |  4 +-
 core/src/layers/throttle.rs              |  4 +-
 core/src/layers/tracing.rs               |  4 +-
 core/src/services/aliyun_drive/writer.rs |  6 +-
 core/src/services/alluxio/writer.rs      |  7 +--
 core/src/services/compfs/writer.rs       | 24 ++++----
 core/src/services/fs/writer.rs           | 19 +++++--
 core/src/services/ftp/writer.rs          | 25 +++++----
 core/src/services/ghac/writer.rs         |  4 +-
 core/src/services/hdfs/writer.rs         | 18 ++++--
 core/src/services/hdfs_native/writer.rs  |  2 +-
 core/src/services/sftp/writer.rs         | 13 ++++-
 core/src/types/context/write.rs          | 94 +++-----------------------------
 22 files changed, 133 insertions(+), 176 deletions(-)

diff --git a/core/src/layers/async_backtrace.rs 
b/core/src/layers/async_backtrace.rs
index 6d77591ca0..290171c29c 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
AsyncBacktraceWrapper<R> {
 
 impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
     #[async_backtrace::framed]
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs).await
     }
 
@@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> 
{
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 7bb20d42fb..58fd73e8fe 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
AwaitTreeWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner
             .write(bs)
             .instrument_await(format!("opendal::{}", 
WriteOperation::Write.into_static()))
@@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 70830981c8..7293ced965 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.handle.block_on(self.inner.write(bs))
     }
 
diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs
index c71277ed9a..51b001313b 100644
--- a/core/src/layers/dtrace.rs
+++ b/core/src/layers/dtrace.rs
@@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
DtraceLayerWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let c_path = CString::new(self.path.clone()).unwrap();
         probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
-                n
+            .map(|_| {
+                probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
             })
             .map_err(|err| {
                 probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
@@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let c_path = CString::new(self.path.clone()).unwrap();
         probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
         self.inner
             .write(bs)
-            .map(|n| {
-                probe_lazy!(opendal, blocking_writer_write_ok, 
c_path.as_ptr(), n);
-                n
+            .map(|_| {
+                probe_lazy!(opendal, blocking_writer_write_ok, 
c_path.as_ptr());
             })
             .map_err(|err| {
                 probe_lazy!(opendal, blocking_writer_write_error, 
c_path.as_ptr());
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 47d7a8a936..decbad81a8 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
             .await
-            .map(|n| {
-                self.bytes_counter.increment(n as u64);
+            .map(|_| {
+                self.bytes_counter.increment(size as u64);
                 self.requests_duration_seconds
                     .record(start.elapsed().as_secs_f64());
-                n
             })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
@@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         self.inner
             .write(bs)
-            .map(|n| {
-                self.bytes_counter.increment(n as u64);
-                n
+            .map(|_| {
+                self.bytes_counter.increment(size as u64);
             })
             .map_err(|err| {
                 self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 983d18b6b8..bca4a2e584 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
         self.inner.write(bs)
@@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 6fb5d582ba..b04ad22fdd 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
OtelTraceWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner.write(bs)
     }
 
@@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index ee740d55c4..a4359ead72 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
             WriteOperation::Write.into_static(),
@@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
         timer.observe_duration();
 
         match res {
-            Ok(n) => {
+            Ok(_) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
-                    .observe(n as f64);
-                Ok(n)
+                    .observe(size as f64);
+                Ok(())
             }
             Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
@@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let size = bs.len();
+
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
             Operation::BlockingWrite.into_static(),
@@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
PrometheusMetricWrapper<R> {
         timer.observe_duration();
 
         match res {
-            Ok(n) => {
+            Ok(_) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
-                    .observe(n as f64);
-                Ok(n)
+                    .observe(size as f64);
+                Ok(())
             }
             Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/prometheus_client.rs 
b/core/src/layers/prometheus_client.rs
index 463e409275..58fef7e894 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -627,24 +627,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
             .await
-            .map(|n| {
+            .map(|_| {
                 self.metrics.observe_bytes_total(
                     self.scheme,
                     WriteOperation::Write.into_static(),
-                    n,
+                    size,
                 );
                 self.metrics.observe_request_duration(
                     self.scheme,
                     WriteOperation::Write.into_static(),
                     start.elapsed(),
                 );
-                n
             })
             .map_err(|err| {
                 self.metrics.increment_errors_total(
@@ -704,23 +704,23 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let start = Instant::now();
+        let size = bs.len();
 
         self.inner
             .write(bs)
-            .map(|n| {
+            .map(|_| {
                 self.metrics.observe_bytes_total(
                     self.scheme,
                     WriteOperation::BlockingWrite.into_static(),
-                    n,
+                    size,
                 );
                 self.metrics.observe_request_duration(
                     self.scheme,
                     WriteOperation::BlockingWrite.into_static(),
                     start.elapsed(),
                 );
-                n
             })
             .map_err(|err| {
                 self.metrics.increment_errors_total(
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 5bc37e6145..4d42098394 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -938,8 +938,8 @@ mod tests {
     struct MockWriter {}
 
     impl oio::Write for MockWriter {
-        async fn write(&mut self, bs: Buffer) -> Result<usize> {
-            Ok(bs.len())
+        async fn write(&mut self, _: Buffer) -> Result<()> {
+            Ok(())
         }
 
         async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3b66e327f3..f73f33d3ca 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ThrottleWrapper<R> {
 }
 
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
@@ -226,7 +226,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
 
         loop {
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 4ba829a6ec..4a2dc4bc00 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -286,7 +286,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + 
MaybeSend {
+    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend {
         self.inner.write(bs)
     }
 
@@ -312,7 +312,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
         self.inner.write(bs)
     }
 
diff --git a/core/src/services/aliyun_drive/writer.rs 
b/core/src/services/aliyun_drive/writer.rs
index 30ca2ef94e..461764541f 100644
--- a/core/src/services/aliyun_drive/writer.rs
+++ b/core/src/services/aliyun_drive/writer.rs
@@ -51,7 +51,7 @@ impl AliyunDriveWriter {
 }
 
 impl oio::Write for AliyunDriveWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let (upload_id, file_id) = match (self.upload_id.as_ref(), 
self.file_id.as_ref()) {
             (Some(upload_id), Some(file_id)) => (upload_id, file_id),
             _ => {
@@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter {
             return Err(Error::new(ErrorKind::Unexpected, "cannot find 
upload_url"));
         };
 
-        let size = bs.len();
-
         if let Err(err) = self.core.upload(upload_url, bs).await {
             if err.kind() != ErrorKind::AlreadyExists {
                 return Err(err);
@@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter {
 
         self.part_number += 1;
 
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/alluxio/writer.rs 
b/core/src/services/alluxio/writer.rs
index e5ca807b60..f452b6b10b 100644
--- a/core/src/services/alluxio/writer.rs
+++ b/core/src/services/alluxio/writer.rs
@@ -43,7 +43,7 @@ impl AlluxioWriter {
 }
 
 impl oio::Write for AlluxioWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let stream_id = match self.stream_id {
             Some(stream_id) => stream_id,
             None => {
@@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter {
                 stream_id
             }
         };
-        self.core
-            .write(stream_id, Buffer::from(bs.to_bytes()))
-            .await
+        self.core.write(stream_id, bs).await?;
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/compfs/writer.rs 
b/core/src/services/compfs/writer.rs
index 2e12ea2dc7..749cab10ff 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{io::Cursor, sync::Arc};
-
-use compio::{buf::buf_try, fs::File, io::AsyncWrite};
-
 use super::core::CompfsCore;
 use crate::raw::*;
 use crate::*;
+use compio::io::AsyncWriteExt;
+use compio::{buf::buf_try, fs::File};
+use std::{io::Cursor, sync::Arc};
 
 #[derive(Debug)]
 pub struct CompfsWriter {
@@ -36,17 +35,22 @@ impl CompfsWriter {
 }
 
 impl oio::Write for CompfsWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    /// FIXME
+    ///
+    /// the write_all doesn't work correctly if `bs` is non-contiguous.
+    ///
+    /// The IoBuf::buf_len() only returns the length of the current buffer.
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let mut file = self.file.clone();
 
-        let n = self
-            .core
+        self.core
             .exec(move || async move {
-                let (n, _) = buf_try!(@try file.write(bs).await);
-                Ok(n)
+                buf_try!(@try file.write_all(bs).await);
+                Ok(())
             })
             .await?;
-        Ok(n)
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index b9e84d736c..d0d1bc6674 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -53,11 +53,15 @@ impl<F> FsWriter<F> {
 unsafe impl<F> Sync for FsWriter<F> {}
 
 impl oio::Write for FsWriter<tokio::fs::File> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
-        // TODO: use write_vectored instead.
-        f.write(bs.chunk()).await.map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -88,10 +92,15 @@ impl oio::Write for FsWriter<tokio::fs::File> {
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("FsWriter must be initialized");
 
-        f.write(bs.chunk()).map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ec21609c18..f7f6864dd9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,7 +53,7 @@ impl FtpWriter {
 }
 
 impl oio::Write for FtpWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let path = if let Some(tmp_path) = &self.tmp_path {
             tmp_path
         } else {
@@ -69,17 +69,20 @@ impl oio::Write for FtpWriter {
             ));
         }
 
-        let size = self
-            .data_stream
-            .as_mut()
-            .unwrap()
-            .write(bs.chunk())
-            .await
-            .map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
-            })?;
+        while bs.has_remaining() {
+            let n = self
+                .data_stream
+                .as_mut()
+                .unwrap()
+                .write(bs.chunk())
+                .await
+                .map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
+                })?;
+            bs.advance(n);
+        }
 
-        Ok(size)
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 00bce7fac3..3b58dd0fc2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -38,7 +38,7 @@ impl GhacWriter {
 }
 
 impl oio::Write for GhacWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         let offset = self.size;
 
@@ -61,7 +61,7 @@ impl oio::Write for GhacWriter {
         }
 
         self.size += size as u64;
-        Ok(size)
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 96708c1ab1..0f60014f41 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -53,10 +53,15 @@ impl<F> HdfsWriter<F> {
 }
 
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
 
-        f.write(bs.chunk()).await.map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    fn write(&mut self, bs: Buffer) -> Result<usize> {
+    fn write(&mut self, mut bs: Buffer) -> Result<()> {
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
-        f.write(bs.chunk()).map_err(new_std_io_error)
+        while bs.has_remaining() {
+            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs_native/writer.rs 
b/core/src/services/hdfs_native/writer.rs
index e6fb0205e4..4cab45b3be 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -31,7 +31,7 @@ impl HdfsNativeWriter {
 }
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, _bs: Buffer) -> Result<usize> {
+    async fn write(&mut self, _bs: Buffer) -> Result<()> {
         todo!()
     }
 
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 69bff86245..9b86c78957 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -39,8 +39,17 @@ impl SftpWriter {
 }
 
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, bs: Buffer) -> Result<usize> {
-        self.file.write(bs.chunk()).await.map_err(new_std_io_error)
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+        while bs.has_remaining() {
+            let n = self
+                .file
+                .write(bs.chunk())
+                .await
+                .map_err(new_std_io_error)?;
+            bs.advance(n);
+        }
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index 92248762cf..af875c14ed 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -278,7 +278,7 @@ impl WriteGenerator<oio::BlockingWriter> {
 mod tests {
     use super::*;
     use crate::raw::oio::Write;
-    use bytes::Buf;
+    use bytes::BufMut;
     use bytes::Bytes;
     use log::debug;
     use pretty_assertions::assert_eq;
@@ -294,13 +294,12 @@ mod tests {
     }
 
     impl Write for MockWriter {
-        async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        async fn write(&mut self, bs: Buffer) -> Result<()> {
             debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
 
-            let chunk = bs.chunk();
             let mut buf = self.buf.lock().await;
-            buf.extend_from_slice(chunk);
-            Ok(chunk.len())
+            buf.put(bs);
+            Ok(())
         }
 
         async fn close(&mut self) -> Result<()> {
@@ -327,8 +326,8 @@ mod tests {
         let buf = Arc::new(Mutex::new(vec![]));
         let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() 
}), Some(10), true);
 
-        let mut bs = Bytes::from(expected.clone());
-        w.write(bs.clone().into()).await?;
+        let bs = Bytes::from(expected.clone());
+        w.write(bs.into()).await?;
 
         w.close().await?;
 
@@ -456,83 +455,6 @@ mod tests {
         Ok(())
     }
 
-    struct PartialWriter {
-        buf: Arc<Mutex<Vec<u8>>>,
-    }
-
-    impl Write for PartialWriter {
-        async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
-            let mut buf = self.buf.lock().await;
-
-            if Buffer::count(&bs) > 1 {
-                // Always leaves last buffer for non-contiguous buffer.
-                let mut written = 0;
-                while Buffer::count(&bs) > 1 {
-                    let chunk = bs.chunk();
-                    buf.extend_from_slice(chunk);
-                    written += chunk.len();
-                    bs.advance(chunk.len());
-                }
-                Ok(written)
-            } else {
-                let chunk = bs.chunk();
-                buf.extend_from_slice(chunk);
-                Ok(chunk.len())
-            }
-        }
-
-        async fn close(&mut self) -> Result<()> {
-            Ok(())
-        }
-
-        async fn abort(&mut self) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_inexact_buf_writer_partial_send() -> Result<()> {
-        let _ = tracing_subscriber::fmt()
-            .pretty()
-            .with_test_writer()
-            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-            .try_init();
-
-        let buf = Arc::new(Mutex::new(vec![]));
-        let mut w = WriteGenerator::new(
-            Box::new(PartialWriter { buf: buf.clone() }),
-            Some(10),
-            false,
-        );
-
-        let mut rng = thread_rng();
-        let mut expected = vec![];
-
-        let mut new_content = |size| {
-            let mut content = vec![0; size];
-            rng.fill_bytes(&mut content);
-            expected.extend_from_slice(&content);
-            Bytes::from(content)
-        };
-
-        // content < chunk size.
-        let content = new_content(5);
-        w.write(content.into()).await?;
-        // Non-contiguous buffer.
-        let content = Buffer::from(vec![new_content(3), new_content(2)]);
-        w.write(content).await?;
-
-        w.close().await?;
-
-        let buf = buf.lock().await;
-        assert_eq!(buf.len(), expected.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&*buf)),
-            format!("{:x}", Sha256::digest(&expected))
-        );
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_fuzz_exact_buf_writer() -> Result<()> {
         let _ = tracing_subscriber::fmt()
@@ -561,8 +483,8 @@ mod tests {
 
             expected.extend_from_slice(&content);
 
-            let mut bs = Bytes::from(content.clone());
-            writer.write(bs.clone().into()).await?;
+            let bs = Bytes::from(content.clone());
+            writer.write(bs.into()).await?;
         }
         writer.close().await?;
 

Reply via email to