This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch branch-gvfs-fuse-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-gvfs-fuse-dev by this 
push:
     new 8dcc7b887 [#5982] feat (gvfs-fuse): Implement Gravitino fileset file 
system (#5984)
8dcc7b887 is described below

commit 8dcc7b88718043a7d2e9298fcc0e3e32c3579675
Author: Yuhui <[email protected]>
AuthorDate: Tue Dec 31 10:15:39 2024 +0800

    [#5982] feat (gvfs-fuse): Implement Gravitino fileset file system (#5984)
    
    ### What changes were proposed in this pull request?
    
    Implement a Gravitino fileset file system and support mounting a
    fileset to a local directory.
    
    ### Why are the changes needed?
    
    Fix: #5982
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT and IT
---
 clients/filesystem-fuse/Cargo.toml                 |   7 +
 .../{Cargo.toml => conf/gvfs_fuse.toml}            |  41 ++-
 clients/filesystem-fuse/src/config.rs              | 330 +++++++++++++++++++++
 .../filesystem-fuse/src/default_raw_filesystem.rs  |  32 +-
 clients/filesystem-fuse/src/error.rs               |  69 +++++
 clients/filesystem-fuse/src/filesystem.rs          |  56 ++--
 clients/filesystem-fuse/src/fuse_api_handle.rs     |   3 +-
 clients/filesystem-fuse/src/fuse_server.rs         |   8 +-
 clients/filesystem-fuse/src/gravitino_client.rs    | 277 +++++++++++++++++
 .../src/gravitino_fileset_filesystem.rs            | 130 ++++++++
 clients/filesystem-fuse/src/gvfs_fuse.rs           | 246 +++++++++++++++
 clients/filesystem-fuse/src/lib.rs                 |  17 +-
 clients/filesystem-fuse/src/main.rs                |  21 +-
 clients/filesystem-fuse/src/memory_filesystem.rs   |   8 +-
 clients/filesystem-fuse/src/mount.rs               | 118 --------
 clients/filesystem-fuse/src/utils.rs               |   3 +
 .../conf/gvfs_fuse_memory.toml}                    |  43 ++-
 .../{Cargo.toml => tests/conf/gvfs_fuse_test.toml} |  43 ++-
 clients/filesystem-fuse/tests/fuse_test.rs         |  10 +-
 19 files changed, 1218 insertions(+), 244 deletions(-)

diff --git a/clients/filesystem-fuse/Cargo.toml 
b/clients/filesystem-fuse/Cargo.toml
index 75a4dd713..4008ec5ca 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/Cargo.toml
@@ -35,11 +35,18 @@ name = "gvfs_fuse"
 [dependencies]
 async-trait = "0.1"
 bytes = "1.6.0"
+config = "0.13"
 dashmap = "6.1.0"
 fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
 futures-util = "0.3.30"
 libc = "0.2.168"
 log = "0.4.22"
 once_cell = "1.20.2"
+reqwest = { version = "0.12.9", features = ["json"] }
+serde = { version = "1.0.216", features = ["derive"] }
 tokio = { version = "1.38.0", features = ["full"] }
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+urlencoding = "2.1.3"
+
+[dev-dependencies]
+mockito = "0.31"
diff --git a/clients/filesystem-fuse/Cargo.toml 
b/clients/filesystem-fuse/conf/gvfs_fuse.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/conf/gvfs_fuse.toml
index 75a4dd713..94d3d8560 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml
@@ -15,31 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask = 0o600
+dir_mask = 0o700
+fs_type = "memory"
 
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
 
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
 
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "your_metalake"
 
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extended settings
+[extend_config]
+access_key = "your access_key"
+secret_key = "your_secret_key"
diff --git a/clients/filesystem-fuse/src/config.rs 
b/clients/filesystem-fuse/src/config.rs
new file mode 100644
index 000000000..b381caa75
--- /dev/null
+++ b/clients/filesystem-fuse/src/config.rs
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::ErrorCode::{ConfigNotFound, InvalidConfig};
+use crate::utils::GvfsResult;
+use config::{builder, Config};
+use log::{error, info, warn};
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fs;
+
+pub(crate) const CONF_FUSE_FILE_MASK: ConfigEntity<u32> = ConfigEntity::new(
+    FuseConfig::MODULE_NAME,
+    "file_mask",
+    "The default file mask for the FUSE filesystem",
+    0o600,
+);
+
+pub(crate) const CONF_FUSE_DIR_MASK: ConfigEntity<u32> = ConfigEntity::new(
+    FuseConfig::MODULE_NAME,
+    "dir_mask",
+    "The default directory mask for the FUSE filesystem",
+    0o700,
+);
+
+pub(crate) const CONF_FUSE_FS_TYPE: ConfigEntity<&'static str> = 
ConfigEntity::new(
+    FuseConfig::MODULE_NAME,
+    "fs_type",
+    "The type of the FUSE filesystem",
+    "memory",
+);
+
+pub(crate) const CONF_FUSE_CONFIG_PATH: ConfigEntity<&'static str> = 
ConfigEntity::new(
+    FuseConfig::MODULE_NAME,
+    "config_path",
+    "The path of the FUSE configuration file",
+    "/etc/gvfs/gvfs.toml",
+);
+
+pub(crate) const CONF_FILESYSTEM_BLOCK_SIZE: ConfigEntity<u32> = 
ConfigEntity::new(
+    FilesystemConfig::MODULE_NAME,
+    "block_size",
+    "The block size of the gvfs fuse filesystem",
+    4096,
+);
+
+pub(crate) const CONF_GRAVITINO_URI: ConfigEntity<&'static str> = 
ConfigEntity::new(
+    GravitinoConfig::MODULE_NAME,
+    "uri",
+    "The URI of the Gravitino server",
+    "http://localhost:8090",
+);
+
+pub(crate) const CONF_GRAVITINO_METALAKE: ConfigEntity<&'static str> = 
ConfigEntity::new(
+    GravitinoConfig::MODULE_NAME,
+    "metalake",
+    "The metalake of the Gravitino server",
+    "",
+);
+
+pub(crate) struct ConfigEntity<T: 'static> {
+    module: &'static str,
+    name: &'static str,
+    description: &'static str,
+    pub(crate) default: T,
+}
+
+impl<T> ConfigEntity<T> {
+    const fn new(
+        module: &'static str,
+        name: &'static str,
+        description: &'static str,
+        default: T,
+    ) -> Self {
+        ConfigEntity {
+            module: module,
+            name: name,
+            description: description,
+            default: default,
+        }
+    }
+}
+
+enum ConfigValue {
+    I32(ConfigEntity<i32>),
+    U32(ConfigEntity<u32>),
+    String(ConfigEntity<&'static str>),
+    Bool(ConfigEntity<bool>),
+    Float(ConfigEntity<f64>),
+}
+
+struct DefaultConfig {
+    configs: HashMap<String, ConfigValue>,
+}
+
+impl Default for DefaultConfig {
+    fn default() -> Self {
+        let mut configs = HashMap::new();
+
+        configs.insert(
+            Self::compose_key(CONF_FUSE_FILE_MASK),
+            ConfigValue::U32(CONF_FUSE_FILE_MASK),
+        );
+        configs.insert(
+            Self::compose_key(CONF_FUSE_DIR_MASK),
+            ConfigValue::U32(CONF_FUSE_DIR_MASK),
+        );
+        configs.insert(
+            Self::compose_key(CONF_FUSE_FS_TYPE),
+            ConfigValue::String(CONF_FUSE_FS_TYPE),
+        );
+        configs.insert(
+            Self::compose_key(CONF_FUSE_CONFIG_PATH),
+            ConfigValue::String(CONF_FUSE_CONFIG_PATH),
+        );
+        configs.insert(
+            Self::compose_key(CONF_GRAVITINO_URI),
+            ConfigValue::String(CONF_GRAVITINO_URI),
+        );
+        configs.insert(
+            Self::compose_key(CONF_GRAVITINO_METALAKE),
+            ConfigValue::String(CONF_GRAVITINO_METALAKE),
+        );
+        configs.insert(
+            Self::compose_key(CONF_FILESYSTEM_BLOCK_SIZE),
+            ConfigValue::U32(CONF_FILESYSTEM_BLOCK_SIZE),
+        );
+
+        DefaultConfig { configs }
+    }
+}
+
+impl DefaultConfig {
+    fn compose_key<T>(entity: ConfigEntity<T>) -> String {
+        format!("{}.{}", entity.module, entity.name)
+    }
+}
+
+#[derive(Debug, Deserialize)]
+pub struct AppConfig {
+    #[serde(default)]
+    pub fuse: FuseConfig,
+    #[serde(default)]
+    pub filesystem: FilesystemConfig,
+    #[serde(default)]
+    pub gravitino: GravitinoConfig,
+    #[serde(default)]
+    pub extend_config: HashMap<String, String>,
+}
+
+impl Default for AppConfig {
+    fn default() -> Self {
+        let builder = Self::crete_default_config_builder();
+        let conf = builder
+            .build()
+            .expect("Failed to build default configuration");
+        conf.try_deserialize::<AppConfig>()
+            .expect("Failed to deserialize default AppConfig")
+    }
+}
+
+type ConfigBuilder = builder::ConfigBuilder<builder::DefaultState>;
+
+impl AppConfig {
+    fn crete_default_config_builder() -> ConfigBuilder {
+        let default = DefaultConfig::default();
+
+        default
+            .configs
+            .values()
+            .fold(
+                Config::builder(),
+                |builder, config_entity| match config_entity {
+                    ConfigValue::I32(entity) => Self::add_config(builder, 
entity),
+                    ConfigValue::U32(entity) => Self::add_config(builder, 
entity),
+                    ConfigValue::String(entity) => Self::add_config(builder, 
entity),
+                    ConfigValue::Bool(entity) => Self::add_config(builder, 
entity),
+                    ConfigValue::Float(entity) => Self::add_config(builder, 
entity),
+                },
+            )
+    }
+
+    fn add_config<T: Clone + Into<config::Value>>(
+        builder: ConfigBuilder,
+        entity: &ConfigEntity<T>,
+    ) -> ConfigBuilder {
+        let name = format!("{}.{}", entity.module, entity.name);
+        builder
+            .set_default(&name, entity.default.clone().into())
+            .unwrap_or_else(|e| panic!("Failed to set default for {}: {}", 
entity.name, e))
+    }
+
+    pub fn from_file(config_file_path: Option<&str>) -> GvfsResult<AppConfig> {
+        let builder = Self::crete_default_config_builder();
+
+        let config_path = {
+            if config_file_path.is_some() {
+                let path = config_file_path.unwrap();
+                //check config file exists
+                if fs::metadata(path).is_err() {
+                    return Err(
+                        ConfigNotFound.to_error("The configuration file not 
found".to_string())
+                    );
+                }
+                info!("Use configuration file: {}", path);
+                path
+            } else {
+                //use default config
+                if fs::metadata(CONF_FUSE_CONFIG_PATH.default).is_err() {
+                    warn!(
+                        "The default configuration file is not found, using 
the default configuration"
+                    );
+                    return Ok(AppConfig::default());
+                } else {
+                    warn!(
+                        "Using the default config file {}",
+                        CONF_FUSE_CONFIG_PATH.default
+                    );
+                }
+                CONF_FUSE_CONFIG_PATH.default
+            }
+        };
+        let config = builder
+            .add_source(config::File::with_name(config_path).required(true))
+            .build();
+        if let Err(e) = config {
+            let msg = format!("Failed to build configuration: {}", e);
+            error!("{}", msg);
+            return Err(InvalidConfig.to_error(msg));
+        }
+
+        let conf = config.unwrap();
+        let app_config = conf.try_deserialize::<AppConfig>();
+
+        if let Err(e) = app_config {
+            let msg = format!("Failed to deserialize configuration: {}", e);
+            error!("{}", msg);
+            return Err(InvalidConfig.to_error(msg));
+        }
+        Ok(app_config.unwrap())
+    }
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct FuseConfig {
+    #[serde(default)]
+    pub file_mask: u32,
+    #[serde(default)]
+    pub dir_mask: u32,
+    #[serde(default)]
+    pub fs_type: String,
+    #[serde(default)]
+    pub config_path: String,
+    #[serde(default)]
+    pub properties: HashMap<String, String>,
+}
+
+impl FuseConfig {
+    const MODULE_NAME: &'static str = "fuse";
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct FilesystemConfig {
+    #[serde(default)]
+    pub block_size: u32,
+}
+
+impl FilesystemConfig {
+    const MODULE_NAME: &'static str = "filesystem";
+}
+
+#[derive(Debug, Deserialize, Default)]
+pub struct GravitinoConfig {
+    #[serde(default)]
+    pub uri: String,
+    #[serde(default)]
+    pub metalake: String,
+}
+
+impl GravitinoConfig {
+    const MODULE_NAME: &'static str = "gravitino";
+}
+
+#[cfg(test)]
+mod test {
+    use crate::config::AppConfig;
+
+    #[test]
+    fn test_config_from_file() {
+        let config = 
AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
+        assert_eq!(config.fuse.file_mask, 0o644);
+        assert_eq!(config.fuse.dir_mask, 0o755);
+        assert_eq!(config.filesystem.block_size, 8192);
+        assert_eq!(config.gravitino.uri, "http://localhost:8090");
+        assert_eq!(config.gravitino.metalake, "test");
+        assert_eq!(
+            config.extend_config.get("access_key"),
+            Some(&"XXX_access_key".to_string())
+        );
+        assert_eq!(
+            config.extend_config.get("secret_key"),
+            Some(&"XXX_secret_key".to_string())
+        );
+    }
+
+    #[test]
+    fn test_default_config() {
+        let config = AppConfig::default();
+        assert_eq!(config.fuse.file_mask, 0o600);
+        assert_eq!(config.fuse.dir_mask, 0o700);
+        assert_eq!(config.filesystem.block_size, 4096);
+        assert_eq!(config.gravitino.uri, "http://localhost:8090");
+        assert_eq!(config.gravitino.metalake, "");
+    }
+}
diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs 
b/clients/filesystem-fuse/src/default_raw_filesystem.rs
index 0ab92e916..0c9836e5b 100644
--- a/clients/filesystem-fuse/src/default_raw_filesystem.rs
+++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::config::AppConfig;
 use crate::filesystem::{
-    FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, 
ROOT_DIR_FILE_ID,
-    ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
+    FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, 
INITIAL_FILE_ID,
+    ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
 };
 use crate::opened_file::{FileHandle, OpenFileFlags};
 use crate::opened_file_manager::OpenedFileManager;
@@ -47,7 +48,7 @@ pub struct DefaultRawFileSystem<T: PathFileSystem> {
 }
 
 impl<T: PathFileSystem> DefaultRawFileSystem<T> {
-    pub(crate) fn new(fs: T) -> Self {
+    pub(crate) fn new(fs: T, _config: &AppConfig, _fs_context: 
&FileSystemContext) -> Self {
         Self {
             file_entry_manager: RwLock::new(FileEntryManager::new()),
             opened_file_manager: OpenedFileManager::new(),
@@ -189,8 +190,7 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         let file_entry = self.get_file_entry(file_id).await?;
         let mut child_filestats = self.fs.read_dir(&file_entry.path).await?;
         for file_stat in child_filestats.iter_mut() {
-            self.resolve_file_id_to_filestat(file_stat, file_stat.file_id)
-                .await;
+            self.resolve_file_id_to_filestat(file_stat, file_id).await;
         }
         Ok(child_filestats)
     }
@@ -280,13 +280,7 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         file.close().await
     }
 
-    async fn read(
-        &self,
-        _file_id: u64,
-        fh: u64,
-        offset: u64,
-        size: u32,
-    ) -> crate::filesystem::Result<Bytes> {
+    async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> 
Result<Bytes> {
         let (data, file_stat) = {
             let opened_file = self
                 .opened_file_manager
@@ -303,13 +297,7 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         data
     }
 
-    async fn write(
-        &self,
-        _file_id: u64,
-        fh: u64,
-        offset: u64,
-        data: &[u8],
-    ) -> crate::filesystem::Result<u32> {
+    async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> 
Result<u32> {
         let (len, file_stat) = {
             let opened_file = self
                 .opened_file_manager
@@ -405,7 +393,11 @@ mod tests {
     #[tokio::test]
     async fn test_default_raw_file_system() {
         let memory_fs = MemoryFileSystem::new().await;
-        let raw_fs = DefaultRawFileSystem::new(memory_fs);
+        let raw_fs = DefaultRawFileSystem::new(
+            memory_fs,
+            &AppConfig::default(),
+            &FileSystemContext::default(),
+        );
         let _ = raw_fs.init().await;
         let mut tester = TestRawFileSystem::new(raw_fs);
         tester.test_raw_file_system().await;
diff --git a/clients/filesystem-fuse/src/error.rs 
b/clients/filesystem-fuse/src/error.rs
new file mode 100644
index 000000000..ba3c037c5
--- /dev/null
+++ b/clients/filesystem-fuse/src/error.rs
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use fuse3::Errno;
+
+#[derive(Debug, Copy, Clone)]
+pub enum ErrorCode {
+    UnSupportedFilesystem,
+    GravitinoClientError,
+    InvalidConfig,
+    ConfigNotFound,
+}
+
+impl ErrorCode {
+    pub fn to_error(self, message: impl Into<String>) -> GvfsError {
+        GvfsError::Error(self, message.into())
+    }
+}
+
+impl std::fmt::Display for ErrorCode {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            ErrorCode::UnSupportedFilesystem => write!(f, "Unsupported 
filesystem"),
+            ErrorCode::GravitinoClientError => write!(f, "Gravitino client 
error"),
+            ErrorCode::InvalidConfig => write!(f, "Invalid config"),
+            ErrorCode::ConfigNotFound => write!(f, "Config not found"),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum GvfsError {
+    RestError(String, reqwest::Error),
+    Error(ErrorCode, String),
+    Errno(Errno),
+    IOError(std::io::Error),
+}
+impl From<reqwest::Error> for GvfsError {
+    fn from(err: reqwest::Error) -> Self {
+        GvfsError::RestError("Http request failed:".to_owned() + 
&err.to_string(), err)
+    }
+}
+
+impl From<Errno> for GvfsError {
+    fn from(errno: Errno) -> Self {
+        GvfsError::Errno(errno)
+    }
+}
+
+impl From<std::io::Error> for GvfsError {
+    fn from(err: std::io::Error) -> Self {
+        GvfsError::IOError(err)
+    }
+}
diff --git a/clients/filesystem-fuse/src/filesystem.rs 
b/clients/filesystem-fuse/src/filesystem.rs
index d9440b0e6..742cdd4c8 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -16,6 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::config::{
+    AppConfig, CONF_FILESYSTEM_BLOCK_SIZE, CONF_FUSE_DIR_MASK, 
CONF_FUSE_FILE_MASK,
+};
 use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -129,6 +132,8 @@ pub(crate) trait PathFileSystem: Send + Sync {
 
     /// Remove the directory by file path
     async fn remove_dir(&self, path: &Path) -> Result<()>;
+
+    fn get_capacity(&self) -> Result<FileSystemCapacity>;
 }
 
 // FileSystemContext is the system environment for the fuse file system.
@@ -150,17 +155,30 @@ pub(crate) struct FileSystemContext {
 }
 
 impl FileSystemContext {
-    pub(crate) fn new(uid: u32, gid: u32) -> Self {
+    pub(crate) fn new(uid: u32, gid: u32, config: &AppConfig) -> Self {
         FileSystemContext {
             uid,
             gid,
-            default_file_perm: 0o644,
-            default_dir_perm: 0o755,
-            block_size: 4 * 1024,
+            default_file_perm: config.fuse.file_mask as u16,
+            default_dir_perm: config.fuse.dir_mask as u16,
+            block_size: config.filesystem.block_size,
+        }
+    }
+
+    pub(crate) fn default() -> Self {
+        FileSystemContext {
+            uid: 0,
+            gid: 0,
+            default_file_perm: CONF_FUSE_FILE_MASK.default as u16,
+            default_dir_perm: CONF_FUSE_DIR_MASK.default as u16,
+            block_size: CONF_FILESYSTEM_BLOCK_SIZE.default,
         }
     }
 }
 
+// capacity of the file system
+pub struct FileSystemCapacity {}
+
 // FileStat is the file metadata of the file
 #[derive(Clone, Debug)]
 pub struct FileStat {
@@ -336,7 +354,7 @@ pub(crate) mod tests {
             let opened_file = self.fs.create_file(path, 
OpenFileFlags(0)).await;
             assert!(opened_file.is_ok());
             let file = opened_file.unwrap();
-            self.assert_file_stat(&file.file_stat, path, 
FileType::RegularFile, 0);
+            self.assert_file_stat(&file.file_stat, path, RegularFile, 0);
             self.test_stat_file(path, RegularFile, 0).await;
         }
 
@@ -410,6 +428,9 @@ pub(crate) mod tests {
             // Test root dir
             self.test_root_dir().await;
 
+            // test read root dir
+            self.test_list_dir(ROOT_DIR_FILE_ID, false).await;
+
             let parent_file_id = ROOT_DIR_FILE_ID;
             // Test lookup file
             let file_id = self
@@ -445,7 +466,7 @@ pub(crate) mod tests {
             self.test_create_dir(parent_file_id, "dir1".as_ref()).await;
 
             // Test list dir
-            self.test_list_dir(parent_file_id).await;
+            self.test_list_dir(parent_file_id, true).await;
 
             // Test remove file
             self.test_remove_file(parent_file_id, "file1.txt".as_ref())
@@ -455,7 +476,7 @@ pub(crate) mod tests {
             self.test_remove_dir(parent_file_id, "dir1".as_ref()).await;
 
             // Test list dir again
-            self.test_list_dir(parent_file_id).await;
+            self.test_list_dir(parent_file_id, true).await;
 
             // Test file not found
             self.test_file_not_found(23).await;
@@ -465,12 +486,7 @@ pub(crate) mod tests {
             let root_file_stat = self.fs.stat(ROOT_DIR_FILE_ID).await;
             assert!(root_file_stat.is_ok());
             let root_file_stat = root_file_stat.unwrap();
-            self.assert_file_stat(
-                &root_file_stat,
-                Path::new(ROOT_DIR_PATH),
-                FileType::Directory,
-                0,
-            );
+            self.assert_file_stat(&root_file_stat, Path::new(ROOT_DIR_PATH), 
Directory, 0);
         }
 
         async fn test_lookup_file(
@@ -582,10 +598,14 @@ pub(crate) mod tests {
                 .await;
         }
 
-        async fn test_list_dir(&self, root_file_id: u64) {
+        async fn test_list_dir(&self, root_file_id: u64, check_child: bool) {
             let list_dir = self.fs.read_dir(root_file_id).await;
             assert!(list_dir.is_ok());
             let list_dir = list_dir.unwrap();
+
+            if !check_child {
+                return;
+            }
             assert_eq!(list_dir.len(), self.files.len());
             for file_stat in list_dir {
                 assert!(self.files.contains_key(&file_stat.file_id));
@@ -650,28 +670,28 @@ pub(crate) mod tests {
         assert_eq!(file_stat.name, "b");
         assert_eq!(file_stat.path, Path::new("a/b"));
         assert_eq!(file_stat.size, 10);
-        assert_eq!(file_stat.kind, FileType::RegularFile);
+        assert_eq!(file_stat.kind, RegularFile);
 
         //test new dir
         let file_stat = FileStat::new_dir_filestat("a".as_ref(), "b".as_ref());
         assert_eq!(file_stat.name, "b");
         assert_eq!(file_stat.path, Path::new("a/b"));
         assert_eq!(file_stat.size, 0);
-        assert_eq!(file_stat.kind, FileType::Directory);
+        assert_eq!(file_stat.kind, Directory);
 
         //test new file with path
         let file_stat = FileStat::new_file_filestat_with_path("a/b".as_ref(), 
10);
         assert_eq!(file_stat.name, "b");
         assert_eq!(file_stat.path, Path::new("a/b"));
         assert_eq!(file_stat.size, 10);
-        assert_eq!(file_stat.kind, FileType::RegularFile);
+        assert_eq!(file_stat.kind, RegularFile);
 
         //test new dir with path
         let file_stat = FileStat::new_dir_filestat_with_path("a/b".as_ref());
         assert_eq!(file_stat.name, "b");
         assert_eq!(file_stat.path, Path::new("a/b"));
         assert_eq!(file_stat.size, 0);
-        assert_eq!(file_stat.kind, FileType::Directory);
+        assert_eq!(file_stat.kind, Directory);
     }
 
     #[test]
diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs 
b/clients/filesystem-fuse/src/fuse_api_handle.rs
index 1f24e94ee..153e32389 100644
--- a/clients/filesystem-fuse/src/fuse_api_handle.rs
+++ b/clients/filesystem-fuse/src/fuse_api_handle.rs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+use crate::config::AppConfig;
 use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem};
 use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite};
 use fuse3::path::Request;
@@ -44,7 +45,7 @@ impl<T: RawFileSystem> FuseApiHandle<T> {
     const DEFAULT_ATTR_TTL: Duration = Duration::from_secs(1);
     const DEFAULT_MAX_WRITE_SIZE: u32 = 16 * 1024;
 
-    pub fn new(fs: T, context: FileSystemContext) -> Self {
+    pub fn new(fs: T, _config: &AppConfig, context: FileSystemContext) -> Self 
{
         Self {
             fs: fs,
             default_ttl: Self::DEFAULT_ATTR_TTL,
diff --git a/clients/filesystem-fuse/src/fuse_server.rs 
b/clients/filesystem-fuse/src/fuse_server.rs
index dae7c28a6..a059686e1 100644
--- a/clients/filesystem-fuse/src/fuse_server.rs
+++ b/clients/filesystem-fuse/src/fuse_server.rs
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::utils::GvfsResult;
 use fuse3::raw::{Filesystem, Session};
-use fuse3::{MountOptions, Result};
+use fuse3::MountOptions;
 use log::{error, info};
 use std::process::exit;
 use std::sync::Arc;
@@ -43,7 +44,7 @@ impl FuseServer {
     }
 
     /// Starts the FUSE filesystem and blocks until it is stopped.
-    pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> 
Result<()> {
+    pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> 
GvfsResult<()> {
         //check if the mount point exists
         if !std::path::Path::new(&self.mount_point).exists() {
             error!("Mount point {} does not exist", self.mount_point);
@@ -83,11 +84,12 @@ impl FuseServer {
     }
 
     /// Stops the FUSE filesystem.
-    pub async fn stop(&self) {
+    pub async fn stop(&self) -> GvfsResult<()> {
         info!("Stopping FUSE filesystem...");
         self.close_notify.notify_one();
 
         // wait for the filesystem to stop
         self.close_notify.notified().await;
+        Ok(())
     }
 }
diff --git a/clients/filesystem-fuse/src/gravitino_client.rs 
b/clients/filesystem-fuse/src/gravitino_client.rs
new file mode 100644
index 000000000..e5553c9f6
--- /dev/null
+++ b/clients/filesystem-fuse/src/gravitino_client.rs
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::GravitinoConfig;
+use crate::error::{ErrorCode, GvfsError};
+use reqwest::Client;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use urlencoding::encode;
+
+/// Fileset metadata, deserialized from the `fileset` object of a fileset REST response.
+#[derive(Debug, Deserialize)]
+pub(crate) struct Fileset {
+    /// Name of the fileset.
+    pub(crate) name: String,
+    /// Fileset type; read from the JSON key `type`.
+    #[serde(rename = "type")]
+    pub(crate) fileset_type: String,
+    /// Comment attached to the fileset.
+    comment: String,
+    /// Storage location of the fileset; read from the JSON key `storageLocation`.
+    #[serde(rename = "storageLocation")]
+    pub(crate) storage_location: String,
+    /// String key/value properties of the fileset.
+    properties: HashMap<String, String>,
+}
+
+/// JSON envelope returned by the "get fileset" REST call.
+#[derive(Debug, Deserialize)]
+struct FilesetResponse {
+    /// Response code; `get_fileset` treats any value other than `0` as a failure.
+    code: u32,
+    /// The fileset payload.
+    fileset: Fileset,
+}
+
+/// JSON envelope returned by the "get file location" REST call.
+#[derive(Debug, Deserialize)]
+struct FileLocationResponse {
+    /// Response code; `get_file_location` treats any value other than `0` as a failure.
+    code: u32,
+    /// Resolved file location; read from the JSON key `fileLocation`.
+    #[serde(rename = "fileLocation")]
+    location: String,
+}
+
+/// Minimal REST client for a Gravitino server, scoped to a single metalake.
+pub(crate) struct GravitinoClient {
+    /// Base URI of the Gravitino server; used as the prefix of every request URL.
+    gravitino_uri: String,
+    /// Name of the metalake placed into every request URL.
+    metalake: String,
+
+    /// HTTP client used to send the requests.
+    client: Client,
+}
+
+impl GravitinoClient {
+    /// Creates a client for the server URI and metalake named in `config`.
+    pub fn new(config: &GravitinoConfig) -> Self {
+        Self {
+            gravitino_uri: config.uri.clone(),
+            metalake: config.metalake.clone(),
+            client: Client::new(),
+        }
+    }
+
+    /// Placeholder for client initialization; currently does nothing.
+    pub fn init(&self) {}
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    pub fn do_post(&self, _path: &str, _data: &str) {
+        todo!()
+    }
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    pub fn request(&self, _path: &str, _data: &str) -> Result<(), GvfsError> {
+        todo!()
+    }
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    pub fn list_schema(&self) -> Result<(), GvfsError> {
+        todo!()
+    }
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    pub fn list_fileset(&self) -> Result<(), GvfsError> {
+        todo!()
+    }
+
+    /// Builds the REST URL that identifies a fileset inside this client's metalake.
+    fn get_fileset_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str) -> String {
+        format!(
+            "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}",
+            self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name
+        )
+    }
+
+    /// Sends a GET request to `url` and deserializes the JSON body into `T`.
+    ///
+    /// # Errors
+    /// Returns `GvfsError::RestError` when the request cannot be sent, when the server
+    /// answers with a 4xx/5xx status, or when the body cannot be parsed as `T`.
+    async fn do_get<T>(&self, url: &str) -> Result<T, GvfsError>
+    where
+        T: for<'de> Deserialize<'de>,
+    {
+        let http_resp =
+            self.client.get(url).send().await.map_err(|e| {
+                GvfsError::RestError(format!("Failed to send request to {}", url), e)
+            })?;
+
+        // Reject 4xx/5xx responses up front so that an HTTP failure is not misreported
+        // as a JSON parse error further down.
+        let http_resp = http_resp.error_for_status().map_err(|e| {
+            GvfsError::RestError(format!("Request to {} returned an error status", url), e)
+        })?;
+
+        let res = http_resp.json::<T>().await.map_err(|e| {
+            GvfsError::RestError(format!("Failed to parse response from {}", url), e)
+        })?;
+
+        Ok(res)
+    }
+
+    /// Fetches the metadata of the given fileset.
+    ///
+    /// # Errors
+    /// Propagates the `GvfsError` from the HTTP round trip and returns a
+    /// `GravitinoClientError` when the response carries a non-zero `code`.
+    pub async fn get_fileset(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        fileset_name: &str,
+    ) -> Result<Fileset, GvfsError> {
+        let url = self.get_fileset_url(catalog_name, schema_name, fileset_name);
+        let res = self.do_get::<FilesetResponse>(&url).await?;
+
+        if res.code != 0 {
+            return Err(GvfsError::Error(
+                ErrorCode::GravitinoClientError,
+                "Failed to get fileset".to_string(),
+            ));
+        }
+        Ok(res.fileset)
+    }
+
+    /// Builds the REST URL that resolves `path` inside a fileset.
+    ///
+    /// `path` is percent-encoded and passed as the `sub_path` query parameter.
+    pub fn get_file_location_url(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        fileset_name: &str,
+        path: &str,
+    ) -> String {
+        let encoded_path = encode(path);
+        format!(
+            "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}",
+            self.gravitino_uri,
+            self.metalake,
+            catalog_name,
+            schema_name,
+            fileset_name,
+            encoded_path
+        )
+    }
+
+    /// Resolves `path` inside the given fileset to the location reported by the server.
+    ///
+    /// # Errors
+    /// Propagates the `GvfsError` from the HTTP round trip and returns a
+    /// `GravitinoClientError` when the response carries a non-zero `code`.
+    pub async fn get_file_location(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        fileset_name: &str,
+        path: &str,
+    ) -> Result<String, GvfsError> {
+        let url = self.get_file_location_url(catalog_name, schema_name, fileset_name, path);
+        let res = self.do_get::<FileLocationResponse>(&url).await?;
+
+        if res.code != 0 {
+            return Err(GvfsError::Error(
+                ErrorCode::GravitinoClientError,
+                "Failed to get file location".to_string(),
+            ));
+        }
+        Ok(res.location)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockito::mock;
+
+    /// Builds a client for metalake `test` that talks to the given server URI.
+    fn client_for(uri: String) -> GravitinoClient {
+        let config = GravitinoConfig {
+            uri,
+            metalake: "test".to_string(),
+        };
+        GravitinoClient::new(&config)
+    }
+
+    /// A well-formed fileset response is deserialized into a `Fileset`.
+    #[tokio::test]
+    async fn test_get_fileset_success() {
+        let body = r#"
+        {
+            "code": 0,
+            "fileset": {
+                "name": "example_fileset",
+                "type": "example_type",
+                "comment": "This is a test fileset",
+                "storageLocation": "/example/path",
+                "properties": {
+                    "key1": "value1",
+                    "key2": "value2"
+                }
+            }
+        }"#;
+
+        let server_uri = mockito::server_url();
+
+        let endpoint = "/api/metalakes/test/catalogs/catalog1/schemas/schema1/filesets/fileset1";
+        let _m = mock("GET", endpoint)
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(body)
+            .create();
+
+        let client = client_for(server_uri);
+
+        let fileset = client
+            .get_fileset("catalog1", "schema1", "fileset1")
+            .await
+            .unwrap_or_else(|e| panic!("Expected Ok, but got Err: {:?}", e));
+
+        assert_eq!(fileset.name, "example_fileset");
+        assert_eq!(fileset.fileset_type, "example_type");
+        assert_eq!(fileset.storage_location, "/example/path");
+        assert_eq!(
+            fileset.properties.get("key1").map(String::as_str),
+            Some("value1")
+        );
+    }
+
+    /// A well-formed file location response yields the `fileLocation` value.
+    #[tokio::test]
+    async fn test_get_file_location_success() {
+        let body = r#"
+        {
+            "code": 0,
+            "fileLocation": "/mybucket/a"
+        }"#;
+
+        let server_uri = mockito::server_url();
+
+        let endpoint = format!(
+            "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}",
+            "test",
+            "catalog1",
+            "schema1",
+            "fileset1",
+            encode("/example/path")
+        );
+        let _m = mock("GET", endpoint.as_str())
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(body)
+            .create();
+
+        let client = client_for(server_uri);
+
+        let location = client
+            .get_file_location("catalog1", "schema1", "fileset1", "/example/path")
+            .await
+            .unwrap_or_else(|e| panic!("Expected Ok, but got Err: {:?}", e));
+
+        assert_eq!(location, "/mybucket/a");
+    }
+
+    // Manual example against a server on localhost:8090; it carries no test attribute,
+    // so the test harness does not run it.
+    async fn get_fileset_example() {
+        tracing_subscriber::fmt::init();
+        let client = client_for("http://localhost:8090".to_string());
+        client.init();
+        let outcome = client.get_fileset("c1", "s1", "fileset1").await;
+        if let Err(e) = &outcome {
+            println!("{:?}", e);
+        }
+
+        let fileset = outcome.unwrap();
+        println!("{:?}", fileset);
+        assert_eq!(fileset.name, "fileset1");
+    }
+}
diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs 
b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
new file mode 100644
index 000000000..98a295dbb
--- /dev/null
+++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, 
PathFileSystem, Result};
+use crate::gravitino_client::GravitinoClient;
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use async_trait::async_trait;
+use fuse3::Errno;
+use std::path::{Path, PathBuf};
+
+/// GravitinoFilesetFileSystem is a filesystem that is associated with a fileset in Gravitino.
+/// It maps the fileset path to the original data storage path and delegates the operation
+/// to the inner filesystem, such as S3, GCS or JuiceFS.
+pub(crate) struct GravitinoFilesetFileSystem {
+    /// Inner filesystem that every operation is delegated to.
+    physical_fs: Box<dyn PathFileSystem>,
+    /// Gravitino client handed in at construction time.
+    client: GravitinoClient,
+    /// Location of the fileset inside the inner filesystem; prefix of every raw path.
+    fileset_location: PathBuf,
+}
+
+impl GravitinoFilesetFileSystem {
+    /// Wraps `fs` so that fileset paths are resolved below `location`.
+    ///
+    /// `_config` and `_context` are accepted but not used.
+    pub async fn new(
+        fs: Box<dyn PathFileSystem>,
+        location: &Path,
+        client: GravitinoClient,
+        _config: &AppConfig,
+        _context: &FileSystemContext,
+    ) -> Self {
+        Self {
+            physical_fs: fs,
+            client,
+            fileset_location: location.into(),
+        }
+    }
+
+    /// Maps a path inside the fileset to the matching path in the inner filesystem.
+    fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf {
+        // `PathBuf::join` replaces the base when it is given an absolute path, which would
+        // silently drop `fileset_location`; join the root-relative part instead.
+        let relative = path.strip_prefix("/").unwrap_or(path);
+        self.fileset_location.join(relative)
+    }
+
+    /// Maps a path of the inner filesystem back to a path inside the fileset.
+    ///
+    /// Returns `EBADF` when `path` is not located below `fileset_location`.
+    fn raw_path_to_gvfs_path(&self, path: &Path) -> Result<PathBuf> {
+        let relative = path
+            .strip_prefix(&self.fileset_location)
+            .map_err(|_| Errno::from(libc::EBADF))?;
+        // NOTE(review): fileset paths are presumably rooted at `/` — confirm against the
+        // paths produced by the raw filesystem layer.
+        Ok(Path::new("/").join(relative))
+    }
+}
+
+// Every operation translates the fileset path to a raw path, delegates to the inner
+// filesystem, and translates any path in the result back to a fileset path.
+#[async_trait]
+impl PathFileSystem for GravitinoFilesetFileSystem {
+    async fn init(&self) -> Result<()> {
+        self.physical_fs.init().await
+    }
+
+    async fn stat(&self, path: &Path) -> Result<FileStat> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut stat = self.physical_fs.stat(&target).await?;
+        stat.path = self.raw_path_to_gvfs_path(&stat.path)?;
+        Ok(stat)
+    }
+
+    async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut entries = self.physical_fs.read_dir(&target).await?;
+        for entry in &mut entries {
+            entry.path = self.raw_path_to_gvfs_path(&entry.path)?;
+        }
+        Ok(entries)
+    }
+
+    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut handle = self.physical_fs.open_file(&target, flags).await?;
+        handle.file_stat.path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
+        Ok(handle)
+    }
+
+    async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut handle = self.physical_fs.open_dir(&target, flags).await?;
+        handle.file_stat.path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
+        Ok(handle)
+    }
+
+    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut handle = self.physical_fs.create_file(&target, flags).await?;
+        handle.file_stat.path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
+        Ok(handle)
+    }
+
+    async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+        let target = self.gvfs_path_to_raw_path(path);
+        let mut stat = self.physical_fs.create_dir(&target).await?;
+        stat.path = self.raw_path_to_gvfs_path(&stat.path)?;
+        Ok(stat)
+    }
+
+    async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> {
+        let target = self.gvfs_path_to_raw_path(path);
+        self.physical_fs.set_attr(&target, file_stat, flush).await
+    }
+
+    async fn remove_file(&self, path: &Path) -> Result<()> {
+        let target = self.gvfs_path_to_raw_path(path);
+        self.physical_fs.remove_file(&target).await
+    }
+
+    async fn remove_dir(&self, path: &Path) -> Result<()> {
+        let target = self.gvfs_path_to_raw_path(path);
+        self.physical_fs.remove_dir(&target).await
+    }
+
+    fn get_capacity(&self) -> Result<FileSystemCapacity> {
+        self.physical_fs.get_capacity()
+    }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs 
b/clients/filesystem-fuse/src/gvfs_fuse.rs
new file mode 100644
index 000000000..d472895d2
--- /dev/null
+++ b/clients/filesystem-fuse/src/gvfs_fuse.rs
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::default_raw_filesystem::DefaultRawFileSystem;
+use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::filesystem::FileSystemContext;
+use crate::fuse_api_handle::FuseApiHandle;
+use crate::fuse_server::FuseServer;
+use crate::gravitino_client::GravitinoClient;
+use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::memory_filesystem::MemoryFileSystem;
+use crate::utils::GvfsResult;
+use log::info;
+use once_cell::sync::Lazy;
+use std::path::Path;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Prefix that a mount source must start with to be accepted by `extract_fileset`.
+const FILESET_PREFIX: &str = "gvfs://fileset/";
+
+/// The server registered by `mount`; `unmount` takes it out again to stop it.
+static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(|| 
Mutex::new(None));
+
+/// Result of the filesystem factory functions in this module.
+///
+/// `Memory` and `Gvfs` hold path-level filesystems; `FuseMemoryFs` and `FuseGvfs` hold
+/// the same filesystems wrapped in `DefaultRawFileSystem` and `FuseApiHandle`.
+pub(crate) enum CreateFileSystemResult {
+    /// In-memory path filesystem.
+    Memory(MemoryFileSystem),
+    /// Path filesystem backed by a Gravitino fileset.
+    Gvfs(GravitinoFilesetFileSystem),
+    /// FUSE handle over the in-memory filesystem; accepted by `mount`.
+    FuseMemoryFs(FuseApiHandle<DefaultRawFileSystem<MemoryFileSystem>>),
+    /// FUSE handle over the fileset filesystem; accepted by `mount`.
+    FuseGvfs(FuseApiHandle<DefaultRawFileSystem<GravitinoFilesetFileSystem>>),
+    /// No filesystem; rejected by `mount` and `create_raw_fs` as unsupported.
+    None,
+}
+
+/// Storage scheme recognised in a fileset's storage location.
+pub enum FileSystemSchema {
+    /// Returned by `extract_storage_filesystem` for the `s3` and `s3a` protocols.
+    S3,
+}
+
+/// Creates the filesystem for `mount_from` and serves it through FUSE at `mount_to`.
+///
+/// The server is stored in `SERVER` before the filesystem is created so that `unmount`
+/// can find it. Returns an `UnSupportedFilesystem` error when the created filesystem is
+/// not one of the FUSE variants.
+pub async fn mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> {
+    info!("Starting gvfs-fuse server...");
+    let server = Arc::new(FuseServer::new(mount_to));
+    // The lock guard is a temporary and is released at the end of this statement.
+    *SERVER.lock().await = Some(Arc::clone(&server));
+
+    match create_fuse_fs(mount_from, config).await? {
+        CreateFileSystemResult::FuseMemoryFs(vfs) => server.start(vfs).await,
+        CreateFileSystemResult::FuseGvfs(vfs) => server.start(vfs).await,
+        _ => Err(UnSupportedFilesystem.to_error("Unsupported filesystem type".to_string())),
+    }
+}
+
+/// Stops the server registered by `mount`; returns `Ok(())` when none is registered.
+pub async fn unmount() -> GvfsResult<()> {
+    info!("Stop gvfs-fuse server...");
+    // Take the server out under the lock; the guard is released before `stop` is awaited.
+    let registered = SERVER.lock().await.take();
+    match registered {
+        Some(server) => server.stop().await,
+        None => {
+            info!("Server is already stopped.");
+            Ok(())
+        }
+    }
+}
+
+/// Builds the FUSE-level filesystem for `mount_from`, owned by the current user and group.
+pub(crate) async fn create_fuse_fs(
+    mount_from: &str,
+    config: &AppConfig,
+) -> GvfsResult<CreateFileSystemResult> {
+    // SAFETY: `getuid` and `getgid` take no arguments and have no failure mode.
+    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
+    let fs_context = FileSystemContext::new(uid, gid, config);
+    let path_fs = create_path_fs(mount_from, config, &fs_context).await?;
+    create_raw_fs(path_fs, config, fs_context).await
+}
+
+/// Wraps a path-level filesystem in `DefaultRawFileSystem` and `FuseApiHandle`.
+///
+/// Only the `Memory` and `Gvfs` variants are accepted; any other variant yields an
+/// `UnSupportedFilesystem` error.
+pub async fn create_raw_fs(
+    path_fs: CreateFileSystemResult,
+    config: &AppConfig,
+    fs_context: FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+    match path_fs {
+        CreateFileSystemResult::Memory(memory_fs) => {
+            let raw_fs = DefaultRawFileSystem::new(memory_fs, config, &fs_context);
+            let handle = FuseApiHandle::new(raw_fs, config, fs_context);
+            Ok(CreateFileSystemResult::FuseMemoryFs(handle))
+        }
+        CreateFileSystemResult::Gvfs(fileset_fs) => {
+            let raw_fs = DefaultRawFileSystem::new(fileset_fs, config, &fs_context);
+            let handle = FuseApiHandle::new(raw_fs, config, fs_context);
+            Ok(CreateFileSystemResult::FuseGvfs(handle))
+        }
+        _ => Err(UnSupportedFilesystem.to_error("Unsupported filesystem type".to_string())),
+    }
+}
+
+/// Creates the path-level filesystem selected by `config.fuse.fs_type`.
+///
+/// `"memory"` selects the in-memory filesystem; every other value selects the Gravitino
+/// fileset filesystem for `mount_from`.
+pub async fn create_path_fs(
+    mount_from: &str,
+    config: &AppConfig,
+    fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+    if config.fuse.fs_type != "memory" {
+        return create_gvfs_filesystem(mount_from, config, fs_context).await;
+    }
+
+    let memory_fs = MemoryFileSystem::new().await;
+    Ok(CreateFileSystemResult::Memory(memory_fs))
+}
+
+/// Creates the fileset filesystem for the `gvfs://fileset/...` path in `mount_from`.
+///
+/// The fileset's storage location is looked up through the Gravitino REST API and used
+/// as the root of the inner filesystem.
+///
+/// # Errors
+/// Returns an error when `mount_from` is not a valid fileset path, when the fileset
+/// cannot be fetched, or when its storage location uses an unsupported scheme.
+pub async fn create_gvfs_filesystem(
+    mount_from: &str,
+    config: &AppConfig,
+    fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+    // Gvfs-fuse filesystem structure:
+    // FuseApiHandle
+    // ├─ DefaultRawFileSystem (RawFileSystem)
+    // │ └─ FileSystemLog (PathFileSystem)
+    // │    ├─ GravitinoComposedFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ S3FileSystem (PathFileSystem)
+    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ HDFSFileSystem (PathFileSystem)
+    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ JuiceFileSystem (PathFileSystem)
+    // │    │  │     └─ NasFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ XXXFileSystem (PathFileSystem)
+    //
+    // `DefaultRawFileSystem` is a low-level filesystem designed to communicate with FUSE APIs.
+    // It manages file and directory relationships, as well as file mappings.
+    // It delegates file operations to the PathFileSystem.
+    //
+    // `FileSystemLog` is a decorator that adds extra debug logging functionality to file
+    // system APIs. Similar implementations include permissions, caching, and metrics.
+    //
+    // `GravitinoComposedFileSystem` is a composite file system that can combine multiple
+    // `GravitinoFilesetFileSystem`s. It uses the catalog and schema parts of the fileset
+    // path to find the actual `GravitinoFilesetFileSystem` and delegates the operation to it.
+    // If the user only mounts a fileset, this layer is not present; there is only the
+    // layer below.
+    //
+    // `GravitinoFilesetFileSystem` is a file system that can access a fileset. It translates
+    // the fileset path to the real storage path and delegates the operation to the real
+    // storage.
+    //
+    // `OpenDALFileSystem` is a file system that uses OpenDAL to access real storage.
+    // It can access S3, HDFS, GCS, Azure Blob and other storage.
+    //
+    // `S3FileSystem` is a file system that uses `OpenDALFileSystem` to access S3 storage.
+    //
+    // `HDFSFileSystem` is a file system that uses `OpenDALFileSystem` to access HDFS storage.
+    //
+    // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS
+    // tools, such as JuiceFS.
+    //
+    // `JuiceFileSystem` is a file system that uses `NasFileSystem` to access JuiceFS storage.
+    //
+    // `XXXFileSystem` is a filesystem that allows you to implement file access through your
+    // own extensions.
+
+    let client = GravitinoClient::new(&config.gravitino);
+
+    let (catalog, schema, fileset) = extract_fileset(mount_from)?;
+    let location = client
+        .get_fileset(&catalog, &schema, &fileset)
+        .await?
+        .storage_location;
+    // An unrecognised storage scheme is a configuration problem, not a bug: report it
+    // as an error instead of panicking.
+    let (_schema, location) = extract_storage_filesystem(&location).ok_or_else(|| {
+        UnSupportedFilesystem.to_error(format!("Unsupported storage location: {}", location))
+    })?;
+
+    // todo need to replace the inner filesystem with the real storage filesystem
+    let inner_fs = MemoryFileSystem::new().await;
+
+    let fs = GravitinoFilesetFileSystem::new(
+        Box::new(inner_fs),
+        Path::new(&location),
+        client,
+        config,
+        fs_context,
+    )
+    .await;
+    Ok(CreateFileSystemResult::Gvfs(fs))
+}
+
+/// Splits a `gvfs://fileset/{catalog}/{schema}/{fileset}` path into its three names.
+///
+/// # Errors
+/// Returns an `InvalidConfig` error when `path` does not start with the fileset prefix,
+/// does not consist of exactly three `/`-separated parts, or contains an empty part.
+pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
+    let path_without_prefix = path
+        .strip_prefix(FILESET_PREFIX)
+        .ok_or_else(|| InvalidConfig.to_error("Invalid fileset path".to_string()))?;
+
+    let parts: Vec<&str> = path_without_prefix.split('/').collect();
+
+    // todo handle mount catalog or schema
+    match parts.as_slice() {
+        // The prefix is already stripped, so the names are the parts at index 0, 1 and 2.
+        [catalog, schema, fileset]
+            if !catalog.is_empty() && !schema.is_empty() && !fileset.is_empty() =>
+        {
+            Ok((
+                catalog.to_string(),
+                schema.to_string(),
+                fileset.to_string(),
+            ))
+        }
+        _ => Err(InvalidConfig.to_error("Invalid fileset path".to_string())),
+    }
+}
+
+/// Splits a storage location such as `s3://bucket/path` into its scheme and the path
+/// that follows the first `/` after the scheme, always terminated by `/`.
+///
+/// Returns `None` when `path` has no `://` separator or the protocol is neither `s3`
+/// nor `s3a`.
+pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> {
+    // todo need to improve the logic
+    let (protocol, remainder) = path.split_once("://")?;
+
+    // Drop the first segment after the scheme; nothing is left when there is no `/`.
+    let key = remainder.split_once('/').map_or("", |(_, key)| key);
+    let location = if key.ends_with('/') {
+        key.to_string()
+    } else {
+        format!("{}/", key)
+    };
+
+    match protocol {
+        "s3" | "s3a" => Some((FileSystemSchema::S3, location)),
+        _ => None,
+    }
+}
diff --git a/clients/filesystem-fuse/src/lib.rs 
b/clients/filesystem-fuse/src/lib.rs
index 36e8c28d3..5532d619e 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -16,20 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::config::AppConfig;
+use crate::utils::GvfsResult;
+
+pub mod config;
 mod default_raw_filesystem;
+mod error;
 mod filesystem;
 mod fuse_api_handle;
 mod fuse_server;
+mod gravitino_client;
+mod gravitino_fileset_filesystem;
+mod gvfs_fuse;
 mod memory_filesystem;
-mod mount;
 mod opened_file;
 mod opened_file_manager;
 mod utils;
 
-pub async fn gvfs_mount(mount_point: &str) -> fuse3::Result<()> {
-    mount::mount(mount_point).await
+/// Mounts the filesystem described by `mount_from` at `mount_to`.
+///
+/// Thin public wrapper that forwards all arguments to `gvfs_fuse::mount`.
+pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) 
-> GvfsResult<()> {
+    gvfs_fuse::mount(mount_to, mount_from, config).await
 }
 
-pub async fn gvfs_unmount() {
-    mount::unmount().await;
+/// Stops the running gvfs-fuse server, if any; forwards to `gvfs_fuse::unmount`.
+pub async fn gvfs_unmount() -> GvfsResult<()> {
+    gvfs_fuse::unmount().await
 }
diff --git a/clients/filesystem-fuse/src/main.rs 
b/clients/filesystem-fuse/src/main.rs
index 28866a9bb..8eab5ec0d 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -16,18 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use fuse3::Errno;
+use gvfs_fuse::config::AppConfig;
 use gvfs_fuse::{gvfs_mount, gvfs_unmount};
-use log::info;
+use log::{error, info};
 use tokio::signal;
 
+/// Loads `conf/gvfs_fuse.toml`, mounts gvfs at `gvfs` in a background task and unmounts
+/// after Ctrl+C. Returns `EINVAL` when the config cannot be loaded or the task fails.
 #[tokio::main]
 async fn main() -> fuse3::Result<()> {
     tracing_subscriber::fmt().init();
-    tokio::spawn(async { gvfs_mount("gvfs").await });
+
+    //todo(read config file from args)
+    let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml"));
+    if let Err(e) = &config {
+        error!("Failed to load config: {:?}", e);
+        return Err(Errno::from(libc::EINVAL));
+    }
+    let config = config.unwrap();
+    let handle = tokio::spawn(async move { gvfs_mount("gvfs", "", 
&config).await });
 
     let _ = signal::ctrl_c().await;
     info!("Received Ctrl+C, Unmounting gvfs...");
-    gvfs_unmount().await;
 
+    // NOTE(review): the `Err` matched here is only the failure to join the task; the
+    // `GvfsResult` returned by `gvfs_mount` is not inspected. This await also runs before
+    // `gvfs_unmount`, so it presumably waits for the mount task to end on its own — confirm.
+    if let Err(e) = handle.await {
+        error!("Failed to mount gvfs: {:?}", e);
+        return Err(Errno::from(libc::EINVAL));
+    }
+
+    let _ = gvfs_unmount().await;
     Ok(())
 }
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs 
b/clients/filesystem-fuse/src/memory_filesystem.rs
index ca3f13fd9..b94d16b8d 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::filesystem::{FileReader, FileStat, FileWriter, PathFileSystem, 
Result};
+use crate::filesystem::{
+    FileReader, FileStat, FileSystemCapacity, FileWriter, PathFileSystem, 
Result,
+};
 use crate::opened_file::{OpenFileFlags, OpenedFile};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -193,6 +195,10 @@ impl PathFileSystem for MemoryFileSystem {
         }
         Ok(())
     }
+
+    /// Returns an empty `FileSystemCapacity` value; this call never fails.
+    fn get_capacity(&self) -> Result<FileSystemCapacity> {
+        Ok(FileSystemCapacity {})
+    }
 }
 
 pub(crate) struct MemoryFileReader {
diff --git a/clients/filesystem-fuse/src/mount.rs 
b/clients/filesystem-fuse/src/mount.rs
deleted file mode 100644
index 102e24016..000000000
--- a/clients/filesystem-fuse/src/mount.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use crate::default_raw_filesystem::DefaultRawFileSystem;
-use crate::filesystem::FileSystemContext;
-use crate::fuse_api_handle::FuseApiHandle;
-use crate::fuse_server::FuseServer;
-use crate::memory_filesystem::MemoryFileSystem;
-use fuse3::raw::Filesystem;
-use log::info;
-use once_cell::sync::Lazy;
-use std::sync::Arc;
-use tokio::sync::Mutex;
-
-static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(|| 
Mutex::new(None));
-
-pub async fn mount(mount_point: &str) -> fuse3::Result<()> {
-    info!("Starting gvfs-fuse server...");
-    let svr = Arc::new(FuseServer::new(mount_point));
-    {
-        let mut server = SERVER.lock().await;
-        *server = Some(svr.clone());
-    }
-    let fs = create_fuse_fs().await;
-    svr.start(fs).await
-}
-
-pub async fn unmount() {
-    info!("Stop gvfs-fuse server...");
-    let svr = {
-        let mut server = SERVER.lock().await;
-        if server.is_none() {
-            info!("Server is already stopped.");
-            return;
-        }
-        server.take().unwrap()
-    };
-    let _ = svr.stop().await;
-}
-
-pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static {
-    let uid = unsafe { libc::getuid() };
-    let gid = unsafe { libc::getgid() };
-    let fs_context = FileSystemContext {
-        uid: uid,
-        gid: gid,
-        default_file_perm: 0o644,
-        default_dir_perm: 0o755,
-        block_size: 4 * 1024,
-    };
-
-    let gvfs = MemoryFileSystem::new().await;
-    let fs = DefaultRawFileSystem::new(gvfs);
-    FuseApiHandle::new(fs, fs_context)
-}
-
-pub async fn create_gvfs_filesystem() {
-    // Gvfs-fuse filesystem structure:
-    // FuseApiHandle
-    // ├─ DefaultRawFileSystem (RawFileSystem)
-    // │ └─ FileSystemLog (PathFileSystem)
-    // │    ├─ GravitinoComposedFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ S3FileSystem (PathFileSystem)
-    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ HDFSFileSystem (PathFileSystem)
-    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ JuiceFileSystem (PathFileSystem)
-    // │    │  │     └─ NasFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ XXXFileSystem (PathFileSystem)
-    //
-    // `SimpleFileSystem` is a low-level filesystem designed to communicate 
with FUSE APIs.
-    // It manages file and directory relationships, as well as file mappings.
-    // It delegates file operations to the PathFileSystem
-    //
-    // `FileSystemLog` is a decorator that adds extra debug logging 
functionality to file system APIs.
-    // Similar implementations include permissions, caching, and metrics.
-    //
-    // `GravitinoComposeFileSystem` is a composite file system that can 
combine multiple `GravitinoFilesetFileSystem`.
-    // It use the part of catalog and schema of fileset path to a find actual 
GravitinoFilesetFileSystem. delegate the operation to the real storage.
-    // If the user only mounts a fileset, this layer is not present. There 
will only be one below layer.
-    //
-    // `GravitinoFilesetFileSystem` is a file system that can access a 
fileset.It translates the fileset path to the real storage path.
-    // and delegate the operation to the real storage.
-    //
-    // `OpenDALFileSystem` is a file system that use the OpenDAL to access 
real storage.
-    // it can assess the S3, HDFS, gcs, azblob and other storage.
-    //
-    // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access 
S3 storage.
-    //
-    // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to 
access HDFS storage.
-    //
-    // `NasFileSystem` is a filesystem that uses a locally accessible path 
mounted by NAS tools, such as JuiceFS.
-    //
-    // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS 
storage.
-    //
-    // `XXXFileSystem is a filesystem that allows you to implement file access 
through your own extensions.
-
-    todo!("Implement the createGvfsFuseFileSystem function");
-}
diff --git a/clients/filesystem-fuse/src/utils.rs 
b/clients/filesystem-fuse/src/utils.rs
index 21e52f86a..bbc8d7d7f 100644
--- a/clients/filesystem-fuse/src/utils.rs
+++ b/clients/filesystem-fuse/src/utils.rs
@@ -16,6 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::error::GvfsError;
+
+pub type GvfsResult<T> = Result<T, GvfsError>;
 
 #[cfg(test)]
 mod tests {}
diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
index 75a4dd713..013df6cfc 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
@@ -15,31 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask= 0o600
+dir_mask= 0o700
+fs_type = "memory"
 
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
+key1 = "value1"
+key2 = "value2"
 
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
 
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "test"
 
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extent settings
+[extend_config]
+access_key = "XXX_access_key"
+secret_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
similarity index 54%
copy from clients/filesystem-fuse/Cargo.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
index 75a4dd713..ff7c6936f 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
@@ -15,31 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[package]
-name = "filesystem-fuse"
-version = "0.8.0-incubating-SNAPSHOT"
-rust-version = "1.75"
-edition = "2021"
+# fuse settings
+[fuse]
+file_mask= 0o644
+dir_mask= 0o755
+fs_type = "memory"
 
-homepage = "https://gravitino.apache.org"
-license = "Apache-2.0"
-repository = "https://github.com/apache/gravitino"
+[fuse.properties]
+key1 = "value1"
+key2 = "value2"
 
-[[bin]]
-name = "gvfs-fuse"
-path = "src/main.rs"
+# filesystem settings
+[filesystem]
+block_size = 8192
 
-[lib]
-name = "gvfs_fuse"
+# Gravitino settings
+[gravitino]
+uri = "http://localhost:8090"
+metalake = "test"
 
-[dependencies]
-async-trait = "0.1"
-bytes = "1.6.0"
-dashmap = "6.1.0"
-fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
-futures-util = "0.3.30"
-libc = "0.2.168"
-log = "0.4.22"
-once_cell = "1.20.2"
-tokio = { version = "1.38.0", features = ["full"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+# extent settings
+[extend_config]
+access_key = "XXX_access_key"
+secret_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs
index 23aafbaf6..e761fabc5 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+use gvfs_fuse::config::AppConfig;
 use gvfs_fuse::{gvfs_mount, gvfs_unmount};
 use log::info;
 use std::fs;
@@ -38,15 +39,18 @@ impl FuseTest {
     pub fn setup(&mut self) {
         info!("Start gvfs fuse server");
         let mount_point = self.mount_point.clone();
+
+        let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
+            .expect("Failed to load config");
         self.runtime
-            .spawn(async move { gvfs_mount(&mount_point).await });
+            .spawn(async move { gvfs_mount(&mount_point, "", &config).await });
         let success = Self::wait_for_fuse_server_ready(&self.mount_point, Duration::from_secs(15));
         assert!(success, "Fuse server cannot start up at 15 seconds");
     }
 
     pub fn shutdown(&mut self) {
         self.runtime.block_on(async {
-            gvfs_unmount().await;
+            let _ = gvfs_unmount().await;
         });
     }
 
@@ -76,7 +80,7 @@ impl Drop for FuseTest {
 fn test_fuse_system_with_auto() {
     tracing_subscriber::fmt().init();
 
-    let mount_point = "build/gvfs";
+    let mount_point = "target/gvfs";
     let _ = fs::create_dir_all(mount_point);
 
     let mut test = FuseTest {

Reply via email to