This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch branch-gvfs-fuse-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-gvfs-fuse-dev by this
push:
new 8dcc7b887 [#5982] feat (gvfs-fuse): Implement Gravitino fileset file
system (#5984)
8dcc7b887 is described below
commit 8dcc7b88718043a7d2e9298fcc0e3e32c3579675
Author: Yuhui <[email protected]>
AuthorDate: Tue Dec 31 10:15:39 2024 +0800
[#5982] feat (gvfs-fuse): Implement Gravitino fileset file system (#5984)
### What changes were proposed in this pull request?
Implement a Gravitino fileset file system that supports mounting a fileset to
a local directory.
### Why are the changes needed?
Fix: #5982
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT and IT
---
clients/filesystem-fuse/Cargo.toml | 7 +
.../{Cargo.toml => conf/gvfs_fuse.toml} | 41 ++-
clients/filesystem-fuse/src/config.rs | 330 +++++++++++++++++++++
.../filesystem-fuse/src/default_raw_filesystem.rs | 32 +-
clients/filesystem-fuse/src/error.rs | 69 +++++
clients/filesystem-fuse/src/filesystem.rs | 56 ++--
clients/filesystem-fuse/src/fuse_api_handle.rs | 3 +-
clients/filesystem-fuse/src/fuse_server.rs | 8 +-
clients/filesystem-fuse/src/gravitino_client.rs | 277 +++++++++++++++++
.../src/gravitino_fileset_filesystem.rs | 130 ++++++++
clients/filesystem-fuse/src/gvfs_fuse.rs | 246 +++++++++++++++
clients/filesystem-fuse/src/lib.rs | 17 +-
clients/filesystem-fuse/src/main.rs | 21 +-
clients/filesystem-fuse/src/memory_filesystem.rs | 8 +-
clients/filesystem-fuse/src/mount.rs | 118 --------
clients/filesystem-fuse/src/utils.rs | 3 +
.../conf/gvfs_fuse_memory.toml} | 43 ++-
.../{Cargo.toml => tests/conf/gvfs_fuse_test.toml} | 43 ++-
clients/filesystem-fuse/tests/fuse_test.rs | 10 +-
19 files changed, 1218 insertions(+), 244 deletions(-)
diff --git a/clients/filesystem-fuse/Cargo.toml
b/clients/filesystem-fuse/Cargo.toml
index 75a4dd713..4008ec5ca 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/Cargo.toml
@@ -35,11 +35,18 @@ name = "gvfs_fuse"
[dependencies]
async-trait = "0.1"
bytes = "1.6.0"
+config = "0.13"
dashmap = "6.1.0"
fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
futures-util = "0.3.30"
libc = "0.2.168"
log = "0.4.22"
once_cell = "1.20.2"
+reqwest = { version = "0.12.9", features = ["json"] }
+serde = { version = "1.0.216", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+urlencoding = "2.1.3"
+
+[dev-dependencies]
+mockito = "0.31"
diff --git a/clients/filesystem-fuse/Cargo.toml
b/clients/filesystem-fuse/conf/gvfs_fuse.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/conf/gvfs_fuse.toml
index 75a4dd713..94d3d8560 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml
@@ -15,31 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask = 0o600
+dir_mask = 0o700
+fs_type = "memory"
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "your_metalake"
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extended settings
+[extend_config]
+access_key = "your access_key"
+secret_key = "your_secret_key"
diff --git a/clients/filesystem-fuse/src/config.rs
b/clients/filesystem-fuse/src/config.rs
new file mode 100644
index 000000000..b381caa75
--- /dev/null
+++ b/clients/filesystem-fuse/src/config.rs
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::ErrorCode::{ConfigNotFound, InvalidConfig};
+use crate::utils::GvfsResult;
+use config::{builder, Config};
+use log::{error, info, warn};
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fs;
+
+pub(crate) const CONF_FUSE_FILE_MASK: ConfigEntity<u32> = ConfigEntity::new( // Key "fuse.file_mask"; FileSystemContext uses it as default_file_perm. NOTE(review): named "mask" but used as permission bits.
+ FuseConfig::MODULE_NAME,
+ "file_mask",
+ "The default file mask for the FUSE filesystem",
+ 0o600,
+);
+
+pub(crate) const CONF_FUSE_DIR_MASK: ConfigEntity<u32> = ConfigEntity::new( // Key "fuse.dir_mask"; FileSystemContext uses it as default_dir_perm.
+ FuseConfig::MODULE_NAME,
+ "dir_mask",
+ "The default directory mask for the FUSE filesystem",
+ 0o700,
+);
+
+pub(crate) const CONF_FUSE_FS_TYPE: ConfigEntity<&'static str> =
ConfigEntity::new( // Key "fuse.fs_type": the FUSE filesystem type, "memory" by default.
+ FuseConfig::MODULE_NAME,
+ "fs_type",
+ "The type of the FUSE filesystem",
+ "memory",
+);
+
+pub(crate) const CONF_FUSE_CONFIG_PATH: ConfigEntity<&'static str> =
ConfigEntity::new( // Fallback config file location read by AppConfig::from_file when no path is given.
+ FuseConfig::MODULE_NAME,
+ "config_path",
+ "The path of the FUSE configuration file",
+ "/etc/gvfs/gvfs.toml",
+);
+
+pub(crate) const CONF_FILESYSTEM_BLOCK_SIZE: ConfigEntity<u32> =
ConfigEntity::new( // Key "filesystem.block_size"; copied into FileSystemContext::block_size.
+ FilesystemConfig::MODULE_NAME,
+ "block_size",
+ "The block size of the gvfs fuse filesystem",
+ 4096,
+);
+
+pub(crate) const CONF_GRAVITINO_URI: ConfigEntity<&'static str> =
ConfigEntity::new( // Key "gravitino.uri": base URI that GravitinoClient prefixes to every REST path.
+ GravitinoConfig::MODULE_NAME,
+ "uri",
+ "The URI of the Gravitino server",
+ "http://localhost:8090",
+);
+
+pub(crate) const CONF_GRAVITINO_METALAKE: ConfigEntity<&'static str> =
ConfigEntity::new( // Key "gravitino.metalake". NOTE(review): empty by default, so REST paths get an empty metalake segment unless it is configured.
+ GravitinoConfig::MODULE_NAME,
+ "metalake",
+ "The metalake of the Gravitino server",
+ "",
+);
+
+pub(crate) struct ConfigEntity<T: 'static> { // A typed config entry: `module` is the TOML section (e.g. "fuse"), `name` the key inside it.
+ module: &'static str,
+ name: &'static str,
+ description: &'static str, // NOTE(review): not read anywhere in this file; may trigger a dead_code warning.
+ pub(crate) default: T,
+}
+
+impl<T> ConfigEntity<T> {
+ const fn new( // `const` so entries can be declared as module-level constants.
+ module: &'static str,
+ name: &'static str,
+ description: &'static str,
+ default: T,
+ ) -> Self {
+ ConfigEntity {
+ module: module, // NOTE(review): clippy::redundant_field_names; shorthand `module,` etc. would do.
+ name: name,
+ description: description,
+ default: default,
+ }
+ }
+}
+
+enum ConfigValue { // Wraps an entry by its value type so entries of different types can share one HashMap.
+ I32(ConfigEntity<i32>),
+ U32(ConfigEntity<u32>),
+ String(ConfigEntity<&'static str>),
+ Bool(ConfigEntity<bool>),
+ Float(ConfigEntity<f64>), // NOTE(review): no entry is registered yet for the I32, Bool or Float variants.
+}
+
+struct DefaultConfig { // Registry of every known config entry, keyed by "module.name".
+ configs: HashMap<String, ConfigValue>,
+}
+
+impl Default for DefaultConfig {
+ fn default() -> Self { // A new CONF_* constant only gets a builder default if it is registered here.
+ let mut configs = HashMap::new();
+
+ configs.insert(
+ Self::compose_key(CONF_FUSE_FILE_MASK),
+ ConfigValue::U32(CONF_FUSE_FILE_MASK),
+ );
+ configs.insert(
+ Self::compose_key(CONF_FUSE_DIR_MASK),
+ ConfigValue::U32(CONF_FUSE_DIR_MASK),
+ );
+ configs.insert(
+ Self::compose_key(CONF_FUSE_FS_TYPE),
+ ConfigValue::String(CONF_FUSE_FS_TYPE),
+ );
+ configs.insert(
+ Self::compose_key(CONF_FUSE_CONFIG_PATH),
+ ConfigValue::String(CONF_FUSE_CONFIG_PATH),
+ );
+ configs.insert(
+ Self::compose_key(CONF_GRAVITINO_URI),
+ ConfigValue::String(CONF_GRAVITINO_URI),
+ );
+ configs.insert(
+ Self::compose_key(CONF_GRAVITINO_METALAKE),
+ ConfigValue::String(CONF_GRAVITINO_METALAKE),
+ );
+ configs.insert(
+ Self::compose_key(CONF_FILESYSTEM_BLOCK_SIZE),
+ ConfigValue::U32(CONF_FILESYSTEM_BLOCK_SIZE),
+ );
+
+ DefaultConfig { configs }
+ }
+}
+
+impl DefaultConfig {
+ fn compose_key<T>(entity: ConfigEntity<T>) -> String { // Dotted key such as "fuse.file_mask". NOTE(review): AppConfig::add_config repeats this format instead of calling it.
+ format!("{}.{}", entity.module, entity.name)
+ }
+}
+
+#[derive(Debug, Deserialize)]
+pub struct AppConfig { // Top-level configuration; each field maps to one TOML section.
+ #[serde(default)]
+ pub fuse: FuseConfig,
+ #[serde(default)]
+ pub filesystem: FilesystemConfig,
+ #[serde(default)]
+ pub gravitino: GravitinoConfig,
+ #[serde(default)]
+ pub extend_config: HashMap<String, String>, // Free-form [extend_config] key/value pairs (e.g. access_key).
+}
+
+impl Default for AppConfig {
+ fn default() -> Self { // Built only from the registered CONF_* defaults; no file is read.
+ let builder = Self::crete_default_config_builder();
+ let conf = builder
+ .build()
+ .expect("Failed to build default configuration"); // No user input is involved, so a failure here is a bug in the defaults.
+ conf.try_deserialize::<AppConfig>()
+ .expect("Failed to deserialize default AppConfig")
+ }
+}
+
+type ConfigBuilder = builder::ConfigBuilder<builder::DefaultState>; // Builder type produced by Config::builder().
+
+impl AppConfig {
+ fn crete_default_config_builder() -> ConfigBuilder { // Seeds a builder with every registered default. NOTE(review): typo, should be "create".
+ let default = DefaultConfig::default();
+
+ default
+ .configs
+ .values()
+ .fold(
+ Config::builder(),
+ |builder, config_entity| match config_entity {
+ ConfigValue::I32(entity) => Self::add_config(builder,
entity),
+ ConfigValue::U32(entity) => Self::add_config(builder,
entity),
+ ConfigValue::String(entity) => Self::add_config(builder,
entity),
+ ConfigValue::Bool(entity) => Self::add_config(builder,
entity),
+ ConfigValue::Float(entity) => Self::add_config(builder,
entity),
+ },
+ )
+ }
+
+ fn add_config<T: Clone + Into<config::Value>>( // Registers one entry's default under its "module.name" key; panics if the builder rejects it.
+ builder: ConfigBuilder,
+ entity: &ConfigEntity<T>,
+ ) -> ConfigBuilder {
+ let name = format!("{}.{}", entity.module, entity.name);
+ builder
+ .set_default(&name, entity.default.clone().into())
+ .unwrap_or_else(|e| panic!("Failed to set default for {}: {}",
entity.name, e))
+ }
+
+ pub fn from_file(config_file_path: Option<&str>) -> GvfsResult<AppConfig> { // Errors: ConfigNotFound for a missing explicit path; InvalidConfig if the file cannot be built or deserialized.
+ let builder = Self::crete_default_config_builder();
+
+ let config_path = { // Resolve the path: explicit path, else the default location, else defaults only.
+ if config_file_path.is_some() { // NOTE(review): `if let Some(path) = config_file_path` would avoid the unwrap below.
+ let path = config_file_path.unwrap();
+ // An explicitly requested file must exist; do not silently fall back to defaults.
+ if fs::metadata(path).is_err() {
+ return Err( // NOTE(review): the message does not include the missing path.
+ ConfigNotFound.to_error("The configuration file not
found".to_string())
+ );
+ }
+ info!("Use configuration file: {}", path);
+ path
+ } else {
+ // No path given: use the default location if it exists, otherwise the built-in defaults.
+ if fs::metadata(CONF_FUSE_CONFIG_PATH.default).is_err() {
+ warn!(
+ "The default configuration file is not found, using
the default configuration"
+ );
+ return Ok(AppConfig::default());
+ } else {
+ warn!( // NOTE(review): finding the default file is a normal case; info! may fit better than warn!.
+ "Using the default config file {}",
+ CONF_FUSE_CONFIG_PATH.default
+ );
+ }
+ CONF_FUSE_CONFIG_PATH.default
+ }
+ };
+ let config = builder
+ .add_source(config::File::with_name(config_path).required(true)) // File values take precedence over the set_default values.
+ .build();
+ if let Err(e) = config { // NOTE(review): `match` or `map_err(..)?` would avoid the later unwrap.
+ let msg = format!("Failed to build configuration: {}", e);
+ error!("{}", msg);
+ return Err(InvalidConfig.to_error(msg));
+ }
+
+ let conf = config.unwrap();
+ let app_config = conf.try_deserialize::<AppConfig>();
+
+ if let Err(e) = app_config {
+ let msg = format!("Failed to deserialize configuration: {}", e);
+ error!("{}", msg);
+ return Err(InvalidConfig.to_error(msg));
+ }
+ Ok(app_config.unwrap())
+ }
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct FuseConfig { // [fuse] section. NOTE(review): the derived Default yields zeros/empty strings, not the CONF_* defaults; AppConfig::default() supplies those.
+ #[serde(default)]
+ pub file_mask: u32,
+ #[serde(default)]
+ pub dir_mask: u32,
+ #[serde(default)]
+ pub fs_type: String,
+ #[serde(default)]
+ pub config_path: String,
+ #[serde(default)]
+ pub properties: HashMap<String, String>, // Free-form [fuse.properties] table; has no registered default.
+}
+
+impl FuseConfig {
+ const MODULE_NAME: &'static str = "fuse"; // Section name; prefix of the "fuse.*" keys.
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct FilesystemConfig { // [filesystem] section.
+ #[serde(default)]
+ pub block_size: u32,
+}
+
+impl FilesystemConfig {
+ const MODULE_NAME: &'static str = "filesystem"; // Section name; prefix of the "filesystem.*" keys.
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct GravitinoConfig { // [gravitino] section; consumed by GravitinoClient::new.
+ #[serde(default)]
+ pub uri: String,
+ #[serde(default)]
+ pub metalake: String,
+}
+
+impl GravitinoConfig {
+ const MODULE_NAME: &'static str = "gravitino"; // Section name; prefix of the "gravitino.*" keys.
+}
+
+#[cfg(test)]
+mod test {
+ use crate::config::AppConfig;
+
+ #[test]
+ fn test_config_from_file() { // Values from the test TOML override the defaults and [extend_config] is read. Assumes cargo's cwd is the crate root.
+ let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
+ assert_eq!(config.fuse.file_mask, 0o644);
+ assert_eq!(config.fuse.dir_mask, 0o755);
+ assert_eq!(config.filesystem.block_size, 8192);
+ assert_eq!(config.gravitino.uri, "http://localhost:8090");
+ assert_eq!(config.gravitino.metalake, "test");
+ assert_eq!(
+ config.extend_config.get("access_key"),
+ Some(&"XXX_access_key".to_string())
+ );
+ assert_eq!(
+ config.extend_config.get("secret_key"),
+ Some(&"XXX_secret_key".to_string())
+ );
+ }
+
+ #[test]
+ fn test_default_config() { // With no file, each field carries its registered CONF_* default.
+ let config = AppConfig::default();
+ assert_eq!(config.fuse.file_mask, 0o600);
+ assert_eq!(config.fuse.dir_mask, 0o700);
+ assert_eq!(config.filesystem.block_size, 4096);
+ assert_eq!(config.gravitino.uri, "http://localhost:8090");
+ assert_eq!(config.gravitino.metalake, "");
+ }
+}
diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs
b/clients/filesystem-fuse/src/default_raw_filesystem.rs
index 0ab92e916..0c9836e5b 100644
--- a/clients/filesystem-fuse/src/default_raw_filesystem.rs
+++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::config::AppConfig;
use crate::filesystem::{
- FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID,
ROOT_DIR_FILE_ID,
- ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
+ FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result,
INITIAL_FILE_ID,
+ ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
};
use crate::opened_file::{FileHandle, OpenFileFlags};
use crate::opened_file_manager::OpenedFileManager;
@@ -47,7 +48,7 @@ pub struct DefaultRawFileSystem<T: PathFileSystem> {
}
impl<T: PathFileSystem> DefaultRawFileSystem<T> {
- pub(crate) fn new(fs: T) -> Self {
+ pub(crate) fn new(fs: T, _config: &AppConfig, _fs_context:
&FileSystemContext) -> Self {
Self {
file_entry_manager: RwLock::new(FileEntryManager::new()),
opened_file_manager: OpenedFileManager::new(),
@@ -189,8 +190,7 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
let file_entry = self.get_file_entry(file_id).await?;
let mut child_filestats = self.fs.read_dir(&file_entry.path).await?;
for file_stat in child_filestats.iter_mut() {
- self.resolve_file_id_to_filestat(file_stat, file_stat.file_id)
- .await;
+ self.resolve_file_id_to_filestat(file_stat, file_id).await;
}
Ok(child_filestats)
}
@@ -280,13 +280,7 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
file.close().await
}
- async fn read(
- &self,
- _file_id: u64,
- fh: u64,
- offset: u64,
- size: u32,
- ) -> crate::filesystem::Result<Bytes> {
+ async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) ->
Result<Bytes> {
let (data, file_stat) = {
let opened_file = self
.opened_file_manager
@@ -303,13 +297,7 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
data
}
- async fn write(
- &self,
- _file_id: u64,
- fh: u64,
- offset: u64,
- data: &[u8],
- ) -> crate::filesystem::Result<u32> {
+ async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) ->
Result<u32> {
let (len, file_stat) = {
let opened_file = self
.opened_file_manager
@@ -405,7 +393,11 @@ mod tests {
#[tokio::test]
async fn test_default_raw_file_system() {
let memory_fs = MemoryFileSystem::new().await;
- let raw_fs = DefaultRawFileSystem::new(memory_fs);
+ let raw_fs = DefaultRawFileSystem::new(
+ memory_fs,
+ &AppConfig::default(),
+ &FileSystemContext::default(),
+ );
let _ = raw_fs.init().await;
let mut tester = TestRawFileSystem::new(raw_fs);
tester.test_raw_file_system().await;
diff --git a/clients/filesystem-fuse/src/error.rs
b/clients/filesystem-fuse/src/error.rs
new file mode 100644
index 000000000..ba3c037c5
--- /dev/null
+++ b/clients/filesystem-fuse/src/error.rs
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use fuse3::Errno;
+
+#[derive(Debug, Copy, Clone)]
+pub enum ErrorCode { // Error categories carried by GvfsError::Error.
+ UnSupportedFilesystem, // NOTE(review): the conventional spelling would be UnsupportedFilesystem.
+ GravitinoClientError,
+ InvalidConfig,
+ ConfigNotFound,
+}
+
+impl ErrorCode {
+ pub fn to_error(self, message: impl Into<String>) -> GvfsError { // Pairs this code with a message, e.g. InvalidConfig.to_error(msg).
+ GvfsError::Error(self, message.into())
+ }
+}
+
+impl std::fmt::Display for ErrorCode {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { // Short human-readable label for each code.
+ match self {
+ ErrorCode::UnSupportedFilesystem => write!(f, "Unsupported
filesystem"),
+ ErrorCode::GravitinoClientError => write!(f, "Gravitino client
error"),
+ ErrorCode::InvalidConfig => write!(f, "Invalid config"),
+ ErrorCode::ConfigNotFound => write!(f, "Config not found"),
+ }
+ }
+}
+
+#[derive(Debug)]
+pub enum GvfsError { // Crate-wide error type. NOTE(review): no Display or std::error::Error impl in this file, so it cannot be `?`-converted into Box<dyn Error>.
+ RestError(String, reqwest::Error), // Context message plus the underlying reqwest error.
+ Error(ErrorCode, String), // Categorized error; usually built via ErrorCode::to_error.
+ Errno(Errno), // errno value from fuse3.
+ IOError(std::io::Error),
+}
+impl From<reqwest::Error> for GvfsError {
+ fn from(err: reqwest::Error) -> Self { // NOTE(review): the prefix "Http request failed:" has no space before the appended error text.
+ GvfsError::RestError("Http request failed:".to_owned() +
&err.to_string(), err)
+ }
+}
+
+impl From<Errno> for GvfsError {
+ fn from(errno: Errno) -> Self { // Lets `?` lift fuse3 errnos into GvfsError.
+ GvfsError::Errno(errno)
+ }
+}
+
+impl From<std::io::Error> for GvfsError {
+ fn from(err: std::io::Error) -> Self { // Lets `?` lift std I/O errors into GvfsError.
+ GvfsError::IOError(err)
+ }
+}
diff --git a/clients/filesystem-fuse/src/filesystem.rs
b/clients/filesystem-fuse/src/filesystem.rs
index d9440b0e6..742cdd4c8 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::config::{
+ AppConfig, CONF_FILESYSTEM_BLOCK_SIZE, CONF_FUSE_DIR_MASK,
CONF_FUSE_FILE_MASK,
+};
use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile};
use async_trait::async_trait;
use bytes::Bytes;
@@ -129,6 +132,8 @@ pub(crate) trait PathFileSystem: Send + Sync {
/// Remove the directory by file path
async fn remove_dir(&self, path: &Path) -> Result<()>;
+
+ fn get_capacity(&self) -> Result<FileSystemCapacity>;
}
// FileSystemContext is the system environment for the fuse file system.
@@ -150,17 +155,30 @@ pub(crate) struct FileSystemContext {
}
impl FileSystemContext {
- pub(crate) fn new(uid: u32, gid: u32) -> Self {
+ pub(crate) fn new(uid: u32, gid: u32, config: &AppConfig) -> Self {
FileSystemContext {
uid,
gid,
- default_file_perm: 0o644,
- default_dir_perm: 0o755,
- block_size: 4 * 1024,
+ default_file_perm: config.fuse.file_mask as u16,
+ default_dir_perm: config.fuse.dir_mask as u16,
+ block_size: config.filesystem.block_size,
+ }
+ }
+
+ pub(crate) fn default() -> Self {
+ FileSystemContext {
+ uid: 0,
+ gid: 0,
+ default_file_perm: CONF_FUSE_FILE_MASK.default as u16,
+ default_dir_perm: CONF_FUSE_DIR_MASK.default as u16,
+ block_size: CONF_FILESYSTEM_BLOCK_SIZE.default,
}
}
}
+/// Capacity of the file system, returned by `PathFileSystem::get_capacity`; currently has no fields.
+pub struct FileSystemCapacity {}
+
// FileStat is the file metadata of the file
#[derive(Clone, Debug)]
pub struct FileStat {
@@ -336,7 +354,7 @@ pub(crate) mod tests {
let opened_file = self.fs.create_file(path,
OpenFileFlags(0)).await;
assert!(opened_file.is_ok());
let file = opened_file.unwrap();
- self.assert_file_stat(&file.file_stat, path,
FileType::RegularFile, 0);
+ self.assert_file_stat(&file.file_stat, path, RegularFile, 0);
self.test_stat_file(path, RegularFile, 0).await;
}
@@ -410,6 +428,9 @@ pub(crate) mod tests {
// Test root dir
self.test_root_dir().await;
+ // test read root dir
+ self.test_list_dir(ROOT_DIR_FILE_ID, false).await;
+
let parent_file_id = ROOT_DIR_FILE_ID;
// Test lookup file
let file_id = self
@@ -445,7 +466,7 @@ pub(crate) mod tests {
self.test_create_dir(parent_file_id, "dir1".as_ref()).await;
// Test list dir
- self.test_list_dir(parent_file_id).await;
+ self.test_list_dir(parent_file_id, true).await;
// Test remove file
self.test_remove_file(parent_file_id, "file1.txt".as_ref())
@@ -455,7 +476,7 @@ pub(crate) mod tests {
self.test_remove_dir(parent_file_id, "dir1".as_ref()).await;
// Test list dir again
- self.test_list_dir(parent_file_id).await;
+ self.test_list_dir(parent_file_id, true).await;
// Test file not found
self.test_file_not_found(23).await;
@@ -465,12 +486,7 @@ pub(crate) mod tests {
let root_file_stat = self.fs.stat(ROOT_DIR_FILE_ID).await;
assert!(root_file_stat.is_ok());
let root_file_stat = root_file_stat.unwrap();
- self.assert_file_stat(
- &root_file_stat,
- Path::new(ROOT_DIR_PATH),
- FileType::Directory,
- 0,
- );
+ self.assert_file_stat(&root_file_stat, Path::new(ROOT_DIR_PATH),
Directory, 0);
}
async fn test_lookup_file(
@@ -582,10 +598,14 @@ pub(crate) mod tests {
.await;
}
- async fn test_list_dir(&self, root_file_id: u64) {
+ async fn test_list_dir(&self, root_file_id: u64, check_child: bool) {
let list_dir = self.fs.read_dir(root_file_id).await;
assert!(list_dir.is_ok());
let list_dir = list_dir.unwrap();
+
+ if !check_child {
+ return;
+ }
assert_eq!(list_dir.len(), self.files.len());
for file_stat in list_dir {
assert!(self.files.contains_key(&file_stat.file_id));
@@ -650,28 +670,28 @@ pub(crate) mod tests {
assert_eq!(file_stat.name, "b");
assert_eq!(file_stat.path, Path::new("a/b"));
assert_eq!(file_stat.size, 10);
- assert_eq!(file_stat.kind, FileType::RegularFile);
+ assert_eq!(file_stat.kind, RegularFile);
//test new dir
let file_stat = FileStat::new_dir_filestat("a".as_ref(), "b".as_ref());
assert_eq!(file_stat.name, "b");
assert_eq!(file_stat.path, Path::new("a/b"));
assert_eq!(file_stat.size, 0);
- assert_eq!(file_stat.kind, FileType::Directory);
+ assert_eq!(file_stat.kind, Directory);
//test new file with path
let file_stat = FileStat::new_file_filestat_with_path("a/b".as_ref(),
10);
assert_eq!(file_stat.name, "b");
assert_eq!(file_stat.path, Path::new("a/b"));
assert_eq!(file_stat.size, 10);
- assert_eq!(file_stat.kind, FileType::RegularFile);
+ assert_eq!(file_stat.kind, RegularFile);
//test new dir with path
let file_stat = FileStat::new_dir_filestat_with_path("a/b".as_ref());
assert_eq!(file_stat.name, "b");
assert_eq!(file_stat.path, Path::new("a/b"));
assert_eq!(file_stat.size, 0);
- assert_eq!(file_stat.kind, FileType::Directory);
+ assert_eq!(file_stat.kind, Directory);
}
#[test]
diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs
b/clients/filesystem-fuse/src/fuse_api_handle.rs
index 1f24e94ee..153e32389 100644
--- a/clients/filesystem-fuse/src/fuse_api_handle.rs
+++ b/clients/filesystem-fuse/src/fuse_api_handle.rs
@@ -17,6 +17,7 @@
* under the License.
*/
+use crate::config::AppConfig;
use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem};
use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite};
use fuse3::path::Request;
@@ -44,7 +45,7 @@ impl<T: RawFileSystem> FuseApiHandle<T> {
const DEFAULT_ATTR_TTL: Duration = Duration::from_secs(1);
const DEFAULT_MAX_WRITE_SIZE: u32 = 16 * 1024;
- pub fn new(fs: T, context: FileSystemContext) -> Self {
+ pub fn new(fs: T, _config: &AppConfig, context: FileSystemContext) -> Self
{
Self {
fs: fs,
default_ttl: Self::DEFAULT_ATTR_TTL,
diff --git a/clients/filesystem-fuse/src/fuse_server.rs
b/clients/filesystem-fuse/src/fuse_server.rs
index dae7c28a6..a059686e1 100644
--- a/clients/filesystem-fuse/src/fuse_server.rs
+++ b/clients/filesystem-fuse/src/fuse_server.rs
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::utils::GvfsResult;
use fuse3::raw::{Filesystem, Session};
-use fuse3::{MountOptions, Result};
+use fuse3::MountOptions;
use log::{error, info};
use std::process::exit;
use std::sync::Arc;
@@ -43,7 +44,7 @@ impl FuseServer {
}
/// Starts the FUSE filesystem and blocks until it is stopped.
- pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) ->
Result<()> {
+ pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) ->
GvfsResult<()> {
//check if the mount point exists
if !std::path::Path::new(&self.mount_point).exists() {
error!("Mount point {} does not exist", self.mount_point);
@@ -83,11 +84,12 @@ impl FuseServer {
}
/// Stops the FUSE filesystem.
- pub async fn stop(&self) {
+ pub async fn stop(&self) -> GvfsResult<()> {
info!("Stopping FUSE filesystem...");
self.close_notify.notify_one();
// wait for the filesystem to stop
self.close_notify.notified().await;
+ Ok(())
}
}
diff --git a/clients/filesystem-fuse/src/gravitino_client.rs
b/clients/filesystem-fuse/src/gravitino_client.rs
new file mode 100644
index 000000000..e5553c9f6
--- /dev/null
+++ b/clients/filesystem-fuse/src/gravitino_client.rs
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::GravitinoConfig;
+use crate::error::{ErrorCode, GvfsError};
+use reqwest::Client;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use urlencoding::encode;
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct Fileset { // Fileset metadata as returned by the Gravitino REST API.
+ pub(crate) name: String,
+ #[serde(rename = "type")]
+ pub(crate) fileset_type: String,
+ comment: String, // NOTE(review): required; if the server can omit it or send null, deserialization fails. Consider Option<String>.
+ #[serde(rename = "storageLocation")]
+ pub(crate) storage_location: String,
+ properties: HashMap<String, String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct FilesetResponse { // Envelope of the get-fileset response; get_fileset treats a non-zero `code` as failure.
+ code: u32,
+ fileset: Fileset,
+}
+
+#[derive(Debug, Deserialize)]
+struct FileLocationResponse { // Envelope of the file-location response; the JSON key is "fileLocation".
+ code: u32,
+ #[serde(rename = "fileLocation")]
+ location: String,
+}
+
+pub(crate) struct GravitinoClient { // Minimal REST client bound to one metalake on a Gravitino server.
+ gravitino_uri: String,
+ metalake: String,
+
+ client: Client,
+}
+
+impl GravitinoClient {
+ pub fn new(config: &GravitinoConfig) -> Self { // Copies uri/metalake from the config and creates a fresh reqwest Client.
+ Self {
+ gravitino_uri: config.uri.clone(),
+ metalake: config.metalake.clone(),
+ client: Client::new(),
+ }
+ }
+
+ pub fn init(&self) {} // Currently a no-op.
+
+ pub fn do_post(&self, _path: &str, _data: &str) { // Unimplemented stub: panics via todo!() if called.
+ todo!()
+ }
+
+ pub fn request(&self, _path: &str, _data: &str) -> Result<(), GvfsError> { // Unimplemented stub (todo!()).
+ todo!()
+ }
+
+ pub fn list_schema(&self) -> Result<(), GvfsError> { // Unimplemented stub (todo!()).
+ todo!()
+ }
+
+ pub fn list_fileset(&self) -> Result<(), GvfsError> { // Unimplemented stub (todo!()).
+ todo!()
+ }
+
+ fn get_fileset_url(&self, catalog_name: &str, schema_name: &str,
fileset_name: &str) -> String { // Fileset REST URL. NOTE(review): catalog/schema/fileset names are inserted without URL-encoding.
+ format!(
+ "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}",
+ self.gravitino_uri, self.metalake, catalog_name, schema_name,
fileset_name
+ )
+ }
+
+ async fn do_get<T>(&self, url: &str) -> Result<T, GvfsError> // Sends a GET to `url` and deserializes the JSON body as T.
+ where
+ T: for<'de> Deserialize<'de>,
+ {
+ let http_resp =
+ self.client.get(url).send().await.map_err(|e| { // NOTE(review): send() does not fail on 4xx/5xx and the status is never checked, so error responses surface as parse failures.
+ GvfsError::RestError(format!("Failed to send request to {}",
url), e)
+ })?;
+
+ let res = http_resp.json::<T>().await.map_err(|e| {
+ GvfsError::RestError(format!("Failed to parse response from {}",
url), e)
+ })?;
+
+ Ok(res)
+ }
+
+ pub async fn get_fileset( // Fetches fileset metadata; GravitinoClientError if the response code is non-zero.
+ &self,
+ catalog_name: &str,
+ schema_name: &str,
+ fileset_name: &str,
+ ) -> Result<Fileset, GvfsError> {
+ let url = self.get_fileset_url(catalog_name, schema_name,
fileset_name);
+ let res = self.do_get::<FilesetResponse>(&url).await?;
+
+ if res.code != 0 { // NOTE(review): the server's code is not included in the error message.
+ return Err(GvfsError::Error(
+ ErrorCode::GravitinoClientError,
+ "Failed to get fileset".to_string(),
+ ));
+ }
+ Ok(res.fileset)
+ }
+
+ pub fn get_file_location_url( // File-location endpoint URL; `path` is percent-encoded as the sub_path query value.
+ &self,
+ catalog_name: &str,
+ schema_name: &str,
+ fileset_name: &str,
+ path: &str,
+ ) -> String {
+ let encoded_path = encode(path);
+ format!(
+
"{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}",
+ self.gravitino_uri,
+ self.metalake,
+ catalog_name,
+ schema_name,
+ fileset_name,
+ encoded_path
+ )
+ }
+
+ pub async fn get_file_location( // Returns the server's "fileLocation" for `path` in the fileset; GravitinoClientError on a non-zero code.
+ &self,
+ catalog_name: &str,
+ schema_name: &str,
+ fileset_name: &str,
+ path: &str,
+ ) -> Result<String, GvfsError> {
+ let url = self.get_file_location_url(catalog_name, schema_name,
fileset_name, path);
+ let res = self.do_get::<FileLocationResponse>(&url).await?;
+
+ if res.code != 0 {
+ return Err(GvfsError::Error(
+ ErrorCode::GravitinoClientError,
+ "Failed to get file location".to_string(),
+ ));
+ }
+ Ok(res.location)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use mockito::mock; // mockito 0.31 global-server API (mock + server_url).
+
+ #[tokio::test]
+ async fn test_get_fileset_success() { // Serves a canned fileset JSON from mockito and checks the parsed fields.
+ let fileset_response = r#"
+ {
+ "code": 0,
+ "fileset": {
+ "name": "example_fileset",
+ "type": "example_type",
+ "comment": "This is a test fileset",
+ "storageLocation": "/example/path",
+ "properties": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+ }"#;
+
+ let mock_server_url = &mockito::server_url();
+
+ let url = format!(
+ "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}",
+ "test", "catalog1", "schema1", "fileset1"
+ );
+ let _m = mock("GET", url.as_str())
+ .with_status(200)
+ .with_header("content-type", "application/json")
+ .with_body(fileset_response)
+ .create();
+
+ let config = GravitinoConfig {
+ uri: mock_server_url.to_string(),
+ metalake: "test".to_string(),
+ };
+ let client = GravitinoClient::new(&config);
+
+ let result = client.get_fileset("catalog1", "schema1",
"fileset1").await;
+
+ match result {
+ Ok(fileset) => {
+ assert_eq!(fileset.name, "example_fileset");
+ assert_eq!(fileset.fileset_type, "example_type");
+ assert_eq!(fileset.storage_location, "/example/path");
+ assert_eq!(fileset.properties.get("key1"),
Some(&"value1".to_string()));
+ }
+ Err(e) => panic!("Expected Ok, but got Err: {:?}", e),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_file_location_success() { // Serves a canned location from mockito and checks that get_file_location returns it.
+ let file_location_response = r#"
+ {
+ "code": 0,
+ "fileLocation": "/mybucket/a"
+ }"#;
+
+ let mock_server_url = &mockito::server_url();
+
+ let url = format!(
+
"/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}",
+ "test",
+ "catalog1",
+ "schema1",
+ "fileset1",
+ encode("/example/path")
+ );
+ let _m = mock("GET", url.as_str())
+ .with_status(200)
+ .with_header("content-type", "application/json")
+ .with_body(file_location_response)
+ .create();
+
+ let config = GravitinoConfig {
+ uri: mock_server_url.to_string(),
+ metalake: "test".to_string(),
+ };
+ let client = GravitinoClient::new(&config);
+
+ let result = client
+ .get_file_location("catalog1", "schema1", "fileset1",
"/example/path")
+ .await;
+
+ match result {
+ Ok(location) => {
+ assert_eq!(location, "/mybucket/a");
+ }
+ Err(e) => panic!("Expected Ok, but got Err: {:?}", e),
+ }
+ }
+
+ async fn get_fileset_example() { // NOTE(review): not a #[test] and never called (dead code); it would need a live server at localhost:8090.
+ tracing_subscriber::fmt::init();
+ let config = GravitinoConfig {
+ uri: "http://localhost:8090".to_string(),
+ metalake: "test".to_string(),
+ };
+ let client = GravitinoClient::new(&config);
+ client.init();
+ let result = client.get_fileset("c1", "s1", "fileset1").await;
+ if let Err(e) = &result {
+ println!("{:?}", e);
+ }
+
+ let fileset = result.unwrap();
+ println!("{:?}", fileset);
+ assert_eq!(fileset.name, "fileset1");
+ }
+}
diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
new file mode 100644
index 000000000..98a295dbb
--- /dev/null
+++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext,
PathFileSystem, Result};
+use crate::gravitino_client::GravitinoClient;
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use async_trait::async_trait;
+use fuse3::Errno;
+use std::path::{Path, PathBuf};
+
/// `GravitinoFilesetFileSystem` exposes a single Gravitino fileset as a filesystem.
///
/// It maps paths inside the fileset onto the fileset's storage location and delegates every
/// operation to the wrapped physical filesystem (e.g. S3, GCS, JuiceFS).
pub(crate) struct GravitinoFilesetFileSystem {
    /// Filesystem that holds the fileset's data; all operations are forwarded to it.
    physical_fs: Box<dyn PathFileSystem>,
    /// Gravitino REST client. NOTE(review): stored but not read anywhere in this file yet.
    client: GravitinoClient,
    /// Location inside `physical_fs` that the root of the fileset maps to.
    fileset_location: PathBuf,
}
+
+impl GravitinoFilesetFileSystem {
+ pub async fn new(
+ fs: Box<dyn PathFileSystem>,
+ location: &Path,
+ client: GravitinoClient,
+ _config: &AppConfig,
+ _context: &FileSystemContext,
+ ) -> Self {
+ Self {
+ physical_fs: fs,
+ client: client,
+ fileset_location: location.into(),
+ }
+ }
+
+ fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf {
+ self.fileset_location.join(path)
+ }
+
+ fn raw_path_to_gvfs_path(&self, path: &Path) -> Result<PathBuf> {
+ path.strip_prefix(&self.fileset_location)
+ .map_err(|_| Errno::from(libc::EBADF))?;
+ Ok(path.into())
+ }
+}
+
+#[async_trait]
+impl PathFileSystem for GravitinoFilesetFileSystem {
+ async fn init(&self) -> Result<()> {
+ self.physical_fs.init().await
+ }
+
+ async fn stat(&self, path: &Path) -> Result<FileStat> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut file_stat = self.physical_fs.stat(&raw_path).await?;
+ file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?;
+ Ok(file_stat)
+ }
+
+ async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut child_filestats = self.physical_fs.read_dir(&raw_path).await?;
+ for file_stat in child_filestats.iter_mut() {
+ file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?;
+ }
+ Ok(child_filestats)
+ }
+
+ async fn open_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut opened_file = self.physical_fs.open_file(&raw_path,
flags).await?;
+ opened_file.file_stat.path =
self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?;
+ Ok(opened_file)
+ }
+
+ async fn open_dir(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut opened_file = self.physical_fs.open_dir(&raw_path,
flags).await?;
+ opened_file.file_stat.path =
self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?;
+ Ok(opened_file)
+ }
+
+ async fn create_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut opened_file = self.physical_fs.create_file(&raw_path,
flags).await?;
+ opened_file.file_stat.path =
self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?;
+ Ok(opened_file)
+ }
+
+ async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ let mut file_stat = self.physical_fs.create_dir(&raw_path).await?;
+ file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?;
+ Ok(file_stat)
+ }
+
+ async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool)
-> Result<()> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ self.physical_fs.set_attr(&raw_path, file_stat, flush).await
+ }
+
+ async fn remove_file(&self, path: &Path) -> Result<()> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ self.physical_fs.remove_file(&raw_path).await
+ }
+
+ async fn remove_dir(&self, path: &Path) -> Result<()> {
+ let raw_path = self.gvfs_path_to_raw_path(path);
+ self.physical_fs.remove_dir(&raw_path).await
+ }
+
+ fn get_capacity(&self) -> Result<FileSystemCapacity> {
+ self.physical_fs.get_capacity()
+ }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs
b/clients/filesystem-fuse/src/gvfs_fuse.rs
new file mode 100644
index 000000000..d472895d2
--- /dev/null
+++ b/clients/filesystem-fuse/src/gvfs_fuse.rs
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::default_raw_filesystem::DefaultRawFileSystem;
+use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::filesystem::FileSystemContext;
+use crate::fuse_api_handle::FuseApiHandle;
+use crate::fuse_server::FuseServer;
+use crate::gravitino_client::GravitinoClient;
+use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::memory_filesystem::MemoryFileSystem;
+use crate::utils::GvfsResult;
+use log::info;
+use once_cell::sync::Lazy;
+use std::path::Path;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
/// Prefix every fileset mount source (`mount_from`) must start with; checked by
/// `extract_fileset`.
const FILESET_PREFIX: &str = "gvfs://fileset/";

/// The FUSE server currently registered by `mount`, if any; `unmount` takes it out and
/// stops it.
static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(|| Mutex::new(None));
+
/// A filesystem produced by one of the builder functions in this module.
///
/// `Memory` and `Gvfs` hold a path-level filesystem (produced by `create_path_fs`).
/// `FuseMemoryFs` and `FuseGvfs` hold the same filesystems wrapped for FUSE (produced by
/// `create_raw_fs`), which is what `mount` can serve. `None` is not constructed in this file.
pub(crate) enum CreateFileSystemResult {
    Memory(MemoryFileSystem),
    Gvfs(GravitinoFilesetFileSystem),
    FuseMemoryFs(FuseApiHandle<DefaultRawFileSystem<MemoryFileSystem>>),
    FuseGvfs(FuseApiHandle<DefaultRawFileSystem<GravitinoFilesetFileSystem>>),
    None,
}
+
/// Storage scheme of a fileset's storage location, as recognized by
/// `extract_storage_filesystem`. Only S3 (`s3://` and `s3a://`) is recognized so far.
pub enum FileSystemSchema {
    S3,
}
+
+pub async fn mount(mount_to: &str, mount_from: &str, config: &AppConfig) ->
GvfsResult<()> {
+ info!("Starting gvfs-fuse server...");
+ let svr = Arc::new(FuseServer::new(mount_to));
+ {
+ let mut server = SERVER.lock().await;
+ *server = Some(svr.clone());
+ }
+ let fs = create_fuse_fs(mount_from, config).await?;
+ match fs {
+ CreateFileSystemResult::FuseMemoryFs(vfs) => svr.start(vfs).await?,
+ CreateFileSystemResult::FuseGvfs(vfs) => svr.start(vfs).await?,
+ _ => return Err(UnSupportedFilesystem.to_error("Unsupported filesystem
type".to_string())),
+ }
+ Ok(())
+}
+
+pub async fn unmount() -> GvfsResult<()> {
+ info!("Stop gvfs-fuse server...");
+ let svr = {
+ let mut server = SERVER.lock().await;
+ if server.is_none() {
+ info!("Server is already stopped.");
+ return Ok(());
+ }
+ server.take().unwrap()
+ };
+ svr.stop().await
+}
+
+pub(crate) async fn create_fuse_fs(
+ mount_from: &str,
+ config: &AppConfig,
+) -> GvfsResult<CreateFileSystemResult> {
+ let uid = unsafe { libc::getuid() };
+ let gid = unsafe { libc::getgid() };
+ let fs_context = FileSystemContext::new(uid, gid, config);
+ let fs = create_path_fs(mount_from, config, &fs_context).await?;
+ create_raw_fs(fs, config, fs_context).await
+}
+
+pub async fn create_raw_fs(
+ path_fs: CreateFileSystemResult,
+ config: &AppConfig,
+ fs_context: FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+ match path_fs {
+ CreateFileSystemResult::Memory(fs) => {
+ let fs = FuseApiHandle::new(
+ DefaultRawFileSystem::new(fs, config, &fs_context),
+ config,
+ fs_context,
+ );
+ Ok(CreateFileSystemResult::FuseMemoryFs(fs))
+ }
+ CreateFileSystemResult::Gvfs(fs) => {
+ let fs = FuseApiHandle::new(
+ DefaultRawFileSystem::new(fs, config, &fs_context),
+ config,
+ fs_context,
+ );
+ Ok(CreateFileSystemResult::FuseGvfs(fs))
+ }
+ _ => Err(UnSupportedFilesystem.to_error("Unsupported filesystem
type".to_string())),
+ }
+}
+
+pub async fn create_path_fs(
+ mount_from: &str,
+ config: &AppConfig,
+ fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+ if config.fuse.fs_type == "memory" {
+ Ok(CreateFileSystemResult::Memory(
+ MemoryFileSystem::new().await,
+ ))
+ } else {
+ create_gvfs_filesystem(mount_from, config, fs_context).await
+ }
+}
+
+pub async fn create_gvfs_filesystem(
+ mount_from: &str,
+ config: &AppConfig,
+ fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+ // Gvfs-fuse filesystem structure:
+ // FuseApiHandle
+ // ├─ DefaultRawFileSystem (RawFileSystem)
+ // │ └─ FileSystemLog (PathFileSystem)
+ // │ ├─ GravitinoComposedFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ S3FileSystem (PathFileSystem)
+ // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ HDFSFileSystem (PathFileSystem)
+ // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ JuiceFileSystem (PathFileSystem)
+ // │ │ │ └─ NasFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ XXXFileSystem (PathFileSystem)
+ //
+ // `SimpleFileSystem` is a low-level filesystem designed to communicate
with FUSE APIs.
+ // It manages file and directory relationships, as well as file mappings.
+ // It delegates file operations to the PathFileSystem
+ //
+ // `FileSystemLog` is a decorator that adds extra debug logging
functionality to file system APIs.
+ // Similar implementations include permissions, caching, and metrics.
+ //
+ // `GravitinoComposeFileSystem` is a composite file system that can
combine multiple `GravitinoFilesetFileSystem`.
+ // It use the part of catalog and schema of fileset path to a find actual
GravitinoFilesetFileSystem. delegate the operation to the real storage.
+ // If the user only mounts a fileset, this layer is not present. There
will only be one below layer.
+ //
+ // `GravitinoFilesetFileSystem` is a file system that can access a
fileset.It translates the fileset path to the real storage path.
+ // and delegate the operation to the real storage.
+ //
+ // `OpenDALFileSystem` is a file system that use the OpenDAL to access
real storage.
+ // it can assess the S3, HDFS, gcs, azblob and other storage.
+ //
+ // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access
S3 storage.
+ //
+ // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to
access HDFS storage.
+ //
+ // `NasFileSystem` is a filesystem that uses a locally accessible path
mounted by NAS tools, such as JuiceFS.
+ //
+ // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS
storage.
+ //
+ // `XXXFileSystem is a filesystem that allows you to implement file access
through your own extensions.
+
+ let client = GravitinoClient::new(&config.gravitino);
+
+ let (catalog, schema, fileset) = extract_fileset(mount_from)?;
+ let location = client
+ .get_fileset(&catalog, &schema, &fileset)
+ .await?
+ .storage_location;
+ let (_schema, location) = extract_storage_filesystem(&location).unwrap();
+
+ // todo need to replace the inner filesystem with the real storage
filesystem
+ let inner_fs = MemoryFileSystem::new().await;
+
+ let fs = GravitinoFilesetFileSystem::new(
+ Box::new(inner_fs),
+ Path::new(&location),
+ client,
+ config,
+ fs_context,
+ )
+ .await;
+ Ok(CreateFileSystemResult::Gvfs(fs))
+}
+
+pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
+ if !path.starts_with(FILESET_PREFIX) {
+ return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
+ }
+
+ let path_without_prefix = &path[FILESET_PREFIX.len()..];
+
+ let parts: Vec<&str> = path_without_prefix.split('/').collect();
+
+ if parts.len() != 3 {
+ return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
+ }
+ // todo handle mount catalog or schema
+
+ let catalog = parts[1].to_string();
+ let schema = parts[2].to_string();
+ let fileset = parts[3].to_string();
+
+ Ok((catalog, schema, fileset))
+}
+
+pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema,
String)> {
+ // todo need to improve the logic
+ if let Some(pos) = path.find("://") {
+ let protocol = &path[..pos];
+ let location = &path[pos + 3..];
+ let location = match location.find('/') {
+ Some(index) => &location[index + 1..],
+ None => "",
+ };
+ let location = match location.ends_with('/') {
+ true => location.to_string(),
+ false => format!("{}/", location),
+ };
+
+ match protocol {
+ "s3" => Some((FileSystemSchema::S3, location.to_string())),
+ "s3a" => Some((FileSystemSchema::S3, location.to_string())),
+ _ => None,
+ }
+ } else {
+ None
+ }
+}
diff --git a/clients/filesystem-fuse/src/lib.rs
b/clients/filesystem-fuse/src/lib.rs
index 36e8c28d3..5532d619e 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -16,20 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::config::AppConfig;
+use crate::utils::GvfsResult;
+
+pub mod config;
mod default_raw_filesystem;
+mod error;
mod filesystem;
mod fuse_api_handle;
mod fuse_server;
+mod gravitino_client;
+mod gravitino_fileset_filesystem;
+mod gvfs_fuse;
mod memory_filesystem;
-mod mount;
mod opened_file;
mod opened_file_manager;
mod utils;
-pub async fn gvfs_mount(mount_point: &str) -> fuse3::Result<()> {
- mount::mount(mount_point).await
+pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig)
-> GvfsResult<()> {
+ gvfs_fuse::mount(mount_to, mount_from, config).await
}
-pub async fn gvfs_unmount() {
- mount::unmount().await;
+pub async fn gvfs_unmount() -> GvfsResult<()> {
+ gvfs_fuse::unmount().await
}
diff --git a/clients/filesystem-fuse/src/main.rs
b/clients/filesystem-fuse/src/main.rs
index 28866a9bb..8eab5ec0d 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -16,18 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
+use fuse3::Errno;
+use gvfs_fuse::config::AppConfig;
use gvfs_fuse::{gvfs_mount, gvfs_unmount};
-use log::info;
+use log::{error, info};
use tokio::signal;
#[tokio::main]
async fn main() -> fuse3::Result<()> {
tracing_subscriber::fmt().init();
- tokio::spawn(async { gvfs_mount("gvfs").await });
+
+ //todo(read config file from args)
+ let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml"));
+ if let Err(e) = &config {
+ error!("Failed to load config: {:?}", e);
+ return Err(Errno::from(libc::EINVAL));
+ }
+ let config = config.unwrap();
+ let handle = tokio::spawn(async move { gvfs_mount("gvfs", "",
&config).await });
let _ = signal::ctrl_c().await;
info!("Received Ctrl+C, Unmounting gvfs...");
- gvfs_unmount().await;
+ if let Err(e) = handle.await {
+ error!("Failed to mount gvfs: {:?}", e);
+ return Err(Errno::from(libc::EINVAL));
+ }
+
+ let _ = gvfs_unmount().await;
Ok(())
}
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs
b/clients/filesystem-fuse/src/memory_filesystem.rs
index ca3f13fd9..b94d16b8d 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-use crate::filesystem::{FileReader, FileStat, FileWriter, PathFileSystem,
Result};
+use crate::filesystem::{
+ FileReader, FileStat, FileSystemCapacity, FileWriter, PathFileSystem,
Result,
+};
use crate::opened_file::{OpenFileFlags, OpenedFile};
use async_trait::async_trait;
use bytes::Bytes;
@@ -193,6 +195,10 @@ impl PathFileSystem for MemoryFileSystem {
}
Ok(())
}
+
+ fn get_capacity(&self) -> Result<FileSystemCapacity> {
+ Ok(FileSystemCapacity {})
+ }
}
pub(crate) struct MemoryFileReader {
diff --git a/clients/filesystem-fuse/src/mount.rs
b/clients/filesystem-fuse/src/mount.rs
deleted file mode 100644
index 102e24016..000000000
--- a/clients/filesystem-fuse/src/mount.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use crate::default_raw_filesystem::DefaultRawFileSystem;
-use crate::filesystem::FileSystemContext;
-use crate::fuse_api_handle::FuseApiHandle;
-use crate::fuse_server::FuseServer;
-use crate::memory_filesystem::MemoryFileSystem;
-use fuse3::raw::Filesystem;
-use log::info;
-use once_cell::sync::Lazy;
-use std::sync::Arc;
-use tokio::sync::Mutex;
-
-static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(||
Mutex::new(None));
-
-pub async fn mount(mount_point: &str) -> fuse3::Result<()> {
- info!("Starting gvfs-fuse server...");
- let svr = Arc::new(FuseServer::new(mount_point));
- {
- let mut server = SERVER.lock().await;
- *server = Some(svr.clone());
- }
- let fs = create_fuse_fs().await;
- svr.start(fs).await
-}
-
-pub async fn unmount() {
- info!("Stop gvfs-fuse server...");
- let svr = {
- let mut server = SERVER.lock().await;
- if server.is_none() {
- info!("Server is already stopped.");
- return;
- }
- server.take().unwrap()
- };
- let _ = svr.stop().await;
-}
-
-pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static {
- let uid = unsafe { libc::getuid() };
- let gid = unsafe { libc::getgid() };
- let fs_context = FileSystemContext {
- uid: uid,
- gid: gid,
- default_file_perm: 0o644,
- default_dir_perm: 0o755,
- block_size: 4 * 1024,
- };
-
- let gvfs = MemoryFileSystem::new().await;
- let fs = DefaultRawFileSystem::new(gvfs);
- FuseApiHandle::new(fs, fs_context)
-}
-
-pub async fn create_gvfs_filesystem() {
- // Gvfs-fuse filesystem structure:
- // FuseApiHandle
- // ├─ DefaultRawFileSystem (RawFileSystem)
- // │ └─ FileSystemLog (PathFileSystem)
- // │ ├─ GravitinoComposedFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ S3FileSystem (PathFileSystem)
- // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ HDFSFileSystem (PathFileSystem)
- // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ JuiceFileSystem (PathFileSystem)
- // │ │ │ └─ NasFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ XXXFileSystem (PathFileSystem)
- //
- // `SimpleFileSystem` is a low-level filesystem designed to communicate
with FUSE APIs.
- // It manages file and directory relationships, as well as file mappings.
- // It delegates file operations to the PathFileSystem
- //
- // `FileSystemLog` is a decorator that adds extra debug logging
functionality to file system APIs.
- // Similar implementations include permissions, caching, and metrics.
- //
- // `GravitinoComposeFileSystem` is a composite file system that can
combine multiple `GravitinoFilesetFileSystem`.
- // It use the part of catalog and schema of fileset path to a find actual
GravitinoFilesetFileSystem. delegate the operation to the real storage.
- // If the user only mounts a fileset, this layer is not present. There
will only be one below layer.
- //
- // `GravitinoFilesetFileSystem` is a file system that can access a
fileset.It translates the fileset path to the real storage path.
- // and delegate the operation to the real storage.
- //
- // `OpenDALFileSystem` is a file system that use the OpenDAL to access
real storage.
- // it can assess the S3, HDFS, gcs, azblob and other storage.
- //
- // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access
S3 storage.
- //
- // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to
access HDFS storage.
- //
- // `NasFileSystem` is a filesystem that uses a locally accessible path
mounted by NAS tools, such as JuiceFS.
- //
- // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS
storage.
- //
- // `XXXFileSystem is a filesystem that allows you to implement file access
through your own extensions.
-
- todo!("Implement the createGvfsFuseFileSystem function");
-}
diff --git a/clients/filesystem-fuse/src/utils.rs
b/clients/filesystem-fuse/src/utils.rs
index 21e52f86a..bbc8d7d7f 100644
--- a/clients/filesystem-fuse/src/utils.rs
+++ b/clients/filesystem-fuse/src/utils.rs
@@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::error::GvfsError;
+
+pub type GvfsResult<T> = Result<T, GvfsError>;
#[cfg(test)]
mod tests {}
diff --git a/clients/filesystem-fuse/Cargo.toml
b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
index 75a4dd713..013df6cfc 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
@@ -15,31 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask= 0o600
+dir_mask= 0o700
+fs_type = "memory"
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
+key1 = "value1"
+key2 = "value2"
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "test"
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extended settings
+[extent_config]
+access_key = "XXX_access_key"
+secret_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/Cargo.toml
b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
index 75a4dd713..ff7c6936f 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
@@ -15,31 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask= 0o644
+dir_mask= 0o755
+fs_type = "memory"
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
+key1 = "value1"
+key2 = "value2"
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "test"
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extended settings
+[extend_config]
+access_key = "XXX_access_key"
+secret_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs
b/clients/filesystem-fuse/tests/fuse_test.rs
index 23aafbaf6..e761fabc5 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -17,6 +17,7 @@
* under the License.
*/
+use gvfs_fuse::config::AppConfig;
use gvfs_fuse::{gvfs_mount, gvfs_unmount};
use log::info;
use std::fs;
@@ -38,15 +39,18 @@ impl FuseTest {
pub fn setup(&mut self) {
info!("Start gvfs fuse server");
let mount_point = self.mount_point.clone();
+
+ let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
+ .expect("Failed to load config");
self.runtime
- .spawn(async move { gvfs_mount(&mount_point).await });
+ .spawn(async move { gvfs_mount(&mount_point, "", &config).await });
let success = Self::wait_for_fuse_server_ready(&self.mount_point,
Duration::from_secs(15));
assert!(success, "Fuse server cannot start up at 15 seconds");
}
pub fn shutdown(&mut self) {
self.runtime.block_on(async {
- gvfs_unmount().await;
+ let _ = gvfs_unmount().await;
});
}
@@ -76,7 +80,7 @@ impl Drop for FuseTest {
fn test_fuse_system_with_auto() {
tracing_subscriber::fmt().init();
- let mount_point = "build/gvfs";
+ let mount_point = "target/gvfs";
let _ = fs::create_dir_all(mount_point);
let mut test = FuseTest {