This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f6112e5f99 chore: Make compfs able to test (#4878)
f6112e5f99 is described below
commit f6112e5f995520639b4c6e17b6fa936fb159f516
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 11 12:10:00 2024 +0800
chore: Make compfs able to test (#4878)
* chore: Make compfs able to test
Signed-off-by: Xuanwo <[email protected]>
* Fix compfs reading
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Close file during close
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
* Remove option
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/tests/utils.rs | 7 +++++--
core/src/services/compfs/backend.rs | 29 ++++++++++++++++++++++++-----
core/src/services/compfs/reader.rs | 32 ++++++++++++++++++++++----------
core/src/services/compfs/writer.rs | 6 ++++++
core/src/types/operator/builder.rs | 2 ++
5 files changed, 59 insertions(+), 17 deletions(-)
diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index 6a39332234..09db7177d9 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -79,8 +79,11 @@ pub fn init_test_service() -> Result<Option<Operator>> {
// Enable blocking layer if needed.
if !op.info().full_capability().blocking {
- let _guard = TEST_RUNTIME.enter();
- op = op.layer(layers::BlockingLayer::create().expect("blocking layer must be created"));
+ // Don't enable blocking layer for compfs
+ if op.info().scheme() != Scheme::Compfs {
+ let _guard = TEST_RUNTIME.enter();
+ op = op.layer(layers::BlockingLayer::create().expect("blocking layer must be created"));
+ }
}
Ok(Some(op))
diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs
index 9926e04f97..b69b55f66f 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -63,6 +63,19 @@ impl Builder for CompfsBuilder {
"root is not specified",
)),
}?;
+
+ // If root dir does not exist, we must create it.
+ if let Err(e) = std::fs::metadata(&root) {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ std::fs::create_dir_all(&root).map_err(|e| {
+ Error::new(ErrorKind::Unexpected, "create root dir failed")
+ .with_operation("Builder::build")
+ .with_context("root", root.to_string_lossy())
+ .set_source(e)
+ })?;
+ }
+ }
+
let dispatcher = Dispatcher::new().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
@@ -151,11 +164,17 @@ impl Access for CompfsBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let path = self.core.prepare_path(path);
-
- self.core
- .exec(move || async move { compio::fs::remove_file(path).await })
- .await?;
+ if path.ends_with('/') {
+ let path = self.core.prepare_path(path);
+ self.core
+ .exec(move || async move { compio::fs::remove_dir(path).await })
+ .await?;
+ } else {
+ let path = self.core.prepare_path(path);
+ self.core
+ .exec(move || async move { compio::fs::remove_file(path).await })
+ .await?;
+ }
Ok(RpDelete::default())
}
diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs
index 421efd8af1..334db05161 100644
--- a/core/src/services/compfs/reader.rs
+++ b/core/src/services/compfs/reader.rs
@@ -27,31 +27,43 @@ use crate::*;
pub struct CompfsReader {
core: Arc<CompfsCore>,
file: compio::fs::File,
- range: BytesRange,
+ offset: u64,
+ end: Option<u64>,
}
impl CompfsReader {
pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
- Self { core, file, range }
+ Self {
+ core,
+ file,
+ offset: range.offset(),
+ end: range.size().map(|v| v + range.offset()),
+ }
}
}
impl oio::Read for CompfsReader {
async fn read(&mut self) -> Result<Buffer> {
- let mut bs = self.core.buf_pool.get();
+ let pos = self.offset;
+ if let Some(end) = self.end {
+ if pos >= end { // `end` is exclusive (offset + size): nothing is left to read
+ return Ok(Buffer::new());
+ }
+ }
- let pos = self.range.offset();
- let len = self.range.size().expect("range size is always Some");
- bs.reserve(len as _);
+ let mut bs = self.core.buf_pool.get();
+ // reserve 64KB buffer by default, we should allow user to configure this or make it adaptive.
+ bs.reserve(64 * 1024);
let f = self.file.clone();
- let mut bs = self
+ let (n, mut bs) = self
.core
.exec(move || async move {
- let (_, bs) = buf_try!(@try f.read_at(bs, pos).await);
- Ok(bs)
+ let (n, bs) = buf_try!(@try f.read_at(bs, pos).await);
+ Ok((n, bs))
})
.await?;
- let frozen = bs.split().freeze();
+ let frozen = bs.split_to(n).freeze();
+ self.offset += frozen.len() as u64;
self.core.buf_pool.put(bs);
Ok(Buffer::from(frozen))
}
diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs
index d46c1b69b7..2e12ea2dc7 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -51,8 +51,14 @@ impl oio::Write for CompfsWriter {
async fn close(&mut self) -> Result<()> {
let f = self.file.clone();
+
self.core
.exec(move || async move { f.get_ref().sync_all().await })
+ .await?;
+
+ let f = self.file.clone();
+ self.core
+ .exec(move || async move { f.into_inner().close().await })
.await
}
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index ab1d96bc9c..562cabf0fb 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -156,6 +156,8 @@ impl Operator {
Scheme::Atomicserver => Self::from_map::<services::Atomicserver>(map)?.finish(),
#[cfg(feature = "services-alluxio")]
Scheme::Alluxio => Self::from_map::<services::Alluxio>(map)?.finish(),
+ #[cfg(feature = "services-compfs")]
+ Scheme::Compfs => Self::from_map::<services::Compfs>(map)?.finish(),
#[cfg(feature = "services-upyun")]
Scheme::Upyun => Self::from_map::<services::Upyun>(map)?.finish(),
#[cfg(feature = "services-koofr")]