This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch branch-gvfs-fuse-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-gvfs-fuse-dev by this 
push:
     new ef2b102a1 [#6012] feat (gvfs-fuse): Support  Gravitino S3 fileset 
filesystem operation in gvfs fuse (#6013)
ef2b102a1 is described below

commit ef2b102a1435cb433461fbd0fd370433de23cb73
Author: Yuhui <[email protected]>
AuthorDate: Fri Jan 3 17:03:39 2025 +0800

    [#6012] feat (gvfs-fuse): Support  Gravitino S3 fileset filesystem 
operation in gvfs fuse (#6013)
    
    ### What changes were proposed in this pull request?
    
    Support Gravitino S3 fileset filesystem operations in gvfs-fuse,
    implemented with OpenDAL.
    
    ### Why are the changes needed?
    
    Fix: #6012
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manually tested.
    
    ---------
    
    Co-authored-by: Qiming Teng <[email protected]>
---
 clients/filesystem-fuse/Cargo.toml                 |   1 +
 clients/filesystem-fuse/conf/gvfs_fuse.toml        |   6 +-
 clients/filesystem-fuse/src/config.rs              |   6 +-
 .../filesystem-fuse/src/default_raw_filesystem.rs  |  98 +++++--
 clients/filesystem-fuse/src/error.rs               |   2 +
 clients/filesystem-fuse/src/filesystem.rs          | 109 +++++---
 clients/filesystem-fuse/src/fuse_api_handle.rs     |  12 +-
 clients/filesystem-fuse/src/gravitino_client.rs    |  76 ++++++
 .../src/gravitino_fileset_filesystem.rs            |  57 +++-
 clients/filesystem-fuse/src/gvfs_creator.rs        | 166 ++++++++++++
 clients/filesystem-fuse/src/gvfs_fuse.rs           | 127 +--------
 clients/filesystem-fuse/src/lib.rs                 |   3 +
 clients/filesystem-fuse/src/main.rs                |  32 ++-
 clients/filesystem-fuse/src/memory_filesystem.rs   |  32 ++-
 clients/filesystem-fuse/src/open_dal_filesystem.rs | 297 +++++++++++++++++++++
 clients/filesystem-fuse/src/opened_file.rs         |  26 ++
 clients/filesystem-fuse/src/s3_filesystem.rs       | 276 +++++++++++++++++++
 clients/filesystem-fuse/src/utils.rs               |  29 +-
 .../conf/{gvfs_fuse_test.toml => config_test.toml} |   6 +-
 .../tests/conf/gvfs_fuse_memory.toml               |   8 +-
 .../{gvfs_fuse_memory.toml => gvfs_fuse_s3.toml}   |  11 +-
 clients/filesystem-fuse/tests/fuse_test.rs         |  21 +-
 22 files changed, 1166 insertions(+), 235 deletions(-)

diff --git a/clients/filesystem-fuse/Cargo.toml 
b/clients/filesystem-fuse/Cargo.toml
index 4008ec5ca..3760bd528 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/Cargo.toml
@@ -42,6 +42,7 @@ futures-util = "0.3.30"
 libc = "0.2.168"
 log = "0.4.22"
 once_cell = "1.20.2"
+opendal = { version = "0.46.0", features = ["services-s3"] }
 reqwest = { version = "0.12.9", features = ["json"] }
 serde = { version = "1.0.216", features = ["derive"] }
 tokio = { version = "1.38.0", features = ["full"] }
diff --git a/clients/filesystem-fuse/conf/gvfs_fuse.toml 
b/clients/filesystem-fuse/conf/gvfs_fuse.toml
index 94d3d8560..4bde0e9e1 100644
--- a/clients/filesystem-fuse/conf/gvfs_fuse.toml
+++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml
@@ -32,7 +32,7 @@ block_size = 8192
 uri = "http://localhost:8090"
 metalake = "your_metalake"
 
-# extent settings
+# extend settings
 [extend_config]
-access_key = "your access_key"
-secret_key = "your_secret_key"
+s3-access_key_id = "your access_key"
+s3-secret_access_key = "your_secret_key"
diff --git a/clients/filesystem-fuse/src/config.rs 
b/clients/filesystem-fuse/src/config.rs
index b381caa75..17908fd08 100644
--- a/clients/filesystem-fuse/src/config.rs
+++ b/clients/filesystem-fuse/src/config.rs
@@ -302,18 +302,18 @@ mod test {
 
     #[test]
     fn test_config_from_file() {
-        let config = 
AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
+        let config = 
AppConfig::from_file(Some("tests/conf/config_test.toml")).unwrap();
         assert_eq!(config.fuse.file_mask, 0o644);
         assert_eq!(config.fuse.dir_mask, 0o755);
         assert_eq!(config.filesystem.block_size, 8192);
         assert_eq!(config.gravitino.uri, "http://localhost:8090");
         assert_eq!(config.gravitino.metalake, "test");
         assert_eq!(
-            config.extend_config.get("access_key"),
+            config.extend_config.get("s3-access_key_id"),
             Some(&"XXX_access_key".to_string())
         );
         assert_eq!(
-            config.extend_config.get("secret_key"),
+            config.extend_config.get("s3-secret_access_key"),
             Some(&"XXX_secret_key".to_string())
         );
     }
diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs 
b/clients/filesystem-fuse/src/default_raw_filesystem.rs
index 0c9836e5b..944181246 100644
--- a/clients/filesystem-fuse/src/default_raw_filesystem.rs
+++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -18,10 +18,11 @@
  */
 use crate::config::AppConfig;
 use crate::filesystem::{
-    FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, 
INITIAL_FILE_ID,
-    ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
+    FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, 
FS_META_FILE_ID,
+    FS_META_FILE_NAME, FS_META_FILE_PATH, INITIAL_FILE_ID, ROOT_DIR_FILE_ID,
+    ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
 };
-use crate::opened_file::{FileHandle, OpenFileFlags};
+use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile};
 use crate::opened_file_manager::OpenedFileManager;
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -78,6 +79,7 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
     }
 
     async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, 
parent_file_id: u64) {
+        debug_assert!(parent_file_id != 0);
         let mut file_manager = self.file_entry_manager.write().await;
         let file_entry = file_manager.get_file_entry_by_path(&file_stat.path);
         match file_entry {
@@ -132,6 +134,21 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
         let mut file_manager = self.file_entry_manager.write().await;
         file_manager.insert(parent_file_id, file_id, path);
     }
+
+    fn get_meta_file_stat(&self) -> FileStat {
+        let mut meta_file_stat =
+            
FileStat::new_file_filestat_with_path(Path::new(FS_META_FILE_PATH), 0);
+        meta_file_stat.set_file_id(ROOT_DIR_FILE_ID, FS_META_FILE_ID);
+        meta_file_stat
+    }
+
+    fn is_meta_file(&self, file_id: u64) -> bool {
+        file_id == FS_META_FILE_ID
+    }
+
+    fn is_meta_file_name(&self, parent_file_id: u64, name: &OsStr) -> bool {
+        parent_file_id == ROOT_DIR_FILE_ID && name == 
OsStr::new(FS_META_FILE_NAME)
+    }
 }
 
 #[async_trait]
@@ -144,6 +161,13 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
             Path::new(ROOT_DIR_PATH),
         )
         .await;
+
+        self.insert_file_entry_locked(
+            ROOT_DIR_FILE_ID,
+            FS_META_FILE_ID,
+            Path::new(FS_META_FILE_PATH),
+        )
+        .await;
         self.fs.init().await
     }
 
@@ -168,6 +192,10 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
     }
 
     async fn stat(&self, file_id: u64) -> Result<FileStat> {
+        if self.is_meta_file(file_id) {
+            return Ok(self.get_meta_file_stat());
+        }
+
         let file_entry = self.get_file_entry(file_id).await?;
         let mut file_stat = self.fs.stat(&file_entry.path).await?;
         file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id);
@@ -175,8 +203,11 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
     }
 
     async fn lookup(&self, parent_file_id: u64, name: &OsStr) -> 
Result<FileStat> {
-        let parent_file_entry = self.get_file_entry(parent_file_id).await?;
+        if self.is_meta_file_name(parent_file_id, name) {
+            return Ok(self.get_meta_file_stat());
+        }
 
+        let parent_file_entry = self.get_file_entry(parent_file_id).await?;
         let path = parent_file_entry.path.join(name);
         let mut file_stat = self.fs.stat(&path).await?;
         // fill the file id to file stat
@@ -192,10 +223,21 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         for file_stat in child_filestats.iter_mut() {
             self.resolve_file_id_to_filestat(file_stat, file_id).await;
         }
+
+        if file_id == ROOT_DIR_FILE_ID {
+            child_filestats.push(self.get_meta_file_stat());
+        }
         Ok(child_filestats)
     }
 
     async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> {
+        if self.is_meta_file(file_id) {
+            let meta_file = OpenedFile::new(self.get_meta_file_stat());
+            let resutl = self.opened_file_manager.put(meta_file);
+            let file = resutl.lock().await;
+            return Ok(file.file_handle());
+        }
+
         self.open_file_internal(file_id, flags, FileType::RegularFile)
             .await
     }
