This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new acb8ef6128 feat(bin/ofs): implement fuse for linux (#4179)
acb8ef6128 is described below
commit acb8ef61283fc234dca4a474a15649e1d462e02e
Author: Ho 229 <[email protected]>
AuthorDate: Thu Feb 22 20:49:43 2024 +0800
feat(bin/ofs): implement fuse for linux (#4179)
* refactor: project structure
* feat: impl fuse for linux [wip]
* feat: impl fuse for linux
* refactor: use Operator::lister in readdir
* refactor: remove frontend folder
* chore: cleanup code
* chore: make clippy happy
* Update Cargo.lock
* fix: dependencies
---
bin/ofs/Cargo.lock | 25 +-
bin/ofs/Cargo.toml | 14 +-
bin/ofs/src/bin/ofs.rs | 62 +----
bin/ofs/src/config.rs | 33 +++
bin/ofs/src/fuse.rs | 670 +++++++++++++++++++++++++++++++++++++++++++++++++
bin/ofs/src/lib.rs | 246 +++++-------------
6 files changed, 789 insertions(+), 261 deletions(-)
diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock
index 6a3952492a..8ca5394fa0 100644
--- a/bin/ofs/Cargo.lock
+++ b/bin/ofs/Cargo.lock
@@ -57,9 +57,9 @@ dependencies = [
[[package]]
name = "anstyle"
-version = "1.0.5"
+version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220"
+checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
[[package]]
name = "anstyle-parse"
@@ -216,9 +216,9 @@ checksum =
"baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.33"
+version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -419,9 +419,9 @@ dependencies = [
[[package]]
name = "env_logger"
-version = "0.11.1"
+version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8"
+checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
dependencies = [
"anstream",
"anstyle",
@@ -1026,7 +1026,8 @@ name = "ofs"
version = "0.0.1+core.0.45.0"
dependencies = [
"anyhow",
- "async-trait",
+ "bytes",
+ "chrono",
"clap",
"env_logger",
"fuse3",
@@ -1035,6 +1036,7 @@ dependencies = [
"log",
"nix 0.27.1",
"opendal",
+ "sharded-slab",
"tokio",
"url",
]
@@ -1559,6 +1561,15 @@ dependencies = [
"digest",
]
+[[package]]
+name = "sharded-slab"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
+dependencies = [
+ "lazy_static",
+]
+
[[package]]
name = "signature"
version = "2.2.0"
diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml
index 187e81d4f8..f6a64bc5e0 100644
--- a/bin/ofs/Cargo.toml
+++ b/bin/ofs/Cargo.toml
@@ -31,14 +31,10 @@ rust-version = "1.67"
[dependencies]
anyhow = "1"
-async-trait = "0.1.75"
clap = { version = "4.5.1", features = ["derive", "env"] }
-env_logger = "0.11"
-fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] }
+env_logger = "0.11.2"
futures-util = "0.3.30"
-libc = "0.2.151"
log = "0.4.20"
-nix = { version = "0.27.1", features = ["user"] }
opendal = { version = "0.45.0", path = "../../core" }
tokio = { version = "1.34", features = [
"fs",
@@ -47,3 +43,11 @@ tokio = { version = "1.34", features = [
"io-std",
] }
url = "2.5.0"
+chrono = "0.4.34"
+sharded-slab = "0.1.7"
+bytes = "1.5.0"
+
+[target.'cfg(target_os = "linux")'.dependencies]
+libc = "0.2.151"
+fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] }
+nix = { version = "0.27.1", features = ["user"] }
diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs
index 08399b56ef..89a6199287 100644
--- a/bin/ofs/src/bin/ofs.rs
+++ b/bin/ofs/src/bin/ofs.rs
@@ -15,69 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::str::FromStr;
-
-use anyhow::anyhow;
-use anyhow::Context;
use anyhow::Result;
use clap::Parser;
-use fuse3::path::Session;
-use fuse3::MountOptions;
-use ofs::Ofs;
-use opendal::Operator;
-use opendal::Scheme;
-use url::Url;
#[tokio::main]
async fn main() -> Result<()> {
- env_logger::init();
- fuse().await
-}
-
-#[derive(Parser, Debug)]
-#[command(version, about)]
-struct Config {
- /// fuse mount path
- #[arg(env = "OFS_MOUNT_PATH", index = 1)]
- mount_path: String,
-
- /// location of opendal service
- /// format: <scheme>://?<key>=<value>&<key>=<value>
- /// example: fs://root=/tmp
- #[arg(env = "OFS_BACKEND", index = 2)]
- backend: String,
-}
-
-async fn fuse() -> Result<()> {
- let cfg = Config::try_parse().context("parse command line arguments")?;
-
- let location = Url::parse(&cfg.backend)?;
- if location.has_host() {
- Err(anyhow!("Host part in a location is not supported."))?;
- }
+ let cfg = ofs::Config::parse();
- let scheme_str = location.scheme();
-
- let op_args = location
- .query_pairs()
- .into_owned()
- .collect::<HashMap<String, String>>();
-
- let scheme = Scheme::from_str(scheme_str).context("unsupported scheme")?;
- let op = Operator::via_map(scheme, op_args)?;
-
- let mut mount_option = MountOptions::default();
- mount_option.uid(nix::unistd::getuid().into());
- mount_option.gid(nix::unistd::getgid().into());
-
- let ofs = Ofs { op };
-
- let mounthandle = Session::new(mount_option)
- .mount_with_unprivileged(ofs, cfg.mount_path)
- .await?;
-
- mounthandle.await?;
-
- Ok(())
+ env_logger::init();
+ ofs::new_app(cfg).await
}
diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs
new file mode 100644
index 0000000000..6afa729a79
--- /dev/null
+++ b/bin/ofs/src/config.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use url::Url;
+
+#[derive(Parser, Debug)]
+#[command(version, about)]
+pub struct Config {
+ /// fuse mount path
+ #[arg(env = "OFS_MOUNT_PATH", index = 1)]
+ pub(crate) mount_path: String,
+
+ /// location of opendal service
+ /// format: <scheme>://?<key>=<value>&<key>=<value>
+ /// example: fs://?root=/tmp
+ #[arg(env = "OFS_BACKEND", index = 2)]
+ pub(crate) backend: Url,
+}
diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse.rs
new file mode 100644
index 0000000000..214187e861
--- /dev/null
+++ b/bin/ofs/src/fuse.rs
@@ -0,0 +1,670 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::OsStr;
+use std::ffi::OsString;
+use std::ops::Deref;
+use std::path::PathBuf;
+use std::time::Duration;
+use std::time::SystemTime;
+
+use bytes::Bytes;
+use fuse3::async_trait;
+use fuse3::path::prelude::*;
+use fuse3::Errno;
+use fuse3::Result;
+use futures_util::stream;
+use futures_util::stream::BoxStream;
+use futures_util::StreamExt;
+
+use opendal::Entry;
+use opendal::EntryMode;
+use opendal::ErrorKind;
+use opendal::Metadata;
+use opendal::Operator;
+use sharded_slab::Slab;
+
+const TTL: Duration = Duration::from_secs(1); // 1 second
+
+#[derive(Debug, Clone)]
+struct OpenedFile {
+ path: OsString,
+ is_read: bool,
+ is_write: bool,
+ is_append: bool,
+}
+
+pub(super) struct Ofs {
+ op: Operator,
+ gid: u32,
+ uid: u32,
+ opened_files: Slab<OpenedFile>,
+}
+
+impl Ofs {
+ pub fn new(op: Operator, uid: u32, gid: u32) -> Self {
+ Self {
+ op,
+ uid,
+ gid,
+ opened_files: Slab::new(),
+ }
+ }
+
+ fn check_flags(&self, flags: u32) -> Result<(bool, bool)> {
+ let mode = flags & libc::O_ACCMODE as u32;
+ let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as
u32;
+ let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR
as u32;
+ if !is_read && !is_write {
+ Err(Errno::from(libc::EINVAL))?;
+ }
+
+ let capability = self.op.info().full_capability();
+ if is_read && !capability.read {
+ Err(Errno::from(libc::EACCES))?;
+ }
+ if is_write && !capability.write {
+ Err(Errno::from(libc::EACCES))?;
+ }
+
+ log::trace!("check_flags: is_read={}, is_write={}", is_read, is_write);
+ Ok((is_read, is_write))
+ }
+
+ // Get opened file and check given path
+ fn get_opened_file(&self, key: usize, path: Option<&OsStr>) ->
Result<OpenedFile> {
+ let file = self
+ .opened_files
+ .get(key)
+ .as_ref()
+ .ok_or(Errno::from(libc::ENOENT))?
+ .deref()
+ .clone();
+ if matches!(path, Some(path) if path != file.path) {
+ log::trace!(
+ "get_opened_file: path not match: path={:?}, file={:?}",
+ path,
+ file.path
+ );
+ Err(Errno::from(libc::EBADF))?;
+ }
+
+ Ok(file)
+ }
+}
+
+#[async_trait]
+impl PathFilesystem for Ofs {
+ type DirEntryStream = BoxStream<'static, Result<DirectoryEntry>>;
+ type DirEntryPlusStream = BoxStream<'static, Result<DirectoryEntryPlus>>;
+
+ // Init a fuse filesystem
+ async fn init(&self, _req: Request) -> Result<()> {
+ Ok(())
+ }
+
+ // Callback when fs is being destroyed
+ async fn destroy(&self, _req: Request) {}
+
+ async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) ->
Result<ReplyEntry> {
+ log::debug!("lookup(parent={:?}, name={:?})", parent, name);
+
+ let path = PathBuf::from(parent).join(name);
+ let metadata = self
+ .op
+ .stat(&path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ let now = SystemTime::now();
+ let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+ Ok(ReplyEntry { ttl: TTL, attr })
+ }
+
+ async fn getattr(
+ &self,
+ _req: Request,
+ path: Option<&OsStr>,
+ fh: Option<u64>,
+ flags: u32,
+ ) -> Result<ReplyAttr> {
+ log::debug!("getattr(path={:?}, fh={:?}, flags={:?})", path, fh,
flags);
+
+ let key = fh.unwrap_or_default() - 1;
+ let fh_path = self
+ .opened_files
+ .get(key as usize)
+ .as_ref()
+ .map(|f| &f.path)
+ .cloned();
+
+ let file_path = match (path.map(Into::into), fh_path) {
+ (Some(a), Some(b)) => {
+ if a != b {
+ Err(Errno::from(libc::EBADF))?;
+ }
+ Some(a)
+ }
+ (a, b) => a.or(b),
+ };
+
+ let metadata = self
+ .op
+ .stat(&file_path.unwrap_or_default().to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ let now = SystemTime::now();
+ let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+ Ok(ReplyAttr { ttl: TTL, attr })
+ }
+
+ async fn setattr(
+ &self,
+ _req: Request,
+ path: Option<&OsStr>,
+ fh: Option<u64>,
+ set_attr: SetAttr,
+ ) -> Result<ReplyAttr> {
+ log::debug!(
+ "setattr(path={:?}, fh={:?}, set_attr={:?})",
+ path,
+ fh,
+ set_attr
+ );
+ Err(libc::EOPNOTSUPP.into())
+ }
+
+ async fn symlink(
+ &self,
+ _req: Request,
+ parent: &OsStr,
+ name: &OsStr,
+ link_path: &OsStr,
+ ) -> Result<ReplyEntry> {
+ log::debug!(
+ "symlink(parent={:?}, name={:?}, link_path={:?})",
+ parent,
+ name,
+ link_path
+ );
+ Err(libc::EOPNOTSUPP.into())
+ }
+
+ async fn mknod(
+ &self,
+ _req: Request,
+ parent: &OsStr,
+ name: &OsStr,
+ mode: u32,
+ _rdev: u32,
+ ) -> Result<ReplyEntry> {
+ log::debug!(
+ "mknod(parent={:?}, name={:?}, mode=0o{:o})",
+ parent,
+ name,
+ mode
+ );
+ Err(libc::EOPNOTSUPP.into())
+ }
+
+ async fn mkdir(
+ &self,
+ _req: Request,
+ parent: &OsStr,
+ name: &OsStr,
+ mode: u32,
+ _umask: u32,
+ ) -> Result<ReplyEntry> {
+ log::debug!(
+ "mkdir(parent={:?}, name={:?}, mode=0o{:o})",
+ parent,
+ name,
+ mode
+ );
+
+ let mut path = PathBuf::from(parent).join(name);
+ path.push(""); // ref
https://users.rust-lang.org/t/trailing-in-paths/43166
+ self.op
+ .create_dir(&path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ let metadata = Metadata::new(EntryMode::DIR);
+ let now = SystemTime::now();
+ let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
+
+ Ok(ReplyEntry { ttl: TTL, attr })
+ }
+
+ async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) ->
Result<()> {
+ log::debug!("unlink(parent={:?}, name={:?})", parent, name);
+
+ let path = PathBuf::from(parent).join(name);
+ self.op
+ .delete(&path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(())
+ }
+
+ async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) ->
Result<()> {
+ log::debug!("rmdir(parent={:?}, name={:?})", parent, name);
+
+ let path = PathBuf::from(parent).join(name);
+ self.op
+ .delete(&path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(())
+ }
+
+ async fn rename(
+ &self,
+ _req: Request,
+ origin_parent: &OsStr,
+ origin_name: &OsStr,
+ parent: &OsStr,
+ name: &OsStr,
+ ) -> Result<()> {
+ log::debug!(
+ "rename(p={:?}, name={:?}, newp={:?}, newname={:?})",
+ origin_parent,
+ origin_name,
+ parent,
+ name
+ );
+
+ let origin_path = PathBuf::from(origin_parent).join(origin_name);
+ let path = PathBuf::from(parent).join(name);
+
+ self.op
+ .rename(&origin_path.to_string_lossy(), &path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(())
+ }
+
+ async fn link(
+ &self,
+ _req: Request,
+ path: &OsStr,
+ new_parent: &OsStr,
+ new_name: &OsStr,
+ ) -> Result<ReplyEntry> {
+ log::debug!(
+ "link(path={:?}, new_parent={:?}, new_name={:?})",
+ path,
+ new_parent,
+ new_name
+ );
+ Err(libc::EOPNOTSUPP.into())
+ }
+
+ async fn create(
+ &self,
+ _req: Request,
+ parent: &OsStr,
+ name: &OsStr,
+ mode: u32,
+ flags: u32,
+ ) -> Result<ReplyCreated> {
+ log::debug!(
+ "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})",
+ parent,
+ name,
+ mode,
+ flags
+ );
+
+ let (is_read, is_write) = self.check_flags(flags)?;
+
+ let path = PathBuf::from(parent).join(name);
+ self.op
+ .write(&path.to_string_lossy(), Bytes::new())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ let metadata = Metadata::new(EntryMode::FILE);
+ let attr = metadata2file_attr(&metadata, SystemTime::now(), self.uid,
self.gid);
+
+ let fh = self
+ .opened_files
+ .insert(OpenedFile {
+ path: path.into(),
+ is_read,
+ is_write,
+ is_append: flags & libc::O_APPEND as u32 != 0,
+ })
+ .ok_or(Errno::from(libc::EBUSY))? as u64
+ + 1; // ensure fh > 0
+
+ Ok(ReplyCreated {
+ ttl: TTL,
+ attr,
+ generation: 0,
+ fh,
+ flags,
+ })
+ }
+
+ async fn release(
+ &self,
+ _req: Request,
+ path: Option<&OsStr>,
+ fh: u64,
+ flags: u32,
+ lock_owner: u64,
+ flush: bool,
+ ) -> Result<()> {
+ log::debug!(
+ "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})",
+ path,
+ fh,
+ flags,
+ lock_owner,
+ flush
+ );
+
+ let key = fh as usize - 1;
+ let file = self
+ .opened_files
+ .take(key)
+ .ok_or(Errno::from(libc::EBADF))?;
+ if matches!(path, Some(ref p) if p != &file.path) {
+ Err(Errno::from(libc::EBADF))?;
+ }
+
+ Ok(())
+ }
+
+ async fn open(&self, _req: Request, path: &OsStr, flags: u32) ->
Result<ReplyOpen> {
+ log::debug!("open(path={:?}, flags=0x{:x})", path, flags);
+
+ let (is_read, is_write) = self.check_flags(flags)?;
+
+ let fh = self
+ .opened_files
+ .insert(OpenedFile {
+ path: path.into(),
+ is_read,
+ is_write,
+ is_append: flags & libc::O_APPEND as u32 != 0,
+ })
+ .ok_or(Errno::from(libc::EBUSY))? as u64
+ + 1; // ensure fh > 0
+
+ Ok(ReplyOpen { fh, flags })
+ }
+
+ async fn read(
+ &self,
+ _req: Request,
+ path: Option<&OsStr>,
+ fh: u64,
+ offset: u64,
+ size: u32,
+ ) -> Result<ReplyData> {
+ log::debug!(
+ "read(path={:?}, fh={}, offset={}, size={})",
+ path,
+ fh,
+ offset,
+ size
+ );
+
+ if fh == 0 {
+ Err(Errno::from(libc::EBADF))?;
+ }
+ let key = fh - 1;
+ let file = self.get_opened_file(key as _, path)?;
+
+ if !file.is_read {
+ Err(Errno::from(libc::EACCES))?;
+ }
+
+ let data = self
+ .op
+ .read_with(&file.path.to_string_lossy())
+ .range(offset..offset + size as u64)
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(ReplyData { data: data.into() })
+ }
+
+ async fn write(
+ &self,
+ _req: Request,
+ path: Option<&OsStr>,
+ fh: u64,
+ offset: u64,
+ data: &[u8],
+ flags: u32,
+ ) -> Result<ReplyWrite> {
+ log::debug!(
+ "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})",
+ path,
+ fh,
+ offset,
+ data.len(),
+ flags
+ );
+
+ if offset != 0 {
+ Err(Errno::from(libc::EINVAL))?;
+ }
+
+ if fh == 0 {
+ Err(Errno::from(libc::EBADF))?;
+ }
+ let key = fh - 1;
+
+ let file = self.get_opened_file(key as _, path)?;
+ if !file.is_write {
+ Err(Errno::from(libc::EACCES))?;
+ }
+
+ self.op
+ .write_with(
+ &file.path.clone().to_string_lossy(),
+ Bytes::copy_from_slice(data),
+ )
+ .append(file.is_append)
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(ReplyWrite {
+ written: data.len() as _,
+ })
+ }
+
+ async fn readdir(
+ &self,
+ _req: Request,
+ path: &OsStr,
+ fh: u64,
+ offset: i64,
+ ) -> Result<ReplyDirectory<Self::DirEntryStream>> {
+ log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset);
+
+ let mut current_dir = PathBuf::from(path);
+ current_dir.push(""); // ref
https://users.rust-lang.org/t/trailing-in-paths/43166
+ let children = self
+ .op
+ .lister(¤t_dir.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?
+ .enumerate()
+ .map(|(i, entry)| {
+ entry
+ .map(|e| DirectoryEntry {
+ kind: entry_mode2file_type(e.metadata().mode()),
+ name: e.name().trim_matches('/').into(),
+ offset: (i + 3) as i64,
+ })
+ .map_err(opendal_error2errno)
+ });
+
+ let relative_paths = stream::iter([
+ Result::Ok(DirectoryEntry {
+ kind: FileType::Directory,
+ name: ".".into(),
+ offset: 1,
+ }),
+ Result::Ok(DirectoryEntry {
+ kind: FileType::Directory,
+ name: "..".into(),
+ offset: 2,
+ }),
+ ]);
+
+ Ok(ReplyDirectory {
+ entries: relative_paths.chain(children).skip(offset as
usize).boxed(),
+ })
+ }
+
+ async fn access(&self, _req: Request, path: &OsStr, mask: u32) ->
Result<()> {
+ log::debug!("access(path={:?}, mask=0x{:x})", path, mask);
+
+ self.check_flags(mask)?;
+ self.op
+ .stat(&path.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?;
+
+ Ok(())
+ }
+
+ async fn readdirplus(
+ &self,
+ _req: Request,
+ parent: &OsStr,
+ fh: u64,
+ offset: u64,
+ _lock_owner: u64,
+ ) -> Result<ReplyDirectoryPlus<Self::DirEntryPlusStream>> {
+ log::debug!(
+ "readdirplus(parent={:?}, fh={}, offset={})",
+ parent,
+ fh,
+ offset
+ );
+
+ let make_entry = |op: Operator, i: usize, entry:
opendal::Result<Entry>, uid, gid, now| async move {
+ let e = entry.map_err(opendal_error2errno)?;
+ let metadata = op
+ .stat(e.name())
+ .await
+ .unwrap_or_else(|_| e.metadata().clone());
+ let attr = metadata2file_attr(&metadata, now, uid, gid);
+ Result::Ok(DirectoryEntryPlus {
+ kind: entry_mode2file_type(metadata.mode()),
+ name: e.name().trim_matches('/').into(),
+ offset: (i + 3) as i64,
+ attr,
+ entry_ttl: TTL,
+ attr_ttl: TTL,
+ })
+ };
+
+ let now = SystemTime::now();
+ let mut current_dir = PathBuf::from(parent);
+ current_dir.push(""); // ref
https://users.rust-lang.org/t/trailing-in-paths/43166
+ let op = self.op.clone();
+ let uid = self.uid;
+ let gid = self.gid;
+ let children = self
+ .op
+ .lister(¤t_dir.to_string_lossy())
+ .await
+ .map_err(opendal_error2errno)?
+ .enumerate()
+ .then(move |(i, entry)| make_entry(op.clone(), i, entry, uid, gid,
now));
+
+ let relative_path_metadata = Metadata::new(EntryMode::DIR);
+ let relative_path_attr = metadata2file_attr(&relative_path_metadata,
now, uid, gid);
+ let relative_paths = stream::iter([
+ Result::Ok(DirectoryEntryPlus {
+ kind: FileType::Directory,
+ name: ".".into(),
+ offset: 1,
+ attr: relative_path_attr,
+ entry_ttl: TTL,
+ attr_ttl: TTL,
+ }),
+ Result::Ok(DirectoryEntryPlus {
+ kind: FileType::Directory,
+ name: "..".into(),
+ offset: 2,
+ attr: relative_path_attr,
+ entry_ttl: TTL,
+ attr_ttl: TTL,
+ }),
+ ]);
+
+ Ok(ReplyDirectoryPlus {
+ entries: relative_paths.chain(children).skip(offset as
usize).boxed(),
+ })
+ }
+}
+
+const fn entry_mode2file_type(mode: EntryMode) -> FileType {
+ match mode {
+ EntryMode::DIR => FileType::Directory,
+ _ => FileType::RegularFile,
+ }
+}
+
+fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid:
u32) -> FileAttr {
+ let last_modified = metadata.last_modified().map(|t|
t.into()).unwrap_or(atime);
+ let kind = entry_mode2file_type(metadata.mode());
+ FileAttr {
+ size: metadata.content_length(),
+ blocks: 0,
+ atime,
+ mtime: last_modified,
+ ctime: last_modified,
+ kind,
+ perm: fuse3::perm_from_mode_and_kind(kind, 0o775),
+ nlink: 0,
+ uid,
+ gid,
+ rdev: 0,
+ blksize: 4096,
+ }
+}
+
+fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno {
+ log::trace!("opendal_error2errno: {:?}", err);
+ match err.kind() {
+ ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
+ ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
+ ErrorKind::NotFound => Errno::from(libc::ENOENT),
+ ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
+ ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
+ ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
+ ErrorKind::ContentTruncated => Errno::from(libc::EAGAIN),
+ ErrorKind::ContentIncomplete => Errno::from(libc::EIO),
+ _ => Errno::from(libc::ENOENT),
+ }
+}
diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs
index 3fc0cadae6..c82d5e5edc 100644
--- a/bin/ofs/src/lib.rs
+++ b/bin/ofs/src/lib.rs
@@ -15,209 +15,75 @@
// specific language governing permissions and limitations
// under the License.
-use std::ffi::OsStr;
-use std::vec::IntoIter;
-
-use async_trait::async_trait;
-use fuse3::path::prelude::*;
-use fuse3::Result;
-use futures_util::stream::Empty;
-use futures_util::stream::Iter;
-use opendal::Operator;
-
-pub struct Ofs {
- pub op: Operator,
-}
-
-#[async_trait]
-impl PathFilesystem for Ofs {
- type DirEntryStream = Empty<Result<DirectoryEntry>>;
- type DirEntryPlusStream = Iter<IntoIter<Result<DirectoryEntryPlus>>>;
-
- // Init a fuse filesystem
- async fn init(&self, _req: Request) -> Result<()> {
- Ok(())
- }
+use std::collections::HashMap;
+use std::str::FromStr;
- // Callback when fs is being destroyed
- async fn destroy(&self, _req: Request) {}
+use anyhow::anyhow;
+use anyhow::Result;
+use opendal::Operator;
+use opendal::Scheme;
- async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) ->
Result<ReplyEntry> {
- // TODO
- Err(libc::ENOSYS.into())
- }
+pub mod config;
+pub use config::Config;
- async fn getattr(
- &self,
- _req: Request,
- path: Option<&OsStr>,
- _fh: Option<u64>,
- _flags: u32,
- ) -> Result<ReplyAttr> {
- // TODO
- log::debug!("getattr(path={:?})", path);
-
- Err(libc::ENOSYS.into())
- }
-
- async fn read(
- &self,
- _req: Request,
- path: Option<&OsStr>,
- fh: u64,
- offset: u64,
- size: u32,
- ) -> Result<ReplyData> {
- // TODO
- log::debug!(
- "read(path={:?}, fh={}, offset={}, size={})",
- path,
- fh,
- offset,
- size
- );
-
- Err(libc::ENOSYS.into())
- }
+#[cfg(target_os = "linux")]
+mod fuse;
- async fn mkdir(
- &self,
- _req: Request,
- parent: &OsStr,
- name: &OsStr,
- mode: u32,
- _umask: u32,
- ) -> Result<ReplyEntry> {
- // TODO
- log::debug!(
- "mkdir(parent={:?}, name={:?}, mode=0o{:o})",
- parent,
- name,
- mode
- );
-
- Err(libc::ENOSYS.into())
+pub async fn new_app(cfg: Config) -> Result<()> {
+ if cfg.backend.has_host() {
+ log::warn!("backend host will be ignored");
}
- async fn readdir(
- &self,
- _req: Request,
- path: &OsStr,
- fh: u64,
- offset: i64,
- ) -> Result<ReplyDirectory<Self::DirEntryStream>> {
- // TODO
- log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset);
-
- Err(libc::ENOSYS.into())
- }
+ let scheme_str = cfg.backend.scheme();
+ let op_args = cfg
+ .backend
+ .query_pairs()
+ .into_owned()
+ .collect::<HashMap<String, String>>();
+
+ let scheme = match Scheme::from_str(scheme_str) {
+ Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}",
scheme_str)),
+ Ok(s) => Ok(s),
+ }?;
+ let backend = Operator::via_map(scheme, op_args)?;
+
+ let args = Args {
+ mount_path: cfg.mount_path,
+ backend,
+ };
+ execute(args).await
+}
- async fn mknod(
- &self,
- _req: Request,
- parent: &OsStr,
- name: &OsStr,
- mode: u32,
- _rdev: u32,
- ) -> Result<ReplyEntry> {
- // TODO
- log::debug!(
- "mknod(parent={:?}, name={:?}, mode=0o{:o})",
- parent,
- name,
- mode
- );
-
- Err(libc::ENOSYS.into())
- }
+struct Args {
+ mount_path: String,
+ backend: Operator,
+}
- async fn open(&self, _req: Request, path: &OsStr, flags: u32) ->
Result<ReplyOpen> {
- // TODO
- log::debug!("open(path={:?}, flags=0x{:x})", path, flags);
+#[cfg(not(target_os = "linux"))]
+async fn execute(_: FrontendArgs) -> Result<()> {
+ Err(anyhow::anyhow!("platform not supported"))
+}
- Err(libc::ENOSYS.into())
- }
+#[cfg(target_os = "linux")]
+async fn execute(args: Args) -> Result<()> {
+ use fuse3::path::Session;
+ use fuse3::MountOptions;
- async fn setattr(
- &self,
- _req: Request,
- path: Option<&OsStr>,
- _fh: Option<u64>,
- _set_attr: SetAttr,
- ) -> Result<ReplyAttr> {
- // TODO
- log::debug!("setattr(path={:?})", path);
-
- Err(libc::ENOSYS.into())
- }
+ let uid = nix::unistd::getuid();
+ let gid = nix::unistd::getgid();
- async fn write(
- &self,
- _req: Request,
- path: Option<&OsStr>,
- fh: u64,
- offset: u64,
- data: &[u8],
- flags: u32,
- ) -> Result<ReplyWrite> {
- // TODO
- log::debug!(
- "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})",
- path,
- fh,
- offset,
- data.len(),
- flags
- );
-
- Err(libc::ENOSYS.into())
- }
+ let mut mount_option = MountOptions::default();
+ mount_option.uid(uid.into());
+ mount_option.gid(gid.into());
+ mount_option.no_open_dir_support(true);
- async fn release(
- &self,
- _req: Request,
- path: Option<&OsStr>,
- fh: u64,
- flags: u32,
- _lock_owner: u64,
- flush: bool,
- ) -> Result<()> {
- // TODO
- log::debug!(
- "release(path={:?}, fh={}, flags={}, flush={})",
- path,
- fh,
- flags,
- flush
- );
-
- Err(libc::ENOSYS.into())
- }
+ let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into());
- async fn rename(
- &self,
- _req: Request,
- origin_parent: &OsStr,
- origin_name: &OsStr,
- parent: &OsStr,
- name: &OsStr,
- ) -> Result<()> {
- // TODO
- log::debug!(
- "rename(p={:?}, name={:?}, newp={:?}, newname={:?})",
- origin_parent,
- origin_name,
- parent,
- name
- );
-
- Err(libc::ENOSYS.into())
- }
+ let mount_handle = Session::new(mount_option)
+ .mount_with_unprivileged(ofs, args.mount_path)
+ .await?;
- async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) ->
Result<()> {
- // TODO
- log::debug!("unlink(parent={:?}, name={:?})", parent, name);
+ mount_handle.await?;
- Err(libc::ENOSYS.into())
- }
+ Ok(())
}