This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f6112e5f99 chore: Make compfs able to test (#4878)
f6112e5f99 is described below

commit f6112e5f995520639b4c6e17b6fa936fb159f516
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 11 12:10:00 2024 +0800

    chore: Make compfs able to test (#4878)
    
    * chore: Make compfs able to test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix compfs reading
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Close file during close
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove option
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/tests/utils.rs         |  7 +++++--
 core/src/services/compfs/backend.rs | 29 ++++++++++++++++++++++++-----
 core/src/services/compfs/reader.rs  | 32 ++++++++++++++++++++++----------
 core/src/services/compfs/writer.rs  |  6 ++++++
 core/src/types/operator/builder.rs  |  2 ++
 5 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index 6a39332234..09db7177d9 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -79,8 +79,11 @@ pub fn init_test_service() -> Result<Option<Operator>> {
 
     // Enable blocking layer if needed.
     if !op.info().full_capability().blocking {
-        let _guard = TEST_RUNTIME.enter();
-        op = op.layer(layers::BlockingLayer::create().expect("blocking layer 
must be created"));
+        // Don't enable blocking layer for compfs
+        if op.info().scheme() != Scheme::Compfs {
+            let _guard = TEST_RUNTIME.enter();
+            op = op.layer(layers::BlockingLayer::create().expect("blocking 
layer must be created"));
+        }
     }
 
     Ok(Some(op))
diff --git a/core/src/services/compfs/backend.rs 
b/core/src/services/compfs/backend.rs
index 9926e04f97..b69b55f66f 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -63,6 +63,19 @@ impl Builder for CompfsBuilder {
                 "root is not specified",
             )),
         }?;
+
+        // If root dir does not exist, we must create it.
+        if let Err(e) = std::fs::metadata(&root) {
+            if e.kind() == std::io::ErrorKind::NotFound {
+                std::fs::create_dir_all(&root).map_err(|e| {
+                    Error::new(ErrorKind::Unexpected, "create root dir failed")
+                        .with_operation("Builder::build")
+                        .with_context("root", root.to_string_lossy())
+                        .set_source(e)
+                })?;
+            }
+        }
+
         let dispatcher = Dispatcher::new().map_err(|_| {
             Error::new(
                 ErrorKind::Unexpected,
@@ -151,11 +164,17 @@ impl Access for CompfsBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let path = self.core.prepare_path(path);
-
-        self.core
-            .exec(move || async move { compio::fs::remove_file(path).await })
-            .await?;
+        if path.ends_with('/') {
+            let path = self.core.prepare_path(path);
+            self.core
+                .exec(move || async move { compio::fs::remove_dir(path).await 
})
+                .await?;
+        } else {
+            let path = self.core.prepare_path(path);
+            self.core
+                .exec(move || async move { compio::fs::remove_file(path).await 
})
+                .await?;
+        }
 
         Ok(RpDelete::default())
     }
diff --git a/core/src/services/compfs/reader.rs 
b/core/src/services/compfs/reader.rs
index 421efd8af1..334db05161 100644
--- a/core/src/services/compfs/reader.rs
+++ b/core/src/services/compfs/reader.rs
@@ -27,31 +27,43 @@ use crate::*;
 pub struct CompfsReader {
     core: Arc<CompfsCore>,
     file: compio::fs::File,
-    range: BytesRange,
+    offset: u64,
+    end: Option<u64>,
 }
 
 impl CompfsReader {
     pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: 
BytesRange) -> Self {
-        Self { core, file, range }
+        Self {
+            core,
+            file,
+            offset: range.offset(),
+            end: range.size().map(|v| v + range.offset()),
+        }
     }
 }
 
 impl oio::Read for CompfsReader {
     async fn read(&mut self) -> Result<Buffer> {
-        let mut bs = self.core.buf_pool.get();
+        let pos = self.offset;
+        if let Some(end) = self.end {
+            if end >= pos {
+                return Ok(Buffer::new());
+            }
+        }
 
-        let pos = self.range.offset();
-        let len = self.range.size().expect("range size is always Some");
-        bs.reserve(len as _);
+        let mut bs = self.core.buf_pool.get();
+        // reserve 64KB buffer by default, we should allow user to configure 
this or make it adaptive.
+        bs.reserve(64 * 1024);
         let f = self.file.clone();
-        let mut bs = self
+        let (n, mut bs) = self
             .core
             .exec(move || async move {
-                let (_, bs) = buf_try!(@try f.read_at(bs, pos).await);
-                Ok(bs)
+                let (n, bs) = buf_try!(@try f.read_at(bs, pos).await);
+                Ok((n, bs))
             })
             .await?;
-        let frozen = bs.split().freeze();
+        let frozen = bs.split_to(n).freeze();
+        self.offset += frozen.len() as u64;
         self.core.buf_pool.put(bs);
         Ok(Buffer::from(frozen))
     }
diff --git a/core/src/services/compfs/writer.rs 
b/core/src/services/compfs/writer.rs
index d46c1b69b7..2e12ea2dc7 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -51,8 +51,14 @@ impl oio::Write for CompfsWriter {
 
     async fn close(&mut self) -> Result<()> {
         let f = self.file.clone();
+
         self.core
             .exec(move || async move { f.get_ref().sync_all().await })
+            .await?;
+
+        let f = self.file.clone();
+        self.core
+            .exec(move || async move { f.into_inner().close().await })
             .await
     }
 
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index ab1d96bc9c..562cabf0fb 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -156,6 +156,8 @@ impl Operator {
             Scheme::Atomicserver => 
Self::from_map::<services::Atomicserver>(map)?.finish(),
             #[cfg(feature = "services-alluxio")]
             Scheme::Alluxio => 
Self::from_map::<services::Alluxio>(map)?.finish(),
+            #[cfg(feature = "services-compfs")]
+            Scheme::Compfs => 
Self::from_map::<services::Compfs>(map)?.finish(),
             #[cfg(feature = "services-upyun")]
             Scheme::Upyun => Self::from_map::<services::Upyun>(map)?.finish(),
             #[cfg(feature = "services-koofr")]

Reply via email to