@@ -211,6 +253,10 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         name: &OsStr,
         flags: u32,
     ) -> Result<FileHandle> {
+        if self.is_meta_file_name(parent_file_id, name) {
+            return Err(Errno::from(libc::EEXIST));
+        }
+
         let parent_file_entry = self.get_file_entry(parent_file_id).await?;
         let mut file_without_id = self
             .fs
@@ -247,11 +293,19 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
     }
 
     async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> 
{
+        if self.is_meta_file(file_id) {
+            return Ok(());
+        }
+
         let file_entry = self.get_file_entry(file_id).await?;
         self.fs.set_attr(&file_entry.path, file_stat, true).await
     }
 
     async fn remove_file(&self, parent_file_id: u64, name: &OsStr) -> 
Result<()> {
+        if self.is_meta_file_name(parent_file_id, name) {
+            return Err(Errno::from(libc::EPERM));
+        }
+
         let parent_file_entry = self.get_file_entry(parent_file_id).await?;
         let path = parent_file_entry.path.join(name);
         self.fs.remove_file(&path).await?;
@@ -271,6 +325,15 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         Ok(())
     }
 
+    async fn flush_file(&self, _file_id: u64, fh: u64) -> Result<()> {
+        let opened_file = self
+            .opened_file_manager
+            .get(fh)
+            .ok_or(Errno::from(libc::EBADF))?;
+        let mut file = opened_file.lock().await;
+        file.flush().await
+    }
+
     async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> {
         let opened_file = self
             .opened_file_manager
@@ -280,7 +343,11 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         file.close().await
     }
 
