This is an automated email from the ASF dual-hosted git repository.

meteorgan pushed a commit to branch compfs
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 49e3099f12f226e05c9eccae8f057c61b705adbe
Author: meteorgan <[email protected]>
AuthorDate: Thu Nov 6 23:14:31 2025 +0800

    feat(services/compfs): implement IoVectoredBuf for Buffer
---
 core/src/services/compfs/core.rs   | 46 +++++---------------------------------
 core/src/services/compfs/writer.rs |  9 +-------
 core/tests/behavior/async_write.rs | 39 +++++++++++++++++++++++++++++---
 3 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs
index 4f8dcec42..6151ac8e7 100644
--- a/core/src/services/compfs/core.rs
+++ b/core/src/services/compfs/core.rs
@@ -19,7 +19,7 @@ use std::future::Future;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use compio::buf::IoBuf;
+use compio::buf::{IoBuf, IoBuffer, IoVectoredBuf};
 use compio::dispatcher::Dispatcher;
 
 use crate::raw::*;
@@ -81,45 +81,11 @@ impl CompfsCore {
     }
 }
 
-// TODO: impl IoVectoredBuf for Buffer
-// impl IoVectoredBuf for Buffer {
-//     fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {}
-//
-//     fn owned_iter(self) -> Result<OwnedIter<impl OwnedIterator<Inner = Self>>, Self> {
-//         Ok(OwnedIter::new(BufferIter {
-//             current: self.current(),
-//             buf: self,
-//         }))
-//     }
-// }
-
-// #[derive(Debug, Clone)]
-// struct BufferIter {
-//     buf: Buffer,
-//     current: Bytes,
-// }
-
-// impl IntoInner for BufferIter {
-//     type Inner = Buffer;
-//
-//     fn into_inner(self) -> Self::Inner {
-//         self.buf
-//     }
-// }
-
-// impl OwnedIterator for BufferIter {
-//     fn next(mut self) -> Result<Self, Self::Inner> {
-//         let Some(current) = self.buf.next() else {
-//             return Err(self.buf);
-//         };
-//         self.current = current;
-//         Ok(self)
-//     }
-//
-//     fn current(&self) -> &dyn IoBuf {
-//         &self.current
-//     }
-// }
+impl IoVectoredBuf for Buffer {
+    unsafe fn iter_io_buffer(&self) -> impl Iterator<Item = IoBuffer> {
+        self.clone().map(|b| unsafe { b.as_io_buffer() })
+    }
+}
 
 #[cfg(test)]
 mod tests {
diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs
index e4573f157..50269efd2 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -42,11 +42,6 @@ impl CompfsWriter {
 }
 
 impl oio::Write for CompfsWriter {
-    /// FIXME
-    ///
-    /// the write_all doesn't work correctly if `bs` is non-contiguous.
-    ///
-    /// The IoBuf::buf_len() only returns the length of the current buffer.
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         let Some(mut file) = self.file.clone() else {
             return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
@@ -55,9 +50,7 @@ impl oio::Write for CompfsWriter {
         let pos = self
             .core
             .exec(move || async move {
-                for b in bs {
-                    buf_try!(@try file.write_all(b).await);
-                }
+                buf_try!(@try file.write_vectored_all(bs).await);
                 Ok(file.position())
             })
             .await?;
diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs
index b98675897..f1264a559 100644
--- a/core/tests/behavior/async_write.rs
+++ b/core/tests/behavior/async_write.rs
@@ -59,7 +59,8 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
             test_writer_abort_with_concurrent,
             test_writer_futures_copy,
             test_writer_futures_copy_with_concurrent,
-            test_writer_return_metadata
+            test_writer_return_metadata,
+            test_writer_write_non_contiguous_data
         ))
     }
 
@@ -758,7 +759,7 @@ pub async fn test_write_with_if_none_match(op: Operator) -> Result<()> {
     Ok(())
 }
 
-/// Write an file with if_not_exists will get a ConditionNotMatch error if file exists.
+/// Write a file with if_not_exists will get a ConditionNotMatch error if file exists.
 pub async fn test_write_with_if_not_exists(op: Operator) -> Result<()> {
     if !op.info().full_capability().write_with_if_not_exists {
         return Ok(());
@@ -782,7 +783,7 @@ pub async fn test_write_with_if_not_exists(op: Operator) -> Result<()> {
     Ok(())
 }
 
-/// Write an file with if_match will get a ConditionNotMatch error if file's etag does not match.
+/// Write a file with if_match will get a ConditionNotMatch error if file's etag does not match.
 pub async fn test_write_with_if_match(op: Operator) -> Result<()> {
     if !op.info().full_capability().write_with_if_match {
         return Ok(());
@@ -819,3 +820,35 @@ pub async fn test_write_with_if_match(op: Operator) -> Result<()> {
 
     Ok(())
 }
+
+pub async fn test_writer_write_non_contiguous_data(op: Operator) -> Result<()> {
+    let path = TEST_FIXTURE.new_file_path();
+    let size = 5 * 1024 * 1024; // write file with 5 MiB
+    let content_a = gen_fixed_bytes(size);
+    let digest_a = Sha256::digest(&content_a);
+    let content_b = gen_fixed_bytes(size);
+    let digest_b = Sha256::digest(&content_b);
+
+    let mut w = op.writer(&path).await?;
+    w.write(vec![Bytes::from(content_a), Bytes::from(content_b)])
+        .await?;
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), (size * 2) as u64);
+
+    let bs = op.read(&path).await?.to_bytes();
+    assert_eq!(bs.len(), size * 2, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", digest_a),
+        "read content a"
+    );
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[size..])),
+        format!("{:x}", digest_b),
+        "read content b"
+    );
+
+    Ok(())
+}

Reply via email to