This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 7a963fb0e fix(services/compfs): behavior async write (#5803)
7a963fb0e is described below
commit 7a963fb0ea41e2fc214ceb3194218c7d7096eb1e
Author: 王宇逸 <[email protected]>
AuthorDate: Tue Mar 18 18:22:07 2025 +0800
fix(services/compfs): behavior async write (#5803)
---
core/Cargo.lock | 77 ++++++++++++++++++++++++++++---------
core/Cargo.toml | 2 +-
core/src/services/compfs/backend.rs | 17 +++++---
core/src/services/compfs/writer.rs | 40 ++++++++++++-------
4 files changed, 97 insertions(+), 39 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 151e8c439..beb76726e 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -1815,9 +1815,9 @@ dependencies = [
[[package]]
name = "compio"
-version = "0.12.0"
+version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de9895d3b1b383334e6dd889618d555ecca48988cfd2be47c7ac8a98b0195c90"
+checksum = "6c52183b7eefcaa6441fe810885a34fcdec7378e2883673fc3d74ca6e9ff738b"
dependencies = [
"compio-buf",
"compio-dispatcher",
@@ -1826,6 +1826,8 @@ dependencies = [
"compio-io",
"compio-log",
"compio-net",
+ "compio-process",
+ "compio-quic",
"compio-runtime",
"compio-signal",
]
@@ -1843,9 +1845,9 @@ dependencies = [
[[package]]
name = "compio-dispatcher"
-version = "0.4.0"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6f9231b6ee942b5e69eda275fc84d72247055aad197c65ddf3c63cd3cb01d60"
+checksum = "6ae8fab55190537c8634232f395302011ce39c18facbd4b85363df41114677ac"
dependencies = [
"compio-driver",
"compio-runtime",
@@ -1855,12 +1857,13 @@ dependencies = [
[[package]]
name = "compio-driver"
-version = "0.5.1"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6be49fe37cd203d925e3850522a47f453b4cb98960846be5e4ebae42e26a64c"
+checksum = "c9493b0c88b90d386bb3fd9b5618260d96ba2b09cab5c4a5ba50ec9b77f0711b"
dependencies = [
"aligned-array",
"cfg-if",
+ "cfg_aliases",
"compio-buf",
"compio-log",
"crossbeam-channel",
@@ -1869,7 +1872,6 @@ dependencies = [
"io-uring 0.7.3",
"libc",
"once_cell",
- "os_pipe",
"paste",
"polling",
"socket2",
@@ -1878,11 +1880,12 @@ dependencies = [
[[package]]
name = "compio-fs"
-version = "0.5.1"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36f645c7bd9c1e1ce5b0ca6aa9a77ec3908d2ed9200c6708a72bccd1c3f875c8"
+checksum = "dee2c5ba7c96f0caf3d62ed745278b26eebd4e9296817c4ef2ad6c359629f8ab"
dependencies = [
"cfg-if",
+ "cfg_aliases",
"compio-buf",
"compio-driver",
"compio-io",
@@ -1895,9 +1898,9 @@ dependencies = [
[[package]]
name = "compio-io"
-version = "0.4.1"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db908087365769933042c157adf860e19bff5a8cdb846ec2b5dd03d0dacf7a35"
+checksum = "1c18b1d7d4c058e3e92e9265d59f74981fda2693809b1e45f8ed7717d892c8ac"
dependencies = [
"compio-buf",
"futures-util",
@@ -1915,9 +1918,9 @@ dependencies = [
[[package]]
name = "compio-net"
-version = "0.5.1"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b75d9bb79502ac1abb73df8a34e83e51efcb805038cf30c1c48827203a4c6b49"
+checksum = "0882a85c535c7b5d6ea3b9b37cc7421ec3f8ae8b83a09eb53f4295fb87b54995"
dependencies = [
"cfg-if",
"compio-buf",
@@ -1926,16 +1929,54 @@ dependencies = [
"compio-runtime",
"either",
"libc",
+ "once_cell",
"socket2",
"widestring",
"windows-sys 0.52.0",
]
+[[package]]
+name = "compio-process"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dc299e4c0a2cdc4455bb4df86c554845d1abe611a1922e4b12a8af2a0fadc35"
+dependencies = [
+ "cfg-if",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8853537ade322b0d5dee3dca216465c463f480e530caeabc2b2df15b986068b"
+dependencies = [
+ "cfg_aliases",
+ "compio-buf",
+ "compio-io",
+ "compio-log",
+ "compio-net",
+ "compio-runtime",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.0",
+ "rustls 0.23.20",
+ "thiserror 2.0.9",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "compio-runtime"
-version = "0.5.1"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b2d856e9017fdde73918cb1a2f15b6e47fe0aeb93d547201a457b12bb2da74a"
+checksum = "a95ef126945a166879ef37d494015be13a1e4e452419bc4e5c4c5799f441756a"
dependencies = [
"async-task",
"cfg-if",
@@ -1946,19 +1987,17 @@ dependencies = [
"futures-util",
"libc",
"once_cell",
- "os_pipe",
"scoped-tls",
"slab",
- "smallvec",
"socket2",
"windows-sys 0.52.0",
]
[[package]]
name = "compio-signal"
-version = "0.3.0"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0cc8476edd2311b8d34cef15eddd0f81a3a9d2dc622dbefd154a39171fc6dba8"
+checksum = "fd30ba3a28cd73fa49a6e4f1c31c1ad4742fb33802662aadf9ed188ae8a8f0e4"
dependencies = [
"compio-buf",
"compio-driver",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index e659729f8..4a6a3a0d9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -340,7 +340,7 @@ hdfs-native = { version = "0.10", optional = true }
# for services-surrealdb
surrealdb = { version = "2", optional = true, features = ["protocol-http"] }
# for services-compfs
-compio = { version = "0.12.0", optional = true, features = [
+compio = { version = "0.14.0", optional = true, features = [
"runtime",
"bytes",
"polling",
diff --git a/core/src/services/compfs/backend.rs
b/core/src/services/compfs/backend.rs
index 1179641b2..4b0be9935 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -172,7 +172,9 @@ impl Access for CompfsBackend {
EntryMode::Unknown
};
let last_mod = meta.modified().map_err(new_std_io_error)?.into();
- let ret = Metadata::new(mode).with_last_modified(last_mod);
+ let ret = Metadata::new(mode)
+ .with_last_modified(last_mod)
+ .with_content_length(meta.len());
Ok(RpStat::new(ret))
}
@@ -232,15 +234,20 @@ impl Access for CompfsBackend {
let file = self
.core
.exec(move || async move {
- compio::fs::OpenOptions::new()
+ let file = compio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(!append)
.open(path)
- .await
+ .await?;
+ let mut file = Cursor::new(file);
+ if append {
+ let len = file.get_ref().metadata().await?.len();
+ file.set_position(len);
+ }
+ Ok(file)
})
- .await
- .map(Cursor::new)?;
+ .await?;
let w = CompfsWriter::new(self.core.clone(), file);
Ok((RpWrite::new(), w))
diff --git a/core/src/services/compfs/writer.rs
b/core/src/services/compfs/writer.rs
index 8a209d224..e4573f157 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -29,12 +29,15 @@ use crate::*;
#[derive(Debug)]
pub struct CompfsWriter {
core: Arc<CompfsCore>,
- file: Cursor<File>,
+ file: Option<Cursor<File>>,
}
impl CompfsWriter {
pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
- Self { core, file }
+ Self {
+ core,
+ file: Some(file),
+ }
}
}
@@ -45,34 +48,43 @@ impl oio::Write for CompfsWriter {
///
/// The IoBuf::buf_len() only returns the length of the current buffer.
async fn write(&mut self, bs: Buffer) -> Result<()> {
- let mut file = self.file.clone();
+ let Some(mut file) = self.file.clone() else {
+ return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
+ };
- self.core
+ let pos = self
+ .core
.exec(move || async move {
- buf_try!(@try file.write_all(bs).await);
- Ok(())
+ for b in bs {
+ buf_try!(@try file.write_all(b).await);
+ }
+ Ok(file.position())
})
.await?;
+ self.file.as_mut().unwrap().set_position(pos);
Ok(())
}
async fn close(&mut self) -> Result<Metadata> {
- let f = self.file.clone();
-
- self.core
- .exec(move || async move { f.get_ref().sync_all().await })
- .await?;
+ let Some(f) = self.file.take() else {
+ return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
+ };
- let f = self.file.clone();
self.core
- .exec(move || async move { f.into_inner().close().await })
+ .exec(move || async move {
+ f.get_ref().sync_all().await?;
+ f.into_inner().close().await
+ })
.await?;
Ok(Metadata::default())
}
async fn abort(&mut self) -> Result<()> {
- Ok(())
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "cannot abort completion-based operations",
+ ))
}
}