-    async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> 
Result<Bytes> {
+    async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> 
Result<Bytes> {
+        if self.is_meta_file(file_id) {
+            return Ok(Bytes::new());
+        }
+
         let (data, file_stat) = {
             let opened_file = self
                 .opened_file_manager
@@ -297,7 +364,11 @@ impl<T: PathFileSystem> RawFileSystem for 
DefaultRawFileSystem<T> {
         data
     }
 
-    async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> 
Result<u32> {
+    async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) -> 
Result<u32> {
+        if self.is_meta_file(file_id) {
+            return Err(Errno::from(libc::EPERM));
+        }
+
         let (len, file_stat) = {
             let opened_file = self
                 .opened_file_manager
@@ -368,8 +439,6 @@ impl FileEntryManager {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::filesystem::tests::TestRawFileSystem;
-    use crate::memory_filesystem::MemoryFileSystem;
 
     #[test]
     fn test_file_entry_manager() {
@@ -389,17 +458,4 @@ mod tests {
         assert!(manager.get_file_entry_by_id(2).is_none());
         assert!(manager.get_file_entry_by_path(Path::new("a/b")).is_none());
     }
-
-    #[tokio::test]
-    async fn test_default_raw_file_system() {
-        let memory_fs = MemoryFileSystem::new().await;
-        let raw_fs = DefaultRawFileSystem::new(
-            memory_fs,
-            &AppConfig::default(),
-            &FileSystemContext::default(),
-        );
-        let _ = raw_fs.init().await;
-        let mut tester = TestRawFileSystem::new(raw_fs);
-        tester.test_raw_file_system().await;
-    }
 }
diff --git a/clients/filesystem-fuse/src/error.rs 
b/clients/filesystem-fuse/src/error.rs
index ba3c037c5..7e38e4687 100644
--- a/clients/filesystem-fuse/src/error.rs
+++ b/clients/filesystem-fuse/src/error.rs
@@ -24,6 +24,7 @@ pub enum ErrorCode {
     GravitinoClientError,
     InvalidConfig,
     ConfigNotFound,
+    OpenDalError,
 }
 
 impl ErrorCode {
@@ -39,6 +40,7 @@ impl std::fmt::Display for ErrorCode {
             ErrorCode::GravitinoClientError => write!(f, "Gravitino client 
error"),
             ErrorCode::InvalidConfig => write!(f, "Invalid config"),
             ErrorCode::ConfigNotFound => write!(f, "Config not found"),
+            ErrorCode::OpenDalError => write!(f, "OpenDal error"),
         }
     }
 }
diff --git a/clients/filesystem-fuse/src/filesystem.rs 
b/clients/filesystem-fuse/src/filesystem.rs
index 742cdd4c8..dcf35f8eb 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -36,6 +36,11 @@ pub(crate) const ROOT_DIR_NAME: &str = "";
 pub(crate) const ROOT_DIR_PATH: &str = "/";
 pub(crate) const INITIAL_FILE_ID: u64 = 10000;
 
+// File system meta file is indicated the fuse filesystem is active.
+pub(crate) const FS_META_FILE_PATH: &str = "/.gvfs_meta";
+pub(crate) const FS_META_FILE_NAME: &str = ".gvfs_meta";
+pub(crate) const FS_META_FILE_ID: u64 = 10;
+
 /// RawFileSystem interface for the file system implementation. it use by 
FuseApiHandle,
 /// it ues the file id to operate the file system apis
 /// the `file_id` and `parent_file_id` it is the unique identifier for the 
file system,
@@ -89,6 +94,9 @@ pub(crate) trait RawFileSystem: Send + Sync {
     /// Remove the directory by parent file id and file name
     async fn remove_dir(&self, parent_file_id: u64, name: &OsStr) -> 
Result<()>;
 
+    /// flush the file with file id and file handle, if successful return Ok
+    async fn flush_file(&self, file_id: u64, fh: u64) -> Result<()>;
+
     /// Close the file by file id and file handle, if successful
     async fn close_file(&self, file_id: u64, fh: u64) -> Result<()>;
 
@@ -289,57 +297,53 @@ pub trait FileWriter: Sync + Send {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
+    use libc::{O_APPEND, O_CREAT, O_RDONLY};
     use std::collections::HashMap;
+    use std::path::Component;
 
     pub(crate) struct TestPathFileSystem<F: PathFileSystem> {
         files: HashMap<PathBuf, FileStat>,
         fs: F,
+        cwd: PathBuf,
     }
 
     impl<F: PathFileSystem> TestPathFileSystem<F> {
-        pub(crate) fn new(fs: F) -> Self {
+        pub(crate) fn new(cwd: &Path, fs: F) -> Self {
             Self {
                 files: HashMap::new(),
                 fs,
+                cwd: cwd.into(),
             }
         }
 
         pub(crate) async fn test_path_file_system(&mut self) {
-            // Test root dir
-            self.test_root_dir().await;
+            // test root dir
+            let resutl = self.fs.stat(Path::new("/")).await;
+            assert!(resutl.is_ok());
+            let root_file_stat = resutl.unwrap();
+            self.assert_file_stat(&root_file_stat, Path::new("/"), Directory, 
0);
 
-            // Test stat file
-            self.test_stat_file(Path::new("/.gvfs_meta"), RegularFile, 0)
-                .await;
+            // test list root dir
+            let result = self.fs.read_dir(Path::new("/")).await;
+            assert!(result.is_ok());
 
             // Test create file
-            self.test_create_file(Path::new("/file1.txt")).await;
+            self.test_create_file(&self.cwd.join("file1.txt")).await;
 
             // Test create dir
-            self.test_create_dir(Path::new("/dir1")).await;
+            self.test_create_dir(&self.cwd.join("dir1")).await;
 
             // Test list dir
-            self.test_list_dir(Path::new("/")).await;
+            self.test_list_dir(&self.cwd).await;
 
             // Test remove file
-            self.test_remove_file(Path::new("/file1.txt")).await;
+            self.test_remove_file(&self.cwd.join("file1.txt")).await;
 
             // Test remove dir
-            self.test_remove_dir(Path::new("/dir1")).await;
+            self.test_remove_dir(&self.cwd.join("dir1")).await;
 
             // Test file not found
-            self.test_file_not_found(Path::new("unknown")).await;
-
-            // Test list dir
-            self.test_list_dir(Path::new("/")).await;
-        }
-
-        async fn test_root_dir(&mut self) {
-            let root_dir_path = Path::new("/");
-            let root_file_stat = self.fs.stat(root_dir_path).await;
-            assert!(root_file_stat.is_ok());
-            let root_file_stat = root_file_stat.unwrap();
-            self.assert_file_stat(&root_file_stat, root_dir_path, Directory, 
0);
+            self.test_file_not_found(&self.cwd.join("unknown")).await;
         }
 
         async fn test_stat_file(&mut self, path: &Path, expect_kind: FileType, 
expect_size: u64) {
@@ -370,7 +374,6 @@ pub(crate) mod tests {
             let list_dir = self.fs.read_dir(path).await;
             assert!(list_dir.is_ok());
             let list_dir = list_dir.unwrap();
-            assert_eq!(list_dir.len(), self.files.len());
             for file_stat in list_dir {
                 assert!(self.files.contains_key(&file_stat.path));
                 let actual_file_stat = 
self.files.get(&file_stat.path).unwrap();
@@ -414,13 +417,15 @@ pub(crate) mod tests {
     pub(crate) struct TestRawFileSystem<F: RawFileSystem> {
         fs: F,
         files: HashMap<u64, FileStat>,
+        cwd: PathBuf,
     }
 
     impl<F: RawFileSystem> TestRawFileSystem<F> {
-        pub(crate) fn new(fs: F) -> Self {
+        pub(crate) fn new(cwd: &Path, fs: F) -> Self {
             Self {
                 fs,
                 files: HashMap::new(),
+                cwd: cwd.into(),
             }
         }
 
@@ -431,31 +436,45 @@ pub(crate) mod tests {
             // test read root dir
             self.test_list_dir(ROOT_DIR_FILE_ID, false).await;
 
-            let parent_file_id = ROOT_DIR_FILE_ID;
-            // Test lookup file
+            // Test lookup meta file
             let file_id = self
-                .test_lookup_file(parent_file_id, ".gvfs_meta".as_ref(), 
RegularFile, 0)
+                .test_lookup_file(ROOT_DIR_FILE_ID, ".gvfs_meta".as_ref(), 
RegularFile, 0)
                 .await;
 
-            // Test get file stat
+            // Test get meta file stat
             self.test_stat_file(file_id, Path::new("/.gvfs_meta"), 
RegularFile, 0)
                 .await;
 
             // Test get file path
             self.test_get_file_path(file_id, "/.gvfs_meta").await;
 
-            // Test create file
-            self.test_create_file(parent_file_id, "file1.txt".as_ref())
-                .await;
+            // get cwd file id
+            let mut parent_file_id = ROOT_DIR_FILE_ID;
+            for child in self.cwd.components() {
+                if child == Component::RootDir {
+                    continue;
+                }
+                let file_id = self.fs.create_dir(parent_file_id, 
child.as_os_str()).await;
+                assert!(file_id.is_ok());
+                parent_file_id = file_id.unwrap();
+            }
 
-            // Test open file
+            // Test create file
             let file_handle = self
-                .test_open_file(parent_file_id, "file1.txt".as_ref())
+                .test_create_file(parent_file_id, "file1.txt".as_ref())
                 .await;
 
             // Test write file
             self.test_write_file(&file_handle, "test").await;
 
+            // Test close file
+            self.test_close_file(&file_handle).await;
+
+            // Test open file with read
+            let file_handle = self
+                .test_open_file(parent_file_id, "file1.txt".as_ref(), O_RDONLY 
as u32)
+                .await;
+
             // Test read file
             self.test_read_file(&file_handle, "test").await;
 
@@ -526,8 +545,11 @@ pub(crate) mod tests {
             self.files.insert(file_stat.file_id, file_stat);
         }
 
-        async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) {
-            let file = self.fs.create_file(root_file_id, name, 0).await;
+        async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) 
-> FileHandle {
+            let file = self
+                .fs
+                .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
+                .await;
             assert!(file.is_ok());
             let file = file.unwrap();
             assert!(file.handle_id > 0);
@@ -537,11 +559,12 @@ pub(crate) mod tests {
 
             self.test_stat_file(file.file_id, &file_stat.unwrap().path, 
RegularFile, 0)
                 .await;
+            file
         }
 
-        async fn test_open_file(&self, root_file_id: u64, name: &OsStr) -> 
FileHandle {
+        async fn test_open_file(&self, root_file_id: u64, name: &OsStr, flags: 
u32) -> FileHandle {
             let file = self.fs.lookup(root_file_id, name).await.unwrap();
-            let file_handle = self.fs.open_file(file.file_id, 0).await;
+            let file_handle = self.fs.open_file(file.file_id, flags).await;
             assert!(file_handle.is_ok());
             let file_handle = file_handle.unwrap();
             assert_eq!(file_handle.file_id, file.file_id);
@@ -558,9 +581,16 @@ pub(crate) mod tests {
                     content.as_bytes(),
                 )
                 .await;
+
             assert!(write_size.is_ok());
             assert_eq!(write_size.unwrap(), content.len() as u32);
 
+            let result = self
+                .fs
+                .flush_file(file_handle.file_id, file_handle.handle_id)
+                .await;
+            assert!(result.is_ok());
+
             self.files.get_mut(&file_handle.file_id).unwrap().size = 
content.len() as u64;
         }
 
@@ -606,7 +636,6 @@ pub(crate) mod tests {
             if !check_child {
                 return;
             }
-            assert_eq!(list_dir.len(), self.files.len());
             for file_stat in list_dir {
                 assert!(self.files.contains_key(&file_stat.file_id));
                 let actual_file_stat = 
self.files.get(&file_stat.file_id).unwrap();
@@ -652,7 +681,7 @@ pub(crate) mod tests {
             assert_eq!(file_stat.path, path);
             assert_eq!(file_stat.kind, kind);
             assert_eq!(file_stat.size, size);
-            if file_stat.file_id == 1 {
+            if file_stat.file_id == ROOT_DIR_FILE_ID || file_stat.file_id == 
FS_META_FILE_ID {
                 assert_eq!(file_stat.parent_file_id, 1);
             } else {
                 assert!(file_stat.file_id >= INITIAL_FILE_ID);
diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs 
b/clients/filesystem-fuse/src/fuse_api_handle.rs
index 153e32389..15679a222 100644
--- a/clients/filesystem-fuse/src/fuse_api_handle.rs
+++ b/clients/filesystem-fuse/src/fuse_api_handle.rs
@@ -227,7 +227,7 @@ impl<T: RawFileSystem> Filesystem for FuseApiHandle<T> {
 
     async fn release(
         &self,
-        _eq: Request,
+        _req: Request,
         inode: Inode,
         fh: u64,
         _flags: u32,
@@ -237,6 +237,16 @@ impl<T: RawFileSystem> Filesystem for FuseApiHandle<T> {
         self.fs.close_file(inode, fh).await
     }
 
+    async fn flush(
+        &self,
+        _req: Request,
+        inode: Inode,
+        fh: u64,
+        _lock_owner: u64,
+    ) -> fuse3::Result<()> {
+        self.fs.flush_file(inode, fh).await
+    }
+
     async fn opendir(&self, _req: Request, inode: Inode, flags: u32) -> 
fuse3::Result<ReplyOpen> {
         let file_handle = self.fs.open_dir(inode, flags).await?;
         Ok(ReplyOpen {
diff --git a/clients/filesystem-fuse/src/gravitino_client.rs 
b/clients/filesystem-fuse/src/gravitino_client.rs
index e5553c9f6..9bdfbb2c2 100644
--- a/clients/filesystem-fuse/src/gravitino_client.rs
+++ b/clients/filesystem-fuse/src/gravitino_client.rs
@@ -48,6 +48,22 @@ struct FileLocationResponse {
     location: String,
 }
 
+#[derive(Debug, Deserialize)]
+pub(crate) struct Catalog {
+    pub(crate) name: String,
+    #[serde(rename = "type")]
+    pub(crate) catalog_type: String,
+    provider: String,
+    comment: String,
+    pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct CatalogResponse {
+    code: u32,
+    catalog: Catalog,
+}
+
 pub(crate) struct GravitinoClient {
     gravitino_uri: String,
     metalake: String,
@@ -105,6 +121,26 @@ impl GravitinoClient {
         Ok(res)
     }
 
+    pub async fn get_catalog_url(&self, catalog_name: &str) -> String {
+        format!(
+            "{}/api/metalakes/{}/catalogs/{}",
+            self.gravitino_uri, self.metalake, catalog_name
+        )
+    }
+
+    pub async fn get_catalog(&self, catalog_name: &str) -> Result<Catalog, 
GvfsError> {
+        let url = self.get_catalog_url(catalog_name).await;
+        let res = self.do_get::<CatalogResponse>(&url).await?;
+
+        if res.code != 0 {
+            return Err(GvfsError::Error(
+                ErrorCode::GravitinoClientError,
+                "Failed to get catalog".to_string(),
+            ));
+        }
+        Ok(res.catalog)
+    }
+
     pub async fn get_fileset(
         &self,
         catalog_name: &str,
@@ -257,6 +293,46 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn test_get_catalog_success() {
+        let catalog_response = r#"
+        {
+            "code": 0,
+            "catalog": {
+                "name": "example_catalog",
+                "type": "example_type",
+                "provider": "example_provider",
+                "comment": "This is a test catalog",
+                "properties": {
+                    "key1": "value1",
+                    "key2": "value2"
+                }
+            }
+        }"#;
+
+        let mock_server_url = &mockito::server_url();
+
+        let url = format!("/api/metalakes/{}/catalogs/{}", "test", "catalog1");
+        let _m = mock("GET", url.as_str())
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(catalog_response)
+            .create();
+
+        let config = GravitinoConfig {
+            uri: mock_server_url.to_string(),
+            metalake: "test".to_string(),
+        };
+        let client = GravitinoClient::new(&config);
+
+        let result = client.get_catalog("catalog1").await;
+
+        match result {
+            Ok(_) => {}
+            Err(e) => panic!("Expected Ok, but got Err: {:?}", e),
+        }
+    }
+
     async fn get_fileset_example() {
         tracing_subscriber::fmt::init();
         let config = GravitinoConfig {
diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs 
b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
index 98a295dbb..7da2f572d 100644
--- a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
+++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -30,13 +30,15 @@ use std::path::{Path, PathBuf};
 pub(crate) struct GravitinoFilesetFileSystem {
     physical_fs: Box<dyn PathFileSystem>,
     client: GravitinoClient,
-    fileset_location: PathBuf,
+    // location is a absolute path in the physical filesystem that is 
associated with the fileset.
+    // e.g. fileset location : s3://bucket/path/to/file the location is 
/path/to/file
+    location: PathBuf,
 }
 
 impl GravitinoFilesetFileSystem {
     pub async fn new(
         fs: Box<dyn PathFileSystem>,
-        location: &Path,
+        target_path: &Path,
         client: GravitinoClient,
         _config: &AppConfig,
         _context: &FileSystemContext,
@@ -44,18 +46,25 @@ impl GravitinoFilesetFileSystem {
         Self {
             physical_fs: fs,
             client: client,
-            fileset_location: location.into(),
+            location: target_path.into(),
         }
     }
 
     fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf {
-        self.fileset_location.join(path)
+        let relation_path = path.strip_prefix("/").expect("path should start 
with /");
+        if relation_path == Path::new("") {
+            return self.location.clone();
+        }
+        self.location.join(relation_path)
     }
 
     fn raw_path_to_gvfs_path(&self, path: &Path) -> Result<PathBuf> {
-        path.strip_prefix(&self.fileset_location)
+        let stripped_path = path
+            .strip_prefix(&self.location)
             .map_err(|_| Errno::from(libc::EBADF))?;
-        Ok(path.into())
+        let mut result_path = PathBuf::from("/");
+        result_path.push(stripped_path);
+        Ok(result_path)
     }
 }
 
@@ -128,3 +137,39 @@ impl PathFileSystem for GravitinoFilesetFileSystem {
         self.physical_fs.get_capacity()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::config::GravitinoConfig;
+    use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+    use crate::memory_filesystem::MemoryFileSystem;
+    use std::path::Path;
+
+    #[tokio::test]
+    async fn test_map_fileset_path_to_raw_path() {
+        let fs = GravitinoFilesetFileSystem {
+            physical_fs: Box::new(MemoryFileSystem::new().await),
+            client: super::GravitinoClient::new(&GravitinoConfig::default()),
+            location: "/c1/fileset1".into(),
+        };
+        let path = fs.gvfs_path_to_raw_path(Path::new("/a"));
+        assert_eq!(path, Path::new("/c1/fileset1/a"));
+        let path = fs.gvfs_path_to_raw_path(Path::new("/"));
+        assert_eq!(path, Path::new("/c1/fileset1"));
+    }
+
+    #[tokio::test]
+    async fn test_map_raw_path_to_fileset_path() {
+        let fs = GravitinoFilesetFileSystem {
+            physical_fs: Box::new(MemoryFileSystem::new().await),
+            client: super::GravitinoClient::new(&GravitinoConfig::default()),
+            location: "/c1/fileset1".into(),
+        };
+        let path = fs
+            .raw_path_to_gvfs_path(Path::new("/c1/fileset1/a"))
+            .unwrap();
+        assert_eq!(path, Path::new("/a"));
+        let path = 
fs.raw_path_to_gvfs_path(Path::new("/c1/fileset1")).unwrap();
+        assert_eq!(path, Path::new("/"));
+    }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_creator.rs 
b/clients/filesystem-fuse/src/gvfs_creator.rs
new file mode 100644
index 000000000..aac88ad9d
--- /dev/null
+++ b/clients/filesystem-fuse/src/gvfs_creator.rs
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::filesystem::{FileSystemContext, PathFileSystem};
+use crate::gravitino_client::{Catalog, Fileset, GravitinoClient};
+use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::gvfs_fuse::{CreateFileSystemResult, FileSystemSchema};
+use crate::s3_filesystem::S3FileSystem;
+use crate::utils::{extract_root_path, parse_location, GvfsResult};
+
+const GRAVITINO_FILESET_SCHEMA: &str = "gvfs";
+
+pub async fn create_gvfs_filesystem(
+    mount_from: &str,
+    config: &AppConfig,
+    fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+    // Gvfs-fuse filesystem structure:
+    // FuseApiHandle
+    // ├─ DefaultRawFileSystem (RawFileSystem)
+    // │ └─ FileSystemLog (PathFileSystem)
+    // │    ├─ GravitinoComposedFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ S3FileSystem (PathFileSystem)
+    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ HDFSFileSystem (PathFileSystem)
+    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ JuiceFileSystem (PathFileSystem)
+    // │    │  │     └─ NasFileSystem (PathFileSystem)
+    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
+    // │    │  │  └─ XXXFileSystem (PathFileSystem)
+    //
+    // `DefaultRawFileSystem` is a low-level filesystem designed to communicate 
with FUSE APIs.
+    // It manages file and directory relationships, as well as file mappings.
+    // It delegates file operations to the PathFileSystem
+    //
+    // `FileSystemLog` is a decorator that adds extra debug logging 
functionality to file system APIs.
+    // Similar implementations include permissions, caching, and metrics.
+    //
+    // `GravitinoComposedFileSystem` is a composite file system that can 
combine multiple `GravitinoFilesetFileSystem`.
+    // It uses the catalog and schema parts of the fileset path to find the 
actual GravitinoFilesetFileSystem and delegates the operation to the real storage.
+    // If the user only mounts a fileset, this layer is not present. There 
will only be one layer below.
+    //
+    // `GravitinoFilesetFileSystem` is a file system that can access a 
fileset. It translates the fileset path to the real storage path
+    // and delegates the operation to the real storage.
+    //
+    // `OpenDALFileSystem` is a file system that uses OpenDAL to access 
real storage.
+    // It can access S3, HDFS, GCS, Azblob and other storage.
+    //
+    // `S3FileSystem` is a file system that uses `OpenDALFileSystem` to access 
S3 storage.
+    //
+    // `HDFSFileSystem` is a file system that uses `OpenDALFileSystem` to 
access HDFS storage.
+    //
+    // `NasFileSystem` is a filesystem that uses a locally accessible path 
mounted by NAS tools, such as JuiceFS.
+    //
+    // `JuiceFileSystem` is a file system that uses `NasFileSystem` to access JuiceFS 
storage.
+    //
+    // `XXXFileSystem` is a filesystem that allows you to implement file access 
through your own extensions.
+
+    let client = GravitinoClient::new(&config.gravitino);
+
+    let (catalog_name, schema_name, fileset_name) = 
extract_fileset(mount_from)?;
+    let catalog = client.get_catalog(&catalog_name).await?;
+    if catalog.catalog_type != "fileset" {
+        return Err(InvalidConfig.to_error(format!("Catalog {} is not a 
fileset", catalog_name)));
+    }
+    let fileset = client
+        .get_fileset(&catalog_name, &schema_name, &fileset_name)
+        .await?;
+
+    let inner_fs = create_fs_with_fileset(&catalog, &fileset, config, 
fs_context)?;
+
+    let target_path = extract_root_path(fileset.storage_location.as_str())?;
+    let fs =
+        GravitinoFilesetFileSystem::new(inner_fs, &target_path, client, 
config, fs_context).await;
+    Ok(CreateFileSystemResult::Gvfs(fs))
+}
+
+fn create_fs_with_fileset(
+    catalog: &Catalog,
+    fileset: &Fileset,
+    config: &AppConfig,
+    fs_context: &FileSystemContext,
+) -> GvfsResult<Box<dyn PathFileSystem>> {
+    let schema = extract_filesystem_scheme(&fileset.storage_location)?;
+
+    match schema {
+        FileSystemSchema::S3 => Ok(Box::new(S3FileSystem::new(
+            catalog, fileset, config, fs_context,
+        )?)),
+    }
+}
+
+pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
+    let path = parse_location(path)?;
+
+    if path.scheme() != GRAVITINO_FILESET_SCHEMA {
+        return Err(InvalidConfig.to_error(format!("Invalid fileset schema: 
{}", path)));
+    }
+
+    let split = path.path_segments();
+    if split.is_none() {
+        return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", 
path)));
+    }
+    let split = split.unwrap().collect::<Vec<&str>>();
+    if split.len() != 4 {
+        return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}", 
path)));
+    }
+
+    let catalog = split[1].to_string();
+    let schema = split[2].to_string();
+    let fileset = split[3].to_string();
+    Ok((catalog, schema, fileset))
+}
+
+pub fn extract_filesystem_scheme(path: &str) -> GvfsResult<FileSystemSchema> {
+    let url = parse_location(path)?;
+    let scheme = url.scheme();
+
+    match scheme {
+        "s3" => Ok(FileSystemSchema::S3),
+        "s3a" => Ok(FileSystemSchema::S3),
+        _ => Err(UnSupportedFilesystem.to_error(format!("Invalid storage 
schema: {}", path))),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::gvfs_creator::extract_fileset;
+    use crate::gvfs_fuse::FileSystemSchema;
+
+    #[test]
+    fn test_extract_fileset() {
+        let location = "gvfs://fileset/test/c1/s1/fileset1";
+        let (catalog, schema, fileset) = extract_fileset(location).unwrap();
+        assert_eq!(catalog, "c1");
+        assert_eq!(schema, "s1");
+        assert_eq!(fileset, "fileset1");
+    }
+
+    #[test]
+    fn test_extract_schema() {
+        let location = "s3://bucket/path/to/file";
+        let schema = super::extract_filesystem_scheme(location).unwrap();
+        assert_eq!(schema, FileSystemSchema::S3);
+    }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs 
b/clients/filesystem-fuse/src/gvfs_fuse.rs
index d472895d2..88079e99b 100644
--- a/clients/filesystem-fuse/src/gvfs_fuse.rs
+++ b/clients/filesystem-fuse/src/gvfs_fuse.rs
@@ -18,22 +18,19 @@
  */
 use crate::config::AppConfig;
 use crate::default_raw_filesystem::DefaultRawFileSystem;
-use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::error::ErrorCode::UnSupportedFilesystem;
 use crate::filesystem::FileSystemContext;
 use crate::fuse_api_handle::FuseApiHandle;
 use crate::fuse_server::FuseServer;
-use crate::gravitino_client::GravitinoClient;
 use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::gvfs_creator::create_gvfs_filesystem;
 use crate::memory_filesystem::MemoryFileSystem;
 use crate::utils::GvfsResult;
 use log::info;
 use once_cell::sync::Lazy;
-use std::path::Path;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
-const FILESET_PREFIX: &str = "gvfs://fileset/";
-
 static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(|| 
Mutex::new(None));
 
 pub(crate) enum CreateFileSystemResult {
@@ -44,6 +41,7 @@ pub(crate) enum CreateFileSystemResult {
     None,
 }
 
+#[derive(Debug, PartialEq)]
 pub enum FileSystemSchema {
     S3,
 }
@@ -65,7 +63,7 @@ pub async fn mount(mount_to: &str, mount_from: &str, config: 
&AppConfig) -> Gvfs
 }
 
 pub async fn unmount() -> GvfsResult<()> {
-    info!("Stop gvfs-fuse server...");
+    info!("Stopping gvfs-fuse server...");
     let svr = {
         let mut server = SERVER.lock().await;
         if server.is_none() {
@@ -127,120 +125,3 @@ pub async fn create_path_fs(
         create_gvfs_filesystem(mount_from, config, fs_context).await
     }
 }
-
-pub async fn create_gvfs_filesystem(
-    mount_from: &str,
-    config: &AppConfig,
-    fs_context: &FileSystemContext,
-) -> GvfsResult<CreateFileSystemResult> {
-    // Gvfs-fuse filesystem structure:
-    // FuseApiHandle
-    // ├─ DefaultRawFileSystem (RawFileSystem)
-    // │ └─ FileSystemLog (PathFileSystem)
-    // │    ├─ GravitinoComposedFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ S3FileSystem (PathFileSystem)
-    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ HDFSFileSystem (PathFileSystem)
-    // │    │  │     └─ OpenDALFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ JuiceFileSystem (PathFileSystem)
-    // │    │  │     └─ NasFileSystem (PathFileSystem)
-    // │    │  ├─ GravitinoFilesetFileSystem (PathFileSystem)
-    // │    │  │  └─ XXXFileSystem (PathFileSystem)
-    //
-    // `SimpleFileSystem` is a low-level filesystem designed to communicate 
with FUSE APIs.
-    // It manages file and directory relationships, as well as file mappings.
-    // It delegates file operations to the PathFileSystem
-    //
-    // `FileSystemLog` is a decorator that adds extra debug logging 
functionality to file system APIs.
-    // Similar implementations include permissions, caching, and metrics.
-    //
-    // `GravitinoComposeFileSystem` is a composite file system that can 
combine multiple `GravitinoFilesetFileSystem`.
-    // It use the part of catalog and schema of fileset path to a find actual 
GravitinoFilesetFileSystem. delegate the operation to the real storage.
-    // If the user only mounts a fileset, this layer is not present. There 
will only be one below layer.
-    //
-    // `GravitinoFilesetFileSystem` is a file system that can access a 
fileset.It translates the fileset path to the real storage path.
-    // and delegate the operation to the real storage.
-    //
-    // `OpenDALFileSystem` is a file system that use the OpenDAL to access 
real storage.
-    // it can assess the S3, HDFS, gcs, azblob and other storage.
-    //
-    // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access 
S3 storage.
-    //
-    // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to 
access HDFS storage.
-    //
-    // `NasFileSystem` is a filesystem that uses a locally accessible path 
mounted by NAS tools, such as JuiceFS.
-    //
-    // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS 
storage.
-    //
-    // `XXXFileSystem is a filesystem that allows you to implement file access 
through your own extensions.
-
-    let client = GravitinoClient::new(&config.gravitino);
-
-    let (catalog, schema, fileset) = extract_fileset(mount_from)?;
-    let location = client
-        .get_fileset(&catalog, &schema, &fileset)
-        .await?
-        .storage_location;
-    let (_schema, location) = extract_storage_filesystem(&location).unwrap();
-
-    // todo need to replace the inner filesystem with the real storage 
filesystem
-    let inner_fs = MemoryFileSystem::new().await;
-
-    let fs = GravitinoFilesetFileSystem::new(
-        Box::new(inner_fs),
-        Path::new(&location),
-        client,
-        config,
-        fs_context,
-    )
-    .await;
-    Ok(CreateFileSystemResult::Gvfs(fs))
-}
-
-pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
-    if !path.starts_with(FILESET_PREFIX) {
-        return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
-    }
-
-    let path_without_prefix = &path[FILESET_PREFIX.len()..];
-
-    let parts: Vec<&str> = path_without_prefix.split('/').collect();
-
-    if parts.len() != 3 {
-        return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
-    }
-    // todo handle mount catalog or schema
-
-    let catalog = parts[1].to_string();
-    let schema = parts[2].to_string();
-    let fileset = parts[3].to_string();
-
-    Ok((catalog, schema, fileset))
-}
-
-pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, 
String)> {
-    // todo need to improve the logic
-    if let Some(pos) = path.find("://") {
-        let protocol = &path[..pos];
-        let location = &path[pos + 3..];
-        let location = match location.find('/') {
-            Some(index) => &location[index + 1..],
-            None => "",
-        };
-        let location = match location.ends_with('/') {
-            true => location.to_string(),
-            false => format!("{}/", location),
-        };
-
-        match protocol {
-            "s3" => Some((FileSystemSchema::S3, location.to_string())),
-            "s3a" => Some((FileSystemSchema::S3, location.to_string())),
-            _ => None,
-        }
-    } else {
-        None
-    }
-}
diff --git a/clients/filesystem-fuse/src/lib.rs 
b/clients/filesystem-fuse/src/lib.rs
index 5532d619e..31e7c7fd8 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -27,10 +27,13 @@ mod fuse_api_handle;
 mod fuse_server;
 mod gravitino_client;
 mod gravitino_fileset_filesystem;
+mod gvfs_creator;
 mod gvfs_fuse;
 mod memory_filesystem;
+mod open_dal_filesystem;
 mod opened_file;
 mod opened_file_manager;
+mod s3_filesystem;
 mod utils;
 
 pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) 
-> GvfsResult<()> {
diff --git a/clients/filesystem-fuse/src/main.rs 
b/clients/filesystem-fuse/src/main.rs
index 8eab5ec0d..3534e0334 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -26,21 +26,37 @@ use tokio::signal;
 async fn main() -> fuse3::Result<()> {
     tracing_subscriber::fmt().init();
 
+    // todo: need to improve the args parsing
+    let args: Vec<String> = std::env::args().collect();
+    let (mount_point, mount_from, config_path) = match args.len() {
+        4 => (args[1].clone(), args[2].clone(), args[3].clone()),
+        _ => {
+            error!("Usage: {} <mount_point> <mount_from> <config>", args[0]);
+            return Err(Errno::from(libc::EINVAL));
+        }
+    };
+
     //todo(read config file from args)
-    let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml"));
+    let config = AppConfig::from_file(Some(&config_path));
     if let Err(e) = &config {
         error!("Failed to load config: {:?}", e);
         return Err(Errno::from(libc::EINVAL));
     }
     let config = config.unwrap();
-    let handle = tokio::spawn(async move { gvfs_mount("gvfs", "", 
&config).await });
-
-    let _ = signal::ctrl_c().await;
-    info!("Received Ctrl+C, Unmounting gvfs...");
+    let handle = tokio::spawn(async move {
+        let result = gvfs_mount(&mount_point, &mount_from, &config).await;
+        if let Err(e) = result {
+            error!("Failed to mount gvfs: {:?}", e);
+            return Err(Errno::from(libc::EINVAL));
+        }
+        Ok(())
+    });
 
-    if let Err(e) = handle.await {
-        error!("Failed to mount gvfs: {:?}", e);
-        return Err(Errno::from(libc::EINVAL));
+    tokio::select! {
+        _ = handle => {}
+        _ = signal::ctrl_c() => {
+            info!("Received Ctrl+C, unmounting gvfs...");
+        }
     }
 
     let _ = gvfs_unmount().await;
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs 
b/clients/filesystem-fuse/src/memory_filesystem.rs
index b94d16b8d..f56e65ea3 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -42,8 +42,6 @@ pub struct MemoryFileSystem {
 }
 
 impl MemoryFileSystem {
-    const FS_META_FILE_NAME: &'static str = "/.gvfs_meta";
-
     pub(crate) async fn new() -> Self {
         Self {
             file_map: RwLock::new(Default::default()),
@@ -69,16 +67,6 @@ impl PathFileSystem for MemoryFileSystem {
         };
         let root_path = PathBuf::from("/");
         self.file_map.write().unwrap().insert(root_path, root_file);
-
-        let meta_file = MemoryFile {
-            kind: RegularFile,
-            data: Arc::new(Mutex::new(Vec::new())),
-        };
-        let meta_file_path = Path::new(Self::FS_META_FILE_NAME).to_path_buf();
-        self.file_map
-            .write()
-            .unwrap()
-            .insert(meta_file_path, meta_file);
         Ok(())
     }
 
@@ -248,7 +236,10 @@ fn path_in_dir(dir: &Path, path: &Path) -> bool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::filesystem::tests::TestPathFileSystem;
+    use crate::config::AppConfig;
+    use crate::default_raw_filesystem::DefaultRawFileSystem;
+    use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
+    use crate::filesystem::{FileSystemContext, RawFileSystem};
 
     #[test]
     fn test_path_in_dir() {
@@ -281,7 +272,20 @@ mod tests {
     async fn test_memory_file_system() {
         let fs = MemoryFileSystem::new().await;
         let _ = fs.init().await;
-        let mut tester = TestPathFileSystem::new(fs);
+        let mut tester = TestPathFileSystem::new(Path::new("/ab"), fs);
         tester.test_path_file_system().await;
     }
+
+    #[tokio::test]
+    async fn test_memory_file_system_with_raw_file_system() {
+        let memory_fs = MemoryFileSystem::new().await;
+        let raw_fs = DefaultRawFileSystem::new(
+            memory_fs,
+            &AppConfig::default(),
+            &FileSystemContext::default(),
+        );
+        let _ = raw_fs.init().await;
+        let mut tester = TestRawFileSystem::new(Path::new("/ab"), raw_fs);
+        tester.test_raw_file_system().await;
+    }
 }
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs 
b/clients/filesystem-fuse/src/open_dal_filesystem.rs
new file mode 100644
index 000000000..e53fbaf60
--- /dev/null
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::filesystem::{
+    FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter, 
PathFileSystem, Result,
+};
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use async_trait::async_trait;
+use bytes::Bytes;
+use fuse3::FileType::{Directory, RegularFile};
+use fuse3::{Errno, FileType, Timestamp};
+use log::error;
+use opendal::{EntryMode, ErrorKind, Metadata, Operator};
+use std::path::{Path, PathBuf};
+use std::time::SystemTime;
+
+pub(crate) struct OpenDalFileSystem {
+    op: Operator,
+}
+
+impl OpenDalFileSystem {}
+
+impl OpenDalFileSystem {
+    pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: 
&FileSystemContext) -> Self {
+        Self { op: op }
+    }
+
+    fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut 
FileStat) {
+        let now = SystemTime::now();
+        let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now);
+
+        file_stat.size = meta.content_length();
+        file_stat.kind = opendal_filemode_to_filetype(meta.mode());
+        file_stat.ctime = Timestamp::from(mtime);
+        file_stat.atime = Timestamp::from(now);
+        file_stat.mtime = Timestamp::from(mtime);
+    }
+}
+
+#[async_trait]
+impl PathFileSystem for OpenDalFileSystem {
+    async fn init(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn stat(&self, path: &Path) -> Result<FileStat> {
+        let file_name = path.to_string_lossy().to_string();
+        let meta_result = self.op.stat(&file_name).await;
+
+        // path may be a directory, so try to stat it as a directory
+        let meta = match meta_result {
+            Ok(meta) => meta,
+            Err(err) => {
+                if err.kind() == ErrorKind::NotFound {
+                    let dir_name = build_dir_path(path);
+                    self.op
+                        .stat(&dir_name)
+                        .await
+                        .map_err(opendal_error_to_errno)?
+                } else {
+                    return Err(opendal_error_to_errno(err));
+                }
+            }
+        };
+
+        let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
+        self.opendal_meta_to_file_stat(&meta, &mut file_stat);
+
+        Ok(file_stat)
+    }
+
+    async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+        // dir name should end with '/' in opendal.
+        let dir_name = build_dir_path(path);
+        let entries = self
+            .op
+            .list(&dir_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+        entries
+            .iter()
+            .map(|entry| {
+                let mut path = PathBuf::from(path);
+                path.push(entry.name());
+
+                let mut file_stat = 
FileStat::new_file_filestat_with_path(&path, 0);
+                self.opendal_meta_to_file_stat(entry.metadata(), &mut 
file_stat);
+                Ok(file_stat)
+            })
+            .collect()
+    }
+
+    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> 
Result<OpenedFile> {
+        let file_stat = self.stat(path).await?;
+        debug_assert!(file_stat.kind == RegularFile);
+
+        let mut file = OpenedFile::new(file_stat);
+        let file_name = path.to_string_lossy().to_string();
+        if flags.is_read() {
+            let reader = self
+                .op
+                .reader_with(&file_name)
+                .await
+                .map_err(opendal_error_to_errno)?;
+            file.reader = Some(Box::new(FileReaderImpl { reader }));
+        }
+        if flags.is_write() || flags.is_create() || flags.is_append() || 
flags.is_truncate() {
+            let writer = self
+                .op
+                .writer_with(&file_name)
+                .await
+                .map_err(opendal_error_to_errno)?;
+            file.writer = Some(Box::new(FileWriterImpl { writer }));
+        }
+        Ok(file)
+    }
+
+    async fn open_dir(&self, path: &Path, _flags: OpenFileFlags) -> 
Result<OpenedFile> {
+        let file_stat = self.stat(path).await?;
+        debug_assert!(file_stat.kind == Directory);
+
+        let opened_file = OpenedFile::new(file_stat);
+        Ok(opened_file)
+    }
+
+    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> 
Result<OpenedFile> {
+        let file_name = path.to_string_lossy().to_string();
+
+        let mut writer = self
+            .op
+            .writer_with(&file_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+
+        writer.close().await.map_err(opendal_error_to_errno)?;
+
+        let file = self.open_file(path, flags).await?;
+        Ok(file)
+    }
+
+    async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+        let dir_name = build_dir_path(path);
+        self.op
+            .create_dir(&dir_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+        let file_stat = self.stat(path).await?;
+        Ok(file_stat)
+    }
+
+    async fn set_attr(&self, _path: &Path, _file_stat: &FileStat, _flush: 
bool) -> Result<()> {
+        // no need to implement
+        Ok(())
+    }
+
+    async fn remove_file(&self, path: &Path) -> Result<()> {
+        let file_name = path.to_string_lossy().to_string();
+        self.op
+            .remove(vec![file_name])
+            .await
+            .map_err(opendal_error_to_errno)
+    }
+
+    async fn remove_dir(&self, path: &Path) -> Result<()> {
+        // todo: need to consider keeping the POSIX remove-dir behavior 
when the dir is not empty
+        let dir_name = build_dir_path(path);
+        self.op
+            .remove(vec![dir_name])
+            .await
+            .map_err(opendal_error_to_errno)
+    }
+
+    fn get_capacity(&self) -> Result<FileSystemCapacity> {
+        Ok(FileSystemCapacity {})
+    }
+}
+
+struct FileReaderImpl {
+    reader: opendal::Reader,
+}
+
+#[async_trait]
+impl FileReader for FileReaderImpl {
+    async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
+        let end = offset + size as u64;
+        let v = self
+            .reader
+            .read(offset..end)
+            .await
+            .map_err(opendal_error_to_errno)?;
+        Ok(v.to_bytes())
+    }
+}
+
+struct FileWriterImpl {
+    writer: opendal::Writer,
+}
+
+#[async_trait]
+impl FileWriter for FileWriterImpl {
+    async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
+        self.writer
+            .write(data.to_vec())
+            .await
+            .map_err(opendal_error_to_errno)?;
+        Ok(data.len() as u32)
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.writer.close().await.map_err(opendal_error_to_errno)?;
+        Ok(())
+    }
+}
+
+fn build_dir_path(path: &Path) -> String {
+    let mut dir_path = path.to_string_lossy().to_string();
+    if !dir_path.ends_with('/') {
+        dir_path.push('/');
+    }
+    dir_path
+}
+
+fn opendal_error_to_errno(err: opendal::Error) -> Errno {
+    error!("opendal operator error {:?}", err);
+    match err.kind() {
+        ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
+        ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
+        ErrorKind::NotFound => Errno::from(libc::ENOENT),
+        ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
+        ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
+        ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
+        ErrorKind::RateLimited => Errno::from(libc::EBUSY),
+        _ => Errno::from(libc::ENOENT),
+    }
+}
+
+fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
+    match mode {
+        EntryMode::DIR => Directory,
+        _ => RegularFile,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::config::AppConfig;
+    use crate::s3_filesystem::extract_s3_config;
+    use opendal::layers::LoggingLayer;
+    use opendal::{services, Builder, Operator};
+
+    #[tokio::test]
+    async fn test_s3_stat() {
+        let config = 
AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
+        let opendal_config = extract_s3_config(&config);
+
+        let builder = services::S3::from_map(opendal_config);
+
+        // Init an operator
+        let op = Operator::new(builder)
+            .expect("opendal create failed")
+            .layer(LoggingLayer::default())
+            .finish();
+
+        let path = "/";
+        let list = op.list(path).await;
+        if let Ok(l) = list {
+            for i in l {
+                println!("list result: {:?}", i);
+            }
+        } else {
+            println!("list error: {:?}", list.err());
+        }
+
+        let meta = op.stat_with(path).await;
+        if let Ok(m) = meta {
+            println!("stat result: {:?}", m);
+        } else {
+            println!("stat error: {:?}", meta.err());
+        }
+    }
+}
diff --git a/clients/filesystem-fuse/src/opened_file.rs 
b/clients/filesystem-fuse/src/opened_file.rs
index 5bc961c9a..0c630e072 100644
--- a/clients/filesystem-fuse/src/opened_file.rs
+++ b/clients/filesystem-fuse/src/opened_file.rs
@@ -122,6 +122,32 @@ pub(crate) struct FileHandle {
 // OpenFileFlags is the open file flags for the file system.
 pub(crate) struct OpenFileFlags(pub(crate) u32);
 
+impl OpenFileFlags {
+    pub fn is_read(&self) -> bool {
+        (self.0 & libc::O_WRONLY as u32) == 0
+    }
+
+    pub fn is_write(&self) -> bool {
+        (self.0 & libc::O_WRONLY as u32) != 0 || (self.0 & libc::O_RDWR as 
u32) != 0
+    }
+
+    pub fn is_append(&self) -> bool {
+        (self.0 & libc::O_APPEND as u32) != 0
+    }
+
+    pub fn is_create(&self) -> bool {
+        (self.0 & libc::O_CREAT as u32) != 0
+    }
+
+    pub fn is_truncate(&self) -> bool {
+        (self.0 & libc::O_TRUNC as u32) != 0
+    }
+
+    pub fn is_exclusive(&self) -> bool {
+        (self.0 & libc::O_EXCL as u32) != 0
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/clients/filesystem-fuse/src/s3_filesystem.rs 
b/clients/filesystem-fuse/src/s3_filesystem.rs
new file mode 100644
index 000000000..e0ca69b4c
--- /dev/null
+++ b/clients/filesystem-fuse/src/s3_filesystem.rs
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::error::ErrorCode::{InvalidConfig, OpenDalError};
+use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, 
PathFileSystem, Result};
+use crate::gravitino_client::{Catalog, Fileset};
+use crate::open_dal_filesystem::OpenDalFileSystem;
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use crate::utils::{parse_location, GvfsResult};
+use async_trait::async_trait;
+use log::error;
+use opendal::layers::LoggingLayer;
+use opendal::services::S3;
+use opendal::{Builder, Operator};
+use std::collections::HashMap;
+use std::path::Path;
+
+/// Path-based filesystem for a Gravitino fileset stored in an S3 bucket.
+///
+/// Every filesystem operation is carried out by the wrapped `OpenDalFileSystem`.
+pub(crate) struct S3FileSystem {
+    open_dal_fs: OpenDalFileSystem,
+}
+
+impl S3FileSystem {
+    /// Prefix of the `extend_config` keys that `extract_s3_config` forwards to OpenDAL.
+    const S3_CONFIG_PREFIX: &'static str = "s3-";
+
+    /// Builds an S3-backed filesystem for `fileset`.
+    ///
+    /// The OpenDAL options start from the `s3-` prefixed entries of `config`. The `bucket`
+    /// option is then set from the host part of the fileset storage location and the `region`
+    /// option from the catalog properties. `_fs_context` is passed on to
+    /// `OpenDalFileSystem::new`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the bucket or the region cannot be determined, or an
+    /// `OpenDalError` when OpenDAL fails to create the operator.
+    pub(crate) fn new(
+        catalog: &Catalog,
+        fileset: &Fileset,
+        config: &AppConfig,
+        _fs_context: &FileSystemContext,
+    ) -> GvfsResult<Self> {
+        let mut opendal_config = extract_s3_config(config);
+        let bucket = extract_bucket(&fileset.storage_location)?;
+        opendal_config.insert("bucket".to_string(), bucket);
+
+        let region = Self::get_s3_region(catalog)?;
+        opendal_config.insert("region".to_string(), region);
+
+        let builder = S3::from_map(opendal_config);
+
+        // Log the failure here as well as returning it, so it shows up even if the caller
+        // only reports a generic mount error.
+        let op = Operator::new(builder)
+            .map_err(|e| {
+                error!("opendal create failed: {:?}", e);
+                OpenDalError.to_error(format!("opendal create failed: {:?}", e))
+            })?
+            .layer(LoggingLayer::default())
+            .finish();
+
+        Ok(Self {
+            open_dal_fs: OpenDalFileSystem::new(op, config, _fs_context),
+        })
+    }
+
+    /// Resolves the S3 region from the catalog properties.
+    ///
+    /// The explicit `s3-region` property wins; otherwise the region is derived from the
+    /// `s3-endpoint` property via `extract_region`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `InvalidConfig` when neither property is present, or the error produced by
+    /// `extract_region` when the endpoint does not contain a region.
+    fn get_s3_region(catalog: &Catalog) -> GvfsResult<String> {
+        if let Some(region) = catalog.properties.get("s3-region") {
+            return Ok(region.clone());
+        }
+
+        match catalog.properties.get("s3-endpoint") {
+            Some(endpoint) => extract_region(endpoint),
+            None => Err(InvalidConfig.to_error(format!(
+                "Can not retrieve region in the Catalog {}",
+                catalog.name
+            ))),
+        }
+    }
+}
+
+// `S3FileSystem` adds no behavior of its own: every `PathFileSystem` call is forwarded
+// unchanged to the wrapped `OpenDalFileSystem`.
+#[async_trait]
+impl PathFileSystem for S3FileSystem {
+    /// Nothing needs to be initialized; always returns `Ok(())`.
+    async fn init(&self) -> Result<()> {
+        Ok(())
+    }
+
+    /// Forwards to `OpenDalFileSystem::stat`.
+    async fn stat(&self, path: &Path) -> Result<FileStat> {
+        self.open_dal_fs.stat(path).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::read_dir`.
+    async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+        self.open_dal_fs.read_dir(path).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::open_file`.
+    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        self.open_dal_fs.open_file(path, flags).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::open_dir`.
+    async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        self.open_dal_fs.open_dir(path, flags).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::create_file`.
+    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        self.open_dal_fs.create_file(path, flags).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::create_dir`.
+    async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+        self.open_dal_fs.create_dir(path).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::set_attr`.
+    async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> {
+        self.open_dal_fs.set_attr(path, file_stat, flush).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::remove_file`.
+    async fn remove_file(&self, path: &Path) -> Result<()> {
+        self.open_dal_fs.remove_file(path).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::remove_dir`.
+    async fn remove_dir(&self, path: &Path) -> Result<()> {
+        self.open_dal_fs.remove_dir(path).await
+    }
+
+    /// Forwards to `OpenDalFileSystem::get_capacity`.
+    fn get_capacity(&self) -> Result<FileSystemCapacity> {
+        self.open_dal_fs.get_capacity()
+    }
+}
+
+/// Returns the bucket of a fileset location, i.e. the host component of its URL
+/// (`bucket` for `s3://bucket/path/to/file`).
+///
+/// # Errors
+///
+/// Propagates the error of `parse_location` when `location` is not a valid URL and returns
+/// `InvalidConfig` when the URL has no host.
+pub(crate) fn extract_bucket(location: &str) -> GvfsResult<String> {
+    let url = parse_location(location)?;
+    url.host_str().map(str::to_string).ok_or_else(|| {
+        InvalidConfig.to_error(format!(
+            "Invalid fileset location without bucket: {}",
+            location
+        ))
+    })
+}
+
+/// Derives the S3 region from an endpoint URL by taking the second dot-separated label of
+/// its host (`ap-southeast-2` for `http://s3.ap-southeast-2.amazonaws.com`).
+///
+/// NOTE: the label is returned as-is, so a host without a region label in second position
+/// (for example `s3.amazonaws.com`) yields that label (`amazonaws`) rather than an error.
+///
+/// # Errors
+///
+/// Propagates the error of `parse_location` when `location` is not a valid URL and returns
+/// `InvalidConfig` when the URL has no host or the host has no non-empty second label.
+pub(crate) fn extract_region(location: &str) -> GvfsResult<String> {
+    let url = parse_location(location)?;
+    let host = url.host_str().ok_or_else(|| {
+        InvalidConfig.to_error(format!("Invalid endpoint without host: {}", location))
+    })?;
+
+    host.split('.')
+        .nth(1)
+        // A trailing dot (e.g. `s3.`) produces an empty label, which is never a region.
+        .filter(|label| !label.is_empty())
+        .map(str::to_string)
+        .ok_or_else(|| {
+            InvalidConfig.to_error(format!(
+                "Invalid location: expected region in host, got {}",
+                location
+            ))
+        })
+}
+
+/// Collects the S3 options of `config.extend_config` for OpenDAL.
+///
+/// Only entries whose key starts with `S3FileSystem::S3_CONFIG_PREFIX` are kept, and the
+/// prefix is removed from the returned keys (`s3-region` becomes `region`). Values are
+/// copied unchanged; entries without the prefix are ignored.
+pub fn extract_s3_config(config: &AppConfig) -> HashMap<String, String> {
+    config
+        .extend_config
+        .iter()
+        .filter_map(|(key, value)| {
+            // `strip_prefix` both tests for the prefix and removes it, so only matching
+            // entries are cloned.
+            key.strip_prefix(S3FileSystem::S3_CONFIG_PREFIX)
+                .map(|option| (option.to_string(), value.clone()))
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::default_raw_filesystem::DefaultRawFileSystem;
+    use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
+    use crate::filesystem::RawFileSystem;
+    use opendal::layers::TimeoutLayer;
+    use std::time::Duration;
+
+    #[test]
+    fn test_extract_bucket() {
+        let location = "s3://bucket/path/to/file";
+        let result = extract_bucket(location);
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), "bucket");
+    }
+
+    #[test]
+    fn test_extract_region() {
+        let location = "http://s3.ap-southeast-2.amazonaws.com";
+        let result = extract_region(location);
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), "ap-southeast-2");
+    }
+
+    /// Deletes every entry listed under `dir_name`, descending into sub-directories, and
+    /// finally deletes `dir_name` itself. Panics on any listing or deletion error.
+    ///
+    /// Child paths are built by appending the entry name to `dir_name`, so `dir_name` is
+    /// expected to end with `/` (as passed by `create_s3_fs`).
+    async fn delete_dir(op: &Operator, dir_name: &str) {
+        let children = op.list(dir_name).await.expect("list dir failed");
+        for child in children {
+            let child_name = format!("{}{}", dir_name, child.name());
+            if child.metadata().is_dir() {
+                // A recursive call to an async fn has to be boxed.
+                Box::pin(delete_dir(op, &child_name)).await;
+            } else {
+                op.delete(&child_name).await.expect("delete file failed");
+            }
+        }
+        op.delete(dir_name).await.expect("delete dir failed");
+    }
+
+    /// Creates an `S3FileSystem` from `tests/conf/gvfs_fuse_s3.toml` and resets the test
+    /// directory `cwd` in the bucket (delete, then re-create).
+    async fn create_s3_fs(cwd: &Path) -> S3FileSystem {
+        let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
+        let opendal_config = extract_s3_config(&config);
+
+        let fs_context = FileSystemContext::default();
+
+        let builder = S3::from_map(opendal_config);
+        let op = Operator::new(builder)
+            .expect("opendal create failed")
+            .layer(LoggingLayer::default())
+            .layer(
+                TimeoutLayer::new()
+                    .with_timeout(Duration::from_secs(300))
+                    .with_io_timeout(Duration::from_secs(300)),
+            )
+            .finish();
+
+        // clean up the test directory
+        let test_dir = format!("{}/", cwd.to_string_lossy());
+        delete_dir(&op, &test_dir).await;
+        op.create_dir(&test_dir)
+            .await
+            .expect("create test dir failed");
+
+        let open_dal_fs = OpenDalFileSystem::new(op, &config, &fs_context);
+        S3FileSystem { open_dal_fs }
+    }
+
+    #[tokio::test]
+    async fn test_s3_file_system() {
+        // Opt-in test: skipped unless RUN_S3_TESTS is set.
+        if std::env::var("RUN_S3_TESTS").is_err() {
+            return;
+        }
+        let cwd = Path::new("/gvfs_test1");
+        let fs = create_s3_fs(cwd).await;
+
+        let _ = fs.init().await;
+        let mut tester = TestPathFileSystem::new(cwd, fs);
+        tester.test_path_file_system().await;
+    }
+
+    #[tokio::test]
+    async fn test_s3_file_system_with_raw_file_system() {
+        // Opt-in test: skipped unless RUN_S3_TESTS is set.
+        if std::env::var("RUN_S3_TESTS").is_err() {
+            return;
+        }
+
+        let cwd = Path::new("/gvfs_test2");
+        let s3_fs = create_s3_fs(cwd).await;
+        let raw_fs =
+            DefaultRawFileSystem::new(s3_fs, &AppConfig::default(), &FileSystemContext::default());
+        let _ = raw_fs.init().await;
+        let mut tester = TestRawFileSystem::new(cwd, raw_fs);
+        tester.test_raw_file_system().await;
+    }
+}
diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs
index bbc8d7d7f..53eb9179d 100644
--- a/clients/filesystem-fuse/src/utils.rs
+++ b/clients/filesystem-fuse/src/utils.rs
@@ -16,9 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::error::ErrorCode::InvalidConfig;
 use crate::error::GvfsError;
+use reqwest::Url;
+use std::path::PathBuf;
 
 pub type GvfsResult<T> = Result<T, GvfsError>;
 
+/// Parses `location` as a URL.
+///
+/// # Errors
+///
+/// Returns an `InvalidConfig` error carrying the parser's message when `location` is not a
+/// valid URL.
+pub(crate) fn parse_location(location: &str) -> GvfsResult<Url> {
+    Url::parse(location)
+        .map_err(|e| InvalidConfig.to_error(format!("Invalid fileset location: {}", e)))
+}
+
+/// Returns the path component of `location` as a `PathBuf`
+/// (`/path/to/file` for `s3://bucket/path/to/file`).
+///
+/// # Errors
+///
+/// Propagates the error of `parse_location` when `location` is not a valid URL.
+pub(crate) fn extract_root_path(location: &str) -> GvfsResult<PathBuf> {
+    parse_location(location).map(|url| PathBuf::from(url.path()))
+}
+
 #[cfg(test)]
-mod tests {}
+mod tests {
+    use crate::utils::extract_root_path;
+    use std::path::PathBuf;
+
+    /// The root path of a fileset location is the path component of its URL.
+    #[test]
+    fn test_extract_root_path() {
+        let root = extract_root_path("s3://bucket/path/to/file");
+        assert!(root.is_ok());
+        assert_eq!(root.unwrap(), PathBuf::from("/path/to/file"));
+    }
+}
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml b/clients/filesystem-fuse/tests/conf/config_test.toml
similarity index 91%
rename from clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
rename to clients/filesystem-fuse/tests/conf/config_test.toml
index ff7c6936f..524e0aa94 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
+++ b/clients/filesystem-fuse/tests/conf/config_test.toml
@@ -34,7 +34,7 @@ block_size = 8192
 uri = "http://localhost:8090"
 metalake = "test"
 
-# extent settings
+# extend settings
 [extend_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
index 013df6cfc..0ec447cd0 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
@@ -34,7 +34,7 @@ block_size = 8192
 uri = "http://localhost:8090"
 metalake = "test"
 
-# extent settings
-[extent_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+# extend settings
+[extend_config]
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
similarity index 86%
copy from clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
index 013df6cfc..7d182cd40 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
@@ -34,7 +34,10 @@ block_size = 8192
 uri = "http://localhost:8090"
 metalake = "test"
 
-# extent settings
-[extent_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+# extend settings
+[extend_config]
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
+s3-region = "XXX_region"
+s3-bucket = "XXX_bucket"
+
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs
index e761fabc5..d06199d78 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -17,15 +17,16 @@
  * under the License.
  */
 
+use fuse3::Errno;
 use gvfs_fuse::config::AppConfig;
 use gvfs_fuse::{gvfs_mount, gvfs_unmount};
-use log::info;
-use std::fs;
+use log::{error, info};
 use std::fs::File;
 use std::path::Path;
 use std::sync::Arc;
 use std::thread::sleep;
 use std::time::{Duration, Instant};
+use std::{fs, panic, process};
 use tokio::runtime::Runtime;
 use tokio::task::JoinHandle;
 
@@ -42,8 +43,14 @@ impl FuseTest {
 
         let config = 
AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
             .expect("Failed to load config");
-        self.runtime
-            .spawn(async move { gvfs_mount(&mount_point, "", &config).await });
+        self.runtime.spawn(async move {
+            let result = gvfs_mount(&mount_point, "", &config).await;
+            if let Err(e) = result {
+                error!("Failed to mount gvfs: {:?}", e);
+                return Err(Errno::from(libc::EINVAL));
+            }
+            Ok(())
+        });
         let success = Self::wait_for_fuse_server_ready(&self.mount_point, 
Duration::from_secs(15));
         assert!(success, "Fuse server cannot start up at 15 seconds");
     }
@@ -60,6 +67,7 @@ impl FuseTest {
 
         while start_time.elapsed() < timeout {
             if file_exists(&test_file) {
+                info!("Fuse server is ready",);
                 return true;
             }
             info!("Wait for fuse server ready",);
@@ -80,6 +88,11 @@ impl Drop for FuseTest {
 fn test_fuse_system_with_auto() {
     tracing_subscriber::fmt().init();
 
+    panic::set_hook(Box::new(|info| {
+        error!("A panic occurred: {:?}", info);
+        process::exit(1);
+    }));
+
     let mount_point = "target/gvfs";
     let _ = fs::create_dir_all(mount_point);
 

Reply via email to