This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 8027430dba feat(services/compfs): implement auxiliary functions (#4778)
8027430dba is described below

commit 8027430dba32cc57311e30bd925853a1c94f8ae2
Author: Pop <[email protected]>
AuthorDate: Thu Jun 20 19:24:54 2024 +0900

    feat(services/compfs): implement auxiliary functions (#4778)
    
    * fix reader
    
    * builder
    
    * implement aux fn's
---
 core/src/services/compfs/backend.rs | 106 ++++++++++++++++++++++++++++++++++--
 core/src/services/compfs/core.rs    |   4 ++
 core/src/services/compfs/lister.rs  |   2 +-
 core/src/services/compfs/reader.rs  |   5 +-
 core/src/services/compfs/writer.rs  |   2 +-
 5 files changed, 109 insertions(+), 10 deletions(-)

diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs
index 65bd189b68..9926e04f97 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use compio::{dispatcher::Dispatcher, fs::OpenOptions};
+
 use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter};
+
 use crate::raw::*;
 use crate::*;
 
@@ -42,7 +45,7 @@ impl CompfsBuilder {
 
 impl Builder for CompfsBuilder {
     const SCHEME: Scheme = Scheme::Compfs;
-    type Accessor = ();
+    type Accessor = CompfsBackend;
 
     fn from_map(map: HashMap<String, String>) -> Self {
         let mut builder = CompfsBuilder::default();
@@ -53,7 +56,27 @@ impl Builder for CompfsBuilder {
     }
 
     fn build(&mut self) -> Result<Self::Accessor> {
-        todo!()
+        let root = match self.root.take() {
+            Some(root) => Ok(root),
+            None => Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "root is not specified",
+            )),
+        }?;
+        let dispatcher = Dispatcher::new().map_err(|_| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "failed to initiate compio dispatcher",
+            )
+        })?;
+        let core = CompfsCore {
+            root,
+            dispatcher,
+            buf_pool: oio::PooledBuf::new(16),
+        };
+        Ok(CompfsBackend {
+            core: Arc::new(core),
+        })
     }
 }
 
@@ -89,7 +112,6 @@ impl Access for CompfsBackend {
 
                 copy: true,
                 rename: true,
-                blocking: true,
 
                 ..Default::default()
             });
@@ -97,8 +119,79 @@ impl Access for CompfsBackend {
         am
     }
 
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        let path = self.core.prepare_path(path);
+
+        self.core
+            .exec(move || async move { compio::fs::create_dir_all(path).await })
+            .await?;
+
+        Ok(RpCreateDir::default())
+    }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let path = self.core.prepare_path(path);
+
+        let meta = self
+            .core
+            .exec(move || async move { compio::fs::metadata(path).await })
+            .await?;
+        let ty = meta.file_type();
+        let mode = if ty.is_dir() {
+            EntryMode::DIR
+        } else if ty.is_file() {
+            EntryMode::FILE
+        } else {
+            EntryMode::Unknown
+        };
+        let last_mod = meta.modified().map_err(new_std_io_error)?.into();
+        let ret = Metadata::new(mode).with_last_modified(last_mod);
+
+        Ok(RpStat::new(ret))
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let path = self.core.prepare_path(path);
+
+        self.core
+            .exec(move || async move { compio::fs::remove_file(path).await })
+            .await?;
+
+        Ok(RpDelete::default())
+    }
+
+    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
+        let from = self.core.prepare_path(from);
+        let to = self.core.prepare_path(to);
+
+        self.core
+            .exec(move || async move {
+                let from = OpenOptions::new().read(true).open(from).await?;
+                let to = OpenOptions::new().write(true).create(true).open(to).await?;
+
+                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
+                compio::io::copy(&mut from, &mut to).await?;
+
+                Ok(())
+            })
+            .await?;
+
+        Ok(RpCopy::default())
+    }
+
+    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
+        let from = self.core.prepare_path(from);
+        let to = self.core.prepare_path(to);
+
+        self.core
+            .exec(move || async move { compio::fs::rename(from, to).await })
+            .await?;
+
+        Ok(RpRename::default())
+    }
+
     async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let path = self.core.root.join(path.trim_end_matches('/'));
+        let path = self.core.prepare_path(path);
 
         let file = self
             .core
@@ -110,7 +203,7 @@ impl Access for CompfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let path = self.core.root.join(path.trim_end_matches('/'));
+        let path = self.core.prepare_path(path);
         let append = args.append();
         let file = self
             .core
@@ -130,7 +223,8 @@ impl Access for CompfsBackend {
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
-        let path = self.core.root.join(path.trim_end_matches('/'));
+        let path = self.core.prepare_path(path);
+
         let read_dir = match self
             .core
             .exec_blocking(move || std::fs::read_dir(path))
diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs
index 60c68106f1..e600edbe7d 100644
--- a/core/src/services/compfs/core.rs
+++ b/core/src/services/compfs/core.rs
@@ -44,6 +44,10 @@ pub(super) struct CompfsCore {
 }
 
 impl CompfsCore {
+    pub fn prepare_path(&self, path: &str) -> PathBuf {
+        self.root.join(path.trim_end_matches('/'))
+    }
+
     pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
     where
         Fn: FnOnce() -> Fut + Send + 'static,
diff --git a/core/src/services/compfs/lister.rs b/core/src/services/compfs/lister.rs
index 7a380d48b7..ac12fc7820 100644
--- a/core/src/services/compfs/lister.rs
+++ b/core/src/services/compfs/lister.rs
@@ -28,7 +28,7 @@ pub struct CompfsLister {
 }
 
 impl CompfsLister {
-    pub fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
+    pub(super) fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
         Self {
             core,
             read_dir: Some(read_dir),
diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs
index 758c110894..421efd8af1 100644
--- a/core/src/services/compfs/reader.rs
+++ b/core/src/services/compfs/reader.rs
@@ -31,7 +31,7 @@ pub struct CompfsReader {
 }
 
 impl CompfsReader {
-    pub fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
+    pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
         Self { core, file, range }
     }
 }
@@ -40,13 +40,14 @@ impl oio::Read for CompfsReader {
     async fn read(&mut self) -> Result<Buffer> {
         let mut bs = self.core.buf_pool.get();
 
+        let pos = self.range.offset();
         let len = self.range.size().expect("range size is always Some");
         bs.reserve(len as _);
         let f = self.file.clone();
         let mut bs = self
             .core
             .exec(move || async move {
-                let (_, bs) = buf_try!(@try f.read_at(bs, len).await);
+                let (_, bs) = buf_try!(@try f.read_at(bs, pos).await);
                 Ok(bs)
             })
             .await?;
diff --git a/core/src/services/compfs/writer.rs 
b/core/src/services/compfs/writer.rs
index 95a9751910..d46c1b69b7 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -30,7 +30,7 @@ pub struct CompfsWriter {
 }
 
 impl CompfsWriter {
-    pub fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
+    pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
         Self { core, file }
     }
 }

Reply via email to