This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new acb8ef6128 feat(bin/ofs): implement fuse for linux (#4179)
acb8ef6128 is described below

commit acb8ef61283fc234dca4a474a15649e1d462e02e
Author: Ho 229 <[email protected]>
AuthorDate: Thu Feb 22 20:49:43 2024 +0800

    feat(bin/ofs): implement fuse for linux (#4179)
    
    * refactor: project structure
    
    * feat: impl fuse for linux [wip]
    
    * feat: impl fuse for linux
    
    * refactor: use Operator::lister in readdir
    
    * refactor: remove frontend folder
    
    * chore: cleanup code
    
    * chore: make clippy happy
    
    * Update Cargo.lock
    
    * fix: dependencies
---
 bin/ofs/Cargo.lock     |  25 +-
 bin/ofs/Cargo.toml     |  14 +-
 bin/ofs/src/bin/ofs.rs |  62 +----
 bin/ofs/src/config.rs  |  33 +++
 bin/ofs/src/fuse.rs    | 670 +++++++++++++++++++++++++++++++++++++++++++++++++
 bin/ofs/src/lib.rs     | 246 +++++-------------
 6 files changed, 789 insertions(+), 261 deletions(-)

diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock
index 6a3952492a..8ca5394fa0 100644
--- a/bin/ofs/Cargo.lock
+++ b/bin/ofs/Cargo.lock
@@ -57,9 +57,9 @@ dependencies = [
 
 [[package]]
 name = "anstyle"
-version = "1.0.5"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220"
+checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
 
 [[package]]
 name = "anstyle-parse"
@@ -216,9 +216,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.33"
+version = "0.4.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
@@ -419,9 +419,9 @@ dependencies = [
 
 [[package]]
 name = "env_logger"
-version = "0.11.1"
+version = "0.11.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8"
+checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
 dependencies = [
  "anstream",
  "anstyle",
@@ -1026,7 +1026,8 @@ name = "ofs"
 version = "0.0.1+core.0.45.0"
 dependencies = [
  "anyhow",
- "async-trait",
+ "bytes",
+ "chrono",
  "clap",
  "env_logger",
  "fuse3",
@@ -1035,6 +1036,7 @@ dependencies = [
  "log",
  "nix 0.27.1",
  "opendal",
+ "sharded-slab",
  "tokio",
  "url",
 ]
@@ -1559,6 +1561,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sharded-slab"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
+dependencies = [
+ "lazy_static",
+]
+
 [[package]]
 name = "signature"
 version = "2.2.0"
diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml
index 187e81d4f8..f6a64bc5e0 100644
--- a/bin/ofs/Cargo.toml
+++ b/bin/ofs/Cargo.toml
@@ -31,14 +31,10 @@ rust-version = "1.67"
 
 [dependencies]
 anyhow = "1"
-async-trait = "0.1.75"
 clap = { version = "4.5.1", features = ["derive", "env"] }
-env_logger = "0.11"
-fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] }
+env_logger = "0.11.2"
 futures-util = "0.3.30"
-libc = "0.2.151"
 log = "0.4.20"
-nix = { version = "0.27.1", features = ["user"] }
 opendal = { version = "0.45.0", path = "../../core" }
 tokio = { version = "1.34", features = [
   "fs",
@@ -47,3 +43,11 @@ tokio = { version = "1.34", features = [
   "io-std",
 ] }
 url = "2.5.0"
+chrono = "0.4.34"
+sharded-slab = "0.1.7"
+bytes = "1.5.0"
+
+[target.'cfg(target_os = "linux")'.dependencies]
+libc = "0.2.151"
+fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] }
+nix = { version = "0.27.1", features = ["user"] }
diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs
index 08399b56ef..89a6199287 100644
--- a/bin/ofs/src/bin/ofs.rs
+++ b/bin/ofs/src/bin/ofs.rs
@@ -15,69 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-use std::str::FromStr;
-
-use anyhow::anyhow;
-use anyhow::Context;
 use anyhow::Result;
 use clap::Parser;
-use fuse3::path::Session;
-use fuse3::MountOptions;
-use ofs::Ofs;
-use opendal::Operator;
-use opendal::Scheme;
-use url::Url;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    env_logger::init();
-    fuse().await
-}
-
-#[derive(Parser, Debug)]
-#[command(version, about)]
-struct Config {
-    /// fuse mount path
-    #[arg(env = "OFS_MOUNT_PATH", index = 1)]
-    mount_path: String,
-
-    /// location of opendal service
-    /// format: <scheme>://?<key>=<value>&<key>=<value>
-    /// example: fs://root=/tmp
-    #[arg(env = "OFS_BACKEND", index = 2)]
-    backend: String,
-}
-
-async fn fuse() -> Result<()> {
-    let cfg = Config::try_parse().context("parse command line arguments")?;
-
-    let location = Url::parse(&cfg.backend)?;
-    if location.has_host() {
-        Err(anyhow!("Host part in a location is not supported."))?;
-    }
+    let cfg = ofs::Config::parse();
 
-    let scheme_str = location.scheme();
-
-    let op_args = location
-        .query_pairs()
-        .into_owned()
-        .collect::<HashMap<String, String>>();
-
-    let scheme = Scheme::from_str(scheme_str).context("unsupported scheme")?;
-    let op = Operator::via_map(scheme, op_args)?;
-
-    let mut mount_option = MountOptions::default();
-    mount_option.uid(nix::unistd::getuid().into());
-    mount_option.gid(nix::unistd::getgid().into());
-
-    let ofs = Ofs { op };
-
-    let mounthandle = Session::new(mount_option)
-        .mount_with_unprivileged(ofs, cfg.mount_path)
-        .await?;
-
-    mounthandle.await?;
-
-    Ok(())
+    env_logger::init();
+    ofs::new_app(cfg).await
 }
diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs
new file mode 100644
index 0000000000..6afa729a79
--- /dev/null
+++ b/bin/ofs/src/config.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use url::Url;
+
+#[derive(Parser, Debug)]
+#[command(version, about)]
+pub struct Config {
+    /// fuse mount path
+    #[arg(env = "OFS_MOUNT_PATH", index = 1)]
+    pub(crate) mount_path: String,
+
+    /// location of opendal service
+    /// format: <scheme>://?<key>=<value>&<key>=<value>
+    /// example: fs://?root=/tmp
+    #[arg(env = "OFS_BACKEND", index = 2)]
+    pub(crate) backend: Url,
+}
diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse.rs
new file mode 100644
index 0000000000..214187e861
--- /dev/null
+++ b/bin/ofs/src/fuse.rs
@@ -0,0 +1,670 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::OsStr;
+use std::ffi::OsString;
+use std::ops::Deref;
+use std::path::PathBuf;
+use std::time::Duration;
+use std::time::SystemTime;
+
+use bytes::Bytes;
+use fuse3::async_trait;
+use fuse3::path::prelude::*;
+use fuse3::Errno;
+use fuse3::Result;
+use futures_util::stream;
+use futures_util::stream::BoxStream;
+use futures_util::StreamExt;
+
+use opendal::Entry;
+use opendal::EntryMode;
+use opendal::ErrorKind;
+use opendal::Metadata;
+use opendal::Operator;
+use sharded_slab::Slab;
+
+const TTL: Duration = Duration::from_secs(1); // 1 second
+
+#[derive(Debug, Clone)]
+struct OpenedFile {
+    path: OsString,
+    is_read: bool,
+    is_write: bool,
+    is_append: bool,
+}
+
+pub(super) struct Ofs {
+    op: Operator,
+    gid: u32,
+    uid: u32,
+    opened_files: Slab<OpenedFile>,
+}
+
+impl Ofs {
+    pub fn new(op: Operator, uid: u32, gid: u32) -> Self {
+        Self {
+            op,
+            uid,
+            gid,
+            opened_files: Slab::new(),
+        }
+    }
+
+    fn check_flags(&self, flags: u32) -> Result<(bool, bool)> {
+        let mode = flags & libc::O_ACCMODE as u32;
+        let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as u32;
+        let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32;
+        if !is_read && !is_write {
+            Err(Errno::from(libc::EINVAL))?;
+        }
+
+        let capability = self.op.info().full_capability();
+        if is_read && !capability.read {
+            Err(Errno::from(libc::EACCES))?;
+        }
+        if is_write && !capability.write {
+            Err(Errno::from(libc::EACCES))?;
+        }
+
+        log::trace!("check_flags: is_read={}, is_write={}", is_read, is_write);
+        Ok((is_read, is_write))
+    }
+
+    // Get opened file and check given path
+    fn get_opened_file(&self, key: usize, path: Option<&OsStr>) -> Result<OpenedFile> {
+        let file = self
+            .opened_files
+            .get(key)
+            .as_ref()
+            .ok_or(Errno::from(libc::ENOENT))?
+            .deref()
+            .clone();
+        if matches!(path, Some(path) if path != file.path) {
+            log::trace!(
+                "get_opened_file: path not match: path={:?}, file={:?}",
+                path,
+                file.path
+            );
+            Err(Errno::from(libc::EBADF))?;
+        }
+
+        Ok(file)
+    }
+}
+
+#[async_trait]
+impl PathFilesystem for Ofs {
+    type DirEntryStream = BoxStream<'static, Result<DirectoryEntry>>;
+    type DirEntryPlusStream = BoxStream<'static, Result<DirectoryEntryPlus>>;
+
+    // Init a fuse filesystem
+    async fn init(&self, _req: Request) -> Result<()> {
+        Ok(())
+    }
+
+    // Callback when fs is being destroyed
+    async fn destroy(&self, _req: Request) {}
+
+    async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<ReplyEntry> {
+        log::debug!("lookup(parent={:?}, name={:?})", parent, name);
+
+        let path = PathBuf::from(parent).join(name);
+        let metadata = self
+            .op
+            .stat(&path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        let now = SystemTime::now();
+        let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+        Ok(ReplyEntry { ttl: TTL, attr })
+    }
+
+    async fn getattr(
+        &self,
+        _req: Request,
+        path: Option<&OsStr>,
+        fh: Option<u64>,
+        flags: u32,
+    ) -> Result<ReplyAttr> {
+        log::debug!("getattr(path={:?}, fh={:?}, flags={:?})", path, fh, flags);
+
+        let key = fh.unwrap_or_default() - 1;
+        let fh_path = self
+            .opened_files
+            .get(key as usize)
+            .as_ref()
+            .map(|f| &f.path)
+            .cloned();
+
+        let file_path = match (path.map(Into::into), fh_path) {
+            (Some(a), Some(b)) => {
+                if a != b {
+                    Err(Errno::from(libc::EBADF))?;
+                }
+                Some(a)
+            }
+            (a, b) => a.or(b),
+        };
+
+        let metadata = self
+            .op
+            .stat(&file_path.unwrap_or_default().to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        let now = SystemTime::now();
+        let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+        Ok(ReplyAttr { ttl: TTL, attr })
+    }
+
+    async fn setattr(
+        &self,
+        _req: Request,
+        path: Option<&OsStr>,
+        fh: Option<u64>,
+        set_attr: SetAttr,
+    ) -> Result<ReplyAttr> {
+        log::debug!(
+            "setattr(path={:?}, fh={:?}, set_attr={:?})",
+            path,
+            fh,
+            set_attr
+        );
+        Err(libc::EOPNOTSUPP.into())
+    }
+
+    async fn symlink(
+        &self,
+        _req: Request,
+        parent: &OsStr,
+        name: &OsStr,
+        link_path: &OsStr,
+    ) -> Result<ReplyEntry> {
+        log::debug!(
+            "symlink(parent={:?}, name={:?}, link_path={:?})",
+            parent,
+            name,
+            link_path
+        );
+        Err(libc::EOPNOTSUPP.into())
+    }
+
+    async fn mknod(
+        &self,
+        _req: Request,
+        parent: &OsStr,
+        name: &OsStr,
+        mode: u32,
+        _rdev: u32,
+    ) -> Result<ReplyEntry> {
+        log::debug!(
+            "mknod(parent={:?}, name={:?}, mode=0o{:o})",
+            parent,
+            name,
+            mode
+        );
+        Err(libc::EOPNOTSUPP.into())
+    }
+
+    async fn mkdir(
+        &self,
+        _req: Request,
+        parent: &OsStr,
+        name: &OsStr,
+        mode: u32,
+        _umask: u32,
+    ) -> Result<ReplyEntry> {
+        log::debug!(
+            "mkdir(parent={:?}, name={:?}, mode=0o{:o})",
+            parent,
+            name,
+            mode
+        );
+
+        let mut path = PathBuf::from(parent).join(name);
+        path.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166
+        self.op
+            .create_dir(&path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        let metadata = Metadata::new(EntryMode::DIR);
+        let now = SystemTime::now();
+        let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+        Ok(ReplyEntry { ttl: TTL, attr })
+    }
+
+    async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
+        log::debug!("unlink(parent={:?}, name={:?})", parent, name);
+
+        let path = PathBuf::from(parent).join(name);
+        self.op
+            .delete(&path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(())
+    }
+
+    async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
+        log::debug!("rmdir(parent={:?}, name={:?})", parent, name);
+
+        let path = PathBuf::from(parent).join(name);
+        self.op
+            .delete(&path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(())
+    }
+
+    async fn rename(
+        &self,
+        _req: Request,
+        origin_parent: &OsStr,
+        origin_name: &OsStr,
+        parent: &OsStr,
+        name: &OsStr,
+    ) -> Result<()> {
+        log::debug!(
+            "rename(p={:?}, name={:?}, newp={:?}, newname={:?})",
+            origin_parent,
+            origin_name,
+            parent,
+            name
+        );
+
+        let origin_path = PathBuf::from(origin_parent).join(origin_name);
+        let path = PathBuf::from(parent).join(name);
+
+        self.op
+            .rename(&origin_path.to_string_lossy(), &path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(())
+    }
+
+    async fn link(
+        &self,
+        _req: Request,
+        path: &OsStr,
+        new_parent: &OsStr,
+        new_name: &OsStr,
+    ) -> Result<ReplyEntry> {
+        log::debug!(
+            "link(path={:?}, new_parent={:?}, new_name={:?})",
+            path,
+            new_parent,
+            new_name
+        );
+        Err(libc::EOPNOTSUPP.into())
+    }
+
+    async fn create(
+        &self,
+        _req: Request,
+        parent: &OsStr,
+        name: &OsStr,
+        mode: u32,
+        flags: u32,
+    ) -> Result<ReplyCreated> {
+        log::debug!(
+            "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})",
+            parent,
+            name,
+            mode,
+            flags
+        );
+
+        let (is_read, is_write) = self.check_flags(flags)?;
+
+        let path = PathBuf::from(parent).join(name);
+        self.op
+            .write(&path.to_string_lossy(), Bytes::new())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        let metadata = Metadata::new(EntryMode::FILE);
+        let attr = metadata2file_attr(&metadata, SystemTime::now(), self.uid, self.gid);
+
+        let fh = self
+            .opened_files
+            .insert(OpenedFile {
+                path: path.into(),
+                is_read,
+                is_write,
+                is_append: flags & libc::O_APPEND as u32 != 0,
+            })
+            .ok_or(Errno::from(libc::EBUSY))? as u64
+            + 1; // ensure fh > 0
+
+        Ok(ReplyCreated {
+            ttl: TTL,
+            attr,
+            generation: 0,
+            fh,
+            flags,
+        })
+    }
+
+    async fn release(
+        &self,
+        _req: Request,
+        path: Option<&OsStr>,
+        fh: u64,
+        flags: u32,
+        lock_owner: u64,
+        flush: bool,
+    ) -> Result<()> {
+        log::debug!(
+            "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})",
+            path,
+            fh,
+            flags,
+            lock_owner,
+            flush
+        );
+
+        let key = fh as usize - 1;
+        let file = self
+            .opened_files
+            .take(key)
+            .ok_or(Errno::from(libc::EBADF))?;
+        if matches!(path, Some(ref p) if p != &file.path) {
+            Err(Errno::from(libc::EBADF))?;
+        }
+
+        Ok(())
+    }
+
+    async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
+        log::debug!("open(path={:?}, flags=0x{:x})", path, flags);
+
+        let (is_read, is_write) = self.check_flags(flags)?;
+
+        let fh = self
+            .opened_files
+            .insert(OpenedFile {
+                path: path.into(),
+                is_read,
+                is_write,
+                is_append: flags & libc::O_APPEND as u32 != 0,
+            })
+            .ok_or(Errno::from(libc::EBUSY))? as u64
+            + 1; // ensure fh > 0
+
+        Ok(ReplyOpen { fh, flags })
+    }
+
+    async fn read(
+        &self,
+        _req: Request,
+        path: Option<&OsStr>,
+        fh: u64,
+        offset: u64,
+        size: u32,
+    ) -> Result<ReplyData> {
+        log::debug!(
+            "read(path={:?}, fh={}, offset={}, size={})",
+            path,
+            fh,
+            offset,
+            size
+        );
+
+        if fh == 0 {
+            Err(Errno::from(libc::EBADF))?;
+        }
+        let key = fh - 1;
+        let file = self.get_opened_file(key as _, path)?;
+
+        if !file.is_read {
+            Err(Errno::from(libc::EACCES))?;
+        }
+
+        let data = self
+            .op
+            .read_with(&file.path.to_string_lossy())
+            .range(offset..offset + size as u64)
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(ReplyData { data: data.into() })
+    }
+
+    async fn write(
+        &self,
+        _req: Request,
+        path: Option<&OsStr>,
+        fh: u64,
+        offset: u64,
+        data: &[u8],
+        flags: u32,
+    ) -> Result<ReplyWrite> {
+        log::debug!(
+            "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})",
+            path,
+            fh,
+            offset,
+            data.len(),
+            flags
+        );
+
+        if offset != 0 {
+            Err(Errno::from(libc::EINVAL))?;
+        }
+
+        if fh == 0 {
+            Err(Errno::from(libc::EBADF))?;
+        }
+        let key = fh - 1;
+
+        let file = self.get_opened_file(key as _, path)?;
+        if !file.is_write {
+            Err(Errno::from(libc::EACCES))?;
+        }
+
+        self.op
+            .write_with(
+                &file.path.clone().to_string_lossy(),
+                Bytes::copy_from_slice(data),
+            )
+            .append(file.is_append)
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(ReplyWrite {
+            written: data.len() as _,
+        })
+    }
+
+    async fn readdir(
+        &self,
+        _req: Request,
+        path: &OsStr,
+        fh: u64,
+        offset: i64,
+    ) -> Result<ReplyDirectory<Self::DirEntryStream>> {
+        log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset);
+
+        let mut current_dir = PathBuf::from(path);
+        current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166
+        let children = self
+            .op
+            .lister(&current_dir.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?
+            .enumerate()
+            .map(|(i, entry)| {
+                entry
+                    .map(|e| DirectoryEntry {
+                        kind: entry_mode2file_type(e.metadata().mode()),
+                        name: e.name().trim_matches('/').into(),
+                        offset: (i + 3) as i64,
+                    })
+                    .map_err(opendal_error2errno)
+            });
+
+        let relative_paths = stream::iter([
+            Result::Ok(DirectoryEntry {
+                kind: FileType::Directory,
+                name: ".".into(),
+                offset: 1,
+            }),
+            Result::Ok(DirectoryEntry {
+                kind: FileType::Directory,
+                name: "..".into(),
+                offset: 2,
+            }),
+        ]);
+
+        Ok(ReplyDirectory {
+            entries: relative_paths.chain(children).skip(offset as usize).boxed(),
+        })
+    }
+
+    async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> {
+        log::debug!("access(path={:?}, mask=0x{:x})", path, mask);
+
+        self.check_flags(mask)?;
+        self.op
+            .stat(&path.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?;
+
+        Ok(())
+    }
+
+    async fn readdirplus(
+        &self,
+        _req: Request,
+        parent: &OsStr,
+        fh: u64,
+        offset: u64,
+        _lock_owner: u64,
+    ) -> Result<ReplyDirectoryPlus<Self::DirEntryPlusStream>> {
+        log::debug!(
+            "readdirplus(parent={:?}, fh={}, offset={})",
+            parent,
+            fh,
+            offset
+        );
+
+        let make_entry = |op: Operator, i: usize, entry: opendal::Result<Entry>, uid, gid, now| async move {
+            let e = entry.map_err(opendal_error2errno)?;
+            let metadata = op
+                .stat(e.name())
+                .await
+                .unwrap_or_else(|_| e.metadata().clone());
+            let attr = metadata2file_attr(&metadata, now, uid, gid);
+            Result::Ok(DirectoryEntryPlus {
+                kind: entry_mode2file_type(metadata.mode()),
+                name: e.name().trim_matches('/').into(),
+                offset: (i + 3) as i64,
+                attr,
+                entry_ttl: TTL,
+                attr_ttl: TTL,
+            })
+        };
+
+        let now = SystemTime::now();
+        let mut current_dir = PathBuf::from(parent);
+        current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166
+        let op = self.op.clone();
+        let uid = self.uid;
+        let gid = self.gid;
+        let children = self
+            .op
+            .lister(&current_dir.to_string_lossy())
+            .await
+            .map_err(opendal_error2errno)?
+            .enumerate()
+            .then(move |(i, entry)| make_entry(op.clone(), i, entry, uid, gid, now));
+
+        let relative_path_metadata = Metadata::new(EntryMode::DIR);
+        let relative_path_attr = metadata2file_attr(&relative_path_metadata, now, uid, gid);
+        let relative_paths = stream::iter([
+            Result::Ok(DirectoryEntryPlus {
+                kind: FileType::Directory,
+                name: ".".into(),
+                offset: 1,
+                attr: relative_path_attr,
+                entry_ttl: TTL,
+                attr_ttl: TTL,
+            }),
+            Result::Ok(DirectoryEntryPlus {
+                kind: FileType::Directory,
+                name: "..".into(),
+                offset: 2,
+                attr: relative_path_attr,
+                entry_ttl: TTL,
+                attr_ttl: TTL,
+            }),
+        ]);
+
+        Ok(ReplyDirectoryPlus {
+            entries: relative_paths.chain(children).skip(offset as usize).boxed(),
+        })
+    }
+}
+
+const fn entry_mode2file_type(mode: EntryMode) -> FileType {
+    match mode {
+        EntryMode::DIR => FileType::Directory,
+        _ => FileType::RegularFile,
+    }
+}
+
+fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid: u32) -> FileAttr {
+    let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime);
+    let kind = entry_mode2file_type(metadata.mode());
+    FileAttr {
+        size: metadata.content_length(),
+        blocks: 0,
+        atime,
+        mtime: last_modified,
+        ctime: last_modified,
+        kind,
+        perm: fuse3::perm_from_mode_and_kind(kind, 0o775),
+        nlink: 0,
+        uid,
+        gid,
+        rdev: 0,
+        blksize: 4096,
+    }
+}
+
+fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno {
+    log::trace!("opendal_error2errno: {:?}", err);
+    match err.kind() {
+        ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
+        ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
+        ErrorKind::NotFound => Errno::from(libc::ENOENT),
+        ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
+        ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
+        ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
+        ErrorKind::ContentTruncated => Errno::from(libc::EAGAIN),
+        ErrorKind::ContentIncomplete => Errno::from(libc::EIO),
+        _ => Errno::from(libc::ENOENT),
+    }
+}
diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs
index 3fc0cadae6..c82d5e5edc 100644
--- a/bin/ofs/src/lib.rs
+++ b/bin/ofs/src/lib.rs
@@ -15,209 +15,75 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ffi::OsStr;
-use std::vec::IntoIter;
-
-use async_trait::async_trait;
-use fuse3::path::prelude::*;
-use fuse3::Result;
-use futures_util::stream::Empty;
-use futures_util::stream::Iter;
-use opendal::Operator;
-
-pub struct Ofs {
-    pub op: Operator,
-}
-
-#[async_trait]
-impl PathFilesystem for Ofs {
-    type DirEntryStream = Empty<Result<DirectoryEntry>>;
-    type DirEntryPlusStream = Iter<IntoIter<Result<DirectoryEntryPlus>>>;
-
-    // Init a fuse filesystem
-    async fn init(&self, _req: Request) -> Result<()> {
-        Ok(())
-    }
+use std::collections::HashMap;
+use std::str::FromStr;
 
-    // Callback when fs is being destroyed
-    async fn destroy(&self, _req: Request) {}
+use anyhow::anyhow;
+use anyhow::Result;
+use opendal::Operator;
+use opendal::Scheme;
 
-    async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result<ReplyEntry> {
-        // TODO
-        Err(libc::ENOSYS.into())
-    }
+pub mod config;
+pub use config::Config;
 
-    async fn getattr(
-        &self,
-        _req: Request,
-        path: Option<&OsStr>,
-        _fh: Option<u64>,
-        _flags: u32,
-    ) -> Result<ReplyAttr> {
-        // TODO
-        log::debug!("getattr(path={:?})", path);
-
-        Err(libc::ENOSYS.into())
-    }
-
-    async fn read(
-        &self,
-        _req: Request,
-        path: Option<&OsStr>,
-        fh: u64,
-        offset: u64,
-        size: u32,
-    ) -> Result<ReplyData> {
-        // TODO
-        log::debug!(
-            "read(path={:?}, fh={}, offset={}, size={})",
-            path,
-            fh,
-            offset,
-            size
-        );
-
-        Err(libc::ENOSYS.into())
-    }
+#[cfg(target_os = "linux")]
+mod fuse;
 
-    async fn mkdir(
-        &self,
-        _req: Request,
-        parent: &OsStr,
-        name: &OsStr,
-        mode: u32,
-        _umask: u32,
-    ) -> Result<ReplyEntry> {
-        // TODO
-        log::debug!(
-            "mkdir(parent={:?}, name={:?}, mode=0o{:o})",
-            parent,
-            name,
-            mode
-        );
-
-        Err(libc::ENOSYS.into())
+pub async fn new_app(cfg: Config) -> Result<()> {
+    if cfg.backend.has_host() {
+        log::warn!("backend host will be ignored");
     }
 
-    async fn readdir(
-        &self,
-        _req: Request,
-        path: &OsStr,
-        fh: u64,
-        offset: i64,
-    ) -> Result<ReplyDirectory<Self::DirEntryStream>> {
-        // TODO
-        log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset);
-
-        Err(libc::ENOSYS.into())
-    }
+    let scheme_str = cfg.backend.scheme();
+    let op_args = cfg
+        .backend
+        .query_pairs()
+        .into_owned()
+        .collect::<HashMap<String, String>>();
+
+    let scheme = match Scheme::from_str(scheme_str) {
+        Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)),
+        Ok(s) => Ok(s),
+    }?;
+    let backend = Operator::via_map(scheme, op_args)?;
+
+    let args = Args {
+        mount_path: cfg.mount_path,
+        backend,
+    };
+    execute(args).await
+}
 
-    async fn mknod(
-        &self,
-        _req: Request,
-        parent: &OsStr,
-        name: &OsStr,
-        mode: u32,
-        _rdev: u32,
-    ) -> Result<ReplyEntry> {
-        // TODO
-        log::debug!(
-            "mknod(parent={:?}, name={:?}, mode=0o{:o})",
-            parent,
-            name,
-            mode
-        );
-
-        Err(libc::ENOSYS.into())
-    }
+struct Args {
+    mount_path: String,
+    backend: Operator,
+}
 
-    async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
-        // TODO
-        log::debug!("open(path={:?}, flags=0x{:x})", path, flags);
+#[cfg(not(target_os = "linux"))]
+async fn execute(_: FrontendArgs) -> Result<()> {
+    Err(anyhow::anyhow!("platform not supported"))
+}
 
-        Err(libc::ENOSYS.into())
-    }
+#[cfg(target_os = "linux")]
+async fn execute(args: Args) -> Result<()> {
+    use fuse3::path::Session;
+    use fuse3::MountOptions;
 
-    async fn setattr(
-        &self,
-        _req: Request,
-        path: Option<&OsStr>,
-        _fh: Option<u64>,
-        _set_attr: SetAttr,
-    ) -> Result<ReplyAttr> {
-        // TODO
-        log::debug!("setattr(path={:?})", path);
-
-        Err(libc::ENOSYS.into())
-    }
+    let uid = nix::unistd::getuid();
+    let gid = nix::unistd::getgid();
 
-    async fn write(
-        &self,
-        _req: Request,
-        path: Option<&OsStr>,
-        fh: u64,
-        offset: u64,
-        data: &[u8],
-        flags: u32,
-    ) -> Result<ReplyWrite> {
-        // TODO
-        log::debug!(
-            "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})",
-            path,
-            fh,
-            offset,
-            data.len(),
-            flags
-        );
-
-        Err(libc::ENOSYS.into())
-    }
+    let mut mount_option = MountOptions::default();
+    mount_option.uid(uid.into());
+    mount_option.gid(gid.into());
+    mount_option.no_open_dir_support(true);
 
-    async fn release(
-        &self,
-        _req: Request,
-        path: Option<&OsStr>,
-        fh: u64,
-        flags: u32,
-        _lock_owner: u64,
-        flush: bool,
-    ) -> Result<()> {
-        // TODO
-        log::debug!(
-            "release(path={:?}, fh={}, flags={}, flush={})",
-            path,
-            fh,
-            flags,
-            flush
-        );
-
-        Err(libc::ENOSYS.into())
-    }
+    let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into());
 
-    async fn rename(
-        &self,
-        _req: Request,
-        origin_parent: &OsStr,
-        origin_name: &OsStr,
-        parent: &OsStr,
-        name: &OsStr,
-    ) -> Result<()> {
-        // TODO
-        log::debug!(
-            "rename(p={:?}, name={:?}, newp={:?}, newname={:?})",
-            origin_parent,
-            origin_name,
-            parent,
-            name
-        );
-
-        Err(libc::ENOSYS.into())
-    }
+    let mount_handle = Session::new(mount_option)
+        .mount_with_unprivileged(ofs, args.mount_path)
+        .await?;
 
-    async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
-        // TODO
-        log::debug!("unlink(parent={:?}, name={:?})", parent, name);
+    mount_handle.await?;
 
-        Err(libc::ENOSYS.into())
-    }
+    Ok(())
 }

Reply via email to