This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a963fb0e fix(services/compfs): behavior async write (#5803)
7a963fb0e is described below

commit 7a963fb0ea41e2fc214ceb3194218c7d7096eb1e
Author: 王宇逸 <[email protected]>
AuthorDate: Tue Mar 18 18:22:07 2025 +0800

    fix(services/compfs): behavior async write (#5803)
---
 core/Cargo.lock                     | 77 ++++++++++++++++++++++++++++---------
 core/Cargo.toml                     |  2 +-
 core/src/services/compfs/backend.rs | 17 +++++---
 core/src/services/compfs/writer.rs  | 40 ++++++++++++-------
 4 files changed, 97 insertions(+), 39 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 151e8c439..beb76726e 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -1815,9 +1815,9 @@ dependencies = [
 
 [[package]]
 name = "compio"
-version = "0.12.0"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de9895d3b1b383334e6dd889618d555ecca48988cfd2be47c7ac8a98b0195c90"
+checksum = "6c52183b7eefcaa6441fe810885a34fcdec7378e2883673fc3d74ca6e9ff738b"
 dependencies = [
  "compio-buf",
  "compio-dispatcher",
@@ -1826,6 +1826,8 @@ dependencies = [
  "compio-io",
  "compio-log",
  "compio-net",
+ "compio-process",
+ "compio-quic",
  "compio-runtime",
  "compio-signal",
 ]
@@ -1843,9 +1845,9 @@ dependencies = [
 
 [[package]]
 name = "compio-dispatcher"
-version = "0.4.0"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6f9231b6ee942b5e69eda275fc84d72247055aad197c65ddf3c63cd3cb01d60"
+checksum = "6ae8fab55190537c8634232f395302011ce39c18facbd4b85363df41114677ac"
 dependencies = [
  "compio-driver",
  "compio-runtime",
@@ -1855,12 +1857,13 @@ dependencies = [
 
 [[package]]
 name = "compio-driver"
-version = "0.5.1"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6be49fe37cd203d925e3850522a47f453b4cb98960846be5e4ebae42e26a64c"
+checksum = "c9493b0c88b90d386bb3fd9b5618260d96ba2b09cab5c4a5ba50ec9b77f0711b"
 dependencies = [
  "aligned-array",
  "cfg-if",
+ "cfg_aliases",
  "compio-buf",
  "compio-log",
  "crossbeam-channel",
@@ -1869,7 +1872,6 @@ dependencies = [
  "io-uring 0.7.3",
  "libc",
  "once_cell",
- "os_pipe",
  "paste",
  "polling",
  "socket2",
@@ -1878,11 +1880,12 @@ dependencies = [
 
 [[package]]
 name = "compio-fs"
-version = "0.5.1"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36f645c7bd9c1e1ce5b0ca6aa9a77ec3908d2ed9200c6708a72bccd1c3f875c8"
+checksum = "dee2c5ba7c96f0caf3d62ed745278b26eebd4e9296817c4ef2ad6c359629f8ab"
 dependencies = [
  "cfg-if",
+ "cfg_aliases",
  "compio-buf",
  "compio-driver",
  "compio-io",
@@ -1895,9 +1898,9 @@ dependencies = [
 
 [[package]]
 name = "compio-io"
-version = "0.4.1"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db908087365769933042c157adf860e19bff5a8cdb846ec2b5dd03d0dacf7a35"
+checksum = "1c18b1d7d4c058e3e92e9265d59f74981fda2693809b1e45f8ed7717d892c8ac"
 dependencies = [
  "compio-buf",
  "futures-util",
@@ -1915,9 +1918,9 @@ dependencies = [
 
 [[package]]
 name = "compio-net"
-version = "0.5.1"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b75d9bb79502ac1abb73df8a34e83e51efcb805038cf30c1c48827203a4c6b49"
+checksum = "0882a85c535c7b5d6ea3b9b37cc7421ec3f8ae8b83a09eb53f4295fb87b54995"
 dependencies = [
  "cfg-if",
  "compio-buf",
@@ -1926,16 +1929,54 @@ dependencies = [
  "compio-runtime",
  "either",
  "libc",
+ "once_cell",
  "socket2",
  "widestring",
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "compio-process"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dc299e4c0a2cdc4455bb4df86c554845d1abe611a1922e4b12a8af2a0fadc35"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8853537ade322b0d5dee3dca216465c463f480e530caeabc2b2df15b986068b"
+dependencies = [
+ "cfg_aliases",
+ "compio-buf",
+ "compio-io",
+ "compio-log",
+ "compio-net",
+ "compio-runtime",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.0",
+ "rustls 0.23.20",
+ "thiserror 2.0.9",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "compio-runtime"
-version = "0.5.1"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b2d856e9017fdde73918cb1a2f15b6e47fe0aeb93d547201a457b12bb2da74a"
+checksum = "a95ef126945a166879ef37d494015be13a1e4e452419bc4e5c4c5799f441756a"
 dependencies = [
  "async-task",
  "cfg-if",
@@ -1946,19 +1987,17 @@ dependencies = [
  "futures-util",
  "libc",
  "once_cell",
- "os_pipe",
  "scoped-tls",
  "slab",
- "smallvec",
  "socket2",
  "windows-sys 0.52.0",
 ]
 
 [[package]]
 name = "compio-signal"
-version = "0.3.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0cc8476edd2311b8d34cef15eddd0f81a3a9d2dc622dbefd154a39171fc6dba8"
+checksum = "fd30ba3a28cd73fa49a6e4f1c31c1ad4742fb33802662aadf9ed188ae8a8f0e4"
 dependencies = [
  "compio-buf",
  "compio-driver",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index e659729f8..4a6a3a0d9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -340,7 +340,7 @@ hdfs-native = { version = "0.10", optional = true }
 # for services-surrealdb
 surrealdb = { version = "2", optional = true, features = ["protocol-http"] }
 # for services-compfs
-compio = { version = "0.12.0", optional = true, features = [
+compio = { version = "0.14.0", optional = true, features = [
   "runtime",
   "bytes",
   "polling",
diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs
index 1179641b2..4b0be9935 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -172,7 +172,9 @@ impl Access for CompfsBackend {
             EntryMode::Unknown
         };
         let last_mod = meta.modified().map_err(new_std_io_error)?.into();
-        let ret = Metadata::new(mode).with_last_modified(last_mod);
+        let ret = Metadata::new(mode)
+            .with_last_modified(last_mod)
+            .with_content_length(meta.len());
 
         Ok(RpStat::new(ret))
     }
@@ -232,15 +234,20 @@ impl Access for CompfsBackend {
         let file = self
             .core
             .exec(move || async move {
-                compio::fs::OpenOptions::new()
+                let file = compio::fs::OpenOptions::new()
                     .create(true)
                     .write(true)
                     .truncate(!append)
                     .open(path)
-                    .await
+                    .await?;
+                let mut file = Cursor::new(file);
+                if append {
+                    let len = file.get_ref().metadata().await?.len();
+                    file.set_position(len);
+                }
+                Ok(file)
             })
-            .await
-            .map(Cursor::new)?;
+            .await?;
 
         let w = CompfsWriter::new(self.core.clone(), file);
         Ok((RpWrite::new(), w))
diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs
index 8a209d224..e4573f157 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -29,12 +29,15 @@ use crate::*;
 #[derive(Debug)]
 pub struct CompfsWriter {
     core: Arc<CompfsCore>,
-    file: Cursor<File>,
+    file: Option<Cursor<File>>,
 }
 
 impl CompfsWriter {
     pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
-        Self { core, file }
+        Self {
+            core,
+            file: Some(file),
+        }
     }
 }
 
@@ -45,34 +48,43 @@ impl oio::Write for CompfsWriter {
     ///
     /// The IoBuf::buf_len() only returns the length of the current buffer.
     async fn write(&mut self, bs: Buffer) -> Result<()> {
-        let mut file = self.file.clone();
+        let Some(mut file) = self.file.clone() else {
+            return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
+        };
 
-        self.core
+        let pos = self
+            .core
             .exec(move || async move {
-                buf_try!(@try file.write_all(bs).await);
-                Ok(())
+                for b in bs {
+                    buf_try!(@try file.write_all(b).await);
+                }
+                Ok(file.position())
             })
             .await?;
+        self.file.as_mut().unwrap().set_position(pos);
 
         Ok(())
     }
 
     async fn close(&mut self) -> Result<Metadata> {
-        let f = self.file.clone();
-
-        self.core
-            .exec(move || async move { f.get_ref().sync_all().await })
-            .await?;
+        let Some(f) = self.file.take() else {
+            return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
+        };
 
-        let f = self.file.clone();
         self.core
-            .exec(move || async move { f.into_inner().close().await })
+            .exec(move || async move {
+                f.get_ref().sync_all().await?;
+                f.into_inner().close().await
+            })
             .await?;
 
         Ok(Metadata::default())
     }
 
     async fn abort(&mut self) -> Result<()> {
-        Ok(())
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "cannot abort completion-based operations",
+        ))
     }
 }

Reply via email to