This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch branch-gvfs-fuse-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-gvfs-fuse-dev by this
push:
new ef2b102a1 [#6012] feat (gvfs-fuse): Support Gravitino S3 fileset
filesystem operation in gvfs fuse (#6013)
ef2b102a1 is described below
commit ef2b102a1435cb433461fbd0fd370433de23cb73
Author: Yuhui <[email protected]>
AuthorDate: Fri Jan 3 17:03:39 2025 +0800
[#6012] feat (gvfs-fuse): Support Gravitino S3 fileset filesystem
operation in gvfs fuse (#6013)
### What changes were proposed in this pull request?
Support Gravitino S3 fileset filesystem operations in gvfs-fuse,
implemented with OpenDAL.
### Why are the changes needed?
Fix: #6012
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested manually.
---------
Co-authored-by: Qiming Teng <[email protected]>
---
clients/filesystem-fuse/Cargo.toml | 1 +
clients/filesystem-fuse/conf/gvfs_fuse.toml | 6 +-
clients/filesystem-fuse/src/config.rs | 6 +-
.../filesystem-fuse/src/default_raw_filesystem.rs | 98 +++++--
clients/filesystem-fuse/src/error.rs | 2 +
clients/filesystem-fuse/src/filesystem.rs | 109 +++++---
clients/filesystem-fuse/src/fuse_api_handle.rs | 12 +-
clients/filesystem-fuse/src/gravitino_client.rs | 76 ++++++
.../src/gravitino_fileset_filesystem.rs | 57 +++-
clients/filesystem-fuse/src/gvfs_creator.rs | 166 ++++++++++++
clients/filesystem-fuse/src/gvfs_fuse.rs | 127 +--------
clients/filesystem-fuse/src/lib.rs | 3 +
clients/filesystem-fuse/src/main.rs | 32 ++-
clients/filesystem-fuse/src/memory_filesystem.rs | 32 ++-
clients/filesystem-fuse/src/open_dal_filesystem.rs | 297 +++++++++++++++++++++
clients/filesystem-fuse/src/opened_file.rs | 26 ++
clients/filesystem-fuse/src/s3_filesystem.rs | 276 +++++++++++++++++++
clients/filesystem-fuse/src/utils.rs | 29 +-
.../conf/{gvfs_fuse_test.toml => config_test.toml} | 6 +-
.../tests/conf/gvfs_fuse_memory.toml | 8 +-
.../{gvfs_fuse_memory.toml => gvfs_fuse_s3.toml} | 11 +-
clients/filesystem-fuse/tests/fuse_test.rs | 21 +-
22 files changed, 1166 insertions(+), 235 deletions(-)
diff --git a/clients/filesystem-fuse/Cargo.toml
b/clients/filesystem-fuse/Cargo.toml
index 4008ec5ca..3760bd528 100644
--- a/clients/filesystem-fuse/Cargo.toml
+++ b/clients/filesystem-fuse/Cargo.toml
@@ -42,6 +42,7 @@ futures-util = "0.3.30"
libc = "0.2.168"
log = "0.4.22"
once_cell = "1.20.2"
+opendal = { version = "0.46.0", features = ["services-s3"] }
reqwest = { version = "0.12.9", features = ["json"] }
serde = { version = "1.0.216", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
diff --git a/clients/filesystem-fuse/conf/gvfs_fuse.toml
b/clients/filesystem-fuse/conf/gvfs_fuse.toml
index 94d3d8560..4bde0e9e1 100644
--- a/clients/filesystem-fuse/conf/gvfs_fuse.toml
+++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml
@@ -32,7 +32,7 @@ block_size = 8192
uri = "http://localhost:8090"
metalake = "your_metalake"
-# extent settings
+# extend settings
[extend_config]
-access_key = "your access_key"
-secret_key = "your_secret_key"
+s3-access_key_id = "your access_key"
+s3-secret_access_key = "your_secret_key"
diff --git a/clients/filesystem-fuse/src/config.rs
b/clients/filesystem-fuse/src/config.rs
index b381caa75..17908fd08 100644
--- a/clients/filesystem-fuse/src/config.rs
+++ b/clients/filesystem-fuse/src/config.rs
@@ -302,18 +302,18 @@ mod test {
#[test]
fn test_config_from_file() {
- let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
+ let config =
AppConfig::from_file(Some("tests/conf/config_test.toml")).unwrap();
assert_eq!(config.fuse.file_mask, 0o644);
assert_eq!(config.fuse.dir_mask, 0o755);
assert_eq!(config.filesystem.block_size, 8192);
assert_eq!(config.gravitino.uri, "http://localhost:8090");
assert_eq!(config.gravitino.metalake, "test");
assert_eq!(
- config.extend_config.get("access_key"),
+ config.extend_config.get("s3-access_key_id"),
Some(&"XXX_access_key".to_string())
);
assert_eq!(
- config.extend_config.get("secret_key"),
+ config.extend_config.get("s3-secret_access_key"),
Some(&"XXX_secret_key".to_string())
);
}
diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs
b/clients/filesystem-fuse/src/default_raw_filesystem.rs
index 0c9836e5b..944181246 100644
--- a/clients/filesystem-fuse/src/default_raw_filesystem.rs
+++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs
@@ -18,10 +18,11 @@
*/
use crate::config::AppConfig;
use crate::filesystem::{
- FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result,
INITIAL_FILE_ID,
- ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
+ FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result,
FS_META_FILE_ID,
+ FS_META_FILE_NAME, FS_META_FILE_PATH, INITIAL_FILE_ID, ROOT_DIR_FILE_ID,
+ ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
};
-use crate::opened_file::{FileHandle, OpenFileFlags};
+use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile};
use crate::opened_file_manager::OpenedFileManager;
use async_trait::async_trait;
use bytes::Bytes;
@@ -78,6 +79,7 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
}
async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat,
parent_file_id: u64) {
+ debug_assert!(parent_file_id != 0);
let mut file_manager = self.file_entry_manager.write().await;
let file_entry = file_manager.get_file_entry_by_path(&file_stat.path);
match file_entry {
@@ -132,6 +134,21 @@ impl<T: PathFileSystem> DefaultRawFileSystem<T> {
let mut file_manager = self.file_entry_manager.write().await;
file_manager.insert(parent_file_id, file_id, path);
}
+
+ fn get_meta_file_stat(&self) -> FileStat {
+ let mut meta_file_stat =
+
FileStat::new_file_filestat_with_path(Path::new(FS_META_FILE_PATH), 0);
+ meta_file_stat.set_file_id(ROOT_DIR_FILE_ID, FS_META_FILE_ID);
+ meta_file_stat
+ }
+
+ fn is_meta_file(&self, file_id: u64) -> bool {
+ file_id == FS_META_FILE_ID
+ }
+
+ fn is_meta_file_name(&self, parent_file_id: u64, name: &OsStr) -> bool {
+ parent_file_id == ROOT_DIR_FILE_ID && name ==
OsStr::new(FS_META_FILE_NAME)
+ }
}
#[async_trait]
@@ -144,6 +161,13 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
Path::new(ROOT_DIR_PATH),
)
.await;
+
+ self.insert_file_entry_locked(
+ ROOT_DIR_FILE_ID,
+ FS_META_FILE_ID,
+ Path::new(FS_META_FILE_PATH),
+ )
+ .await;
self.fs.init().await
}
@@ -168,6 +192,10 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
}
async fn stat(&self, file_id: u64) -> Result<FileStat> {
+ if self.is_meta_file(file_id) {
+ return Ok(self.get_meta_file_stat());
+ }
+
let file_entry = self.get_file_entry(file_id).await?;
let mut file_stat = self.fs.stat(&file_entry.path).await?;
file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id);
@@ -175,8 +203,11 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
}
async fn lookup(&self, parent_file_id: u64, name: &OsStr) ->
Result<FileStat> {
- let parent_file_entry = self.get_file_entry(parent_file_id).await?;
+ if self.is_meta_file_name(parent_file_id, name) {
+ return Ok(self.get_meta_file_stat());
+ }
+ let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let path = parent_file_entry.path.join(name);
let mut file_stat = self.fs.stat(&path).await?;
// fill the file id to file stat
@@ -192,10 +223,21 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
for file_stat in child_filestats.iter_mut() {
self.resolve_file_id_to_filestat(file_stat, file_id).await;
}
+
+ if file_id == ROOT_DIR_FILE_ID {
+ child_filestats.push(self.get_meta_file_stat());
+ }
Ok(child_filestats)
}
async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> {
+ if self.is_meta_file(file_id) {
+ let meta_file = OpenedFile::new(self.get_meta_file_stat());
+ let resutl = self.opened_file_manager.put(meta_file);
+ let file = resutl.lock().await;
+ return Ok(file.file_handle());
+ }
+
self.open_file_internal(file_id, flags, FileType::RegularFile)
.await
}
@@ -211,6 +253,10 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
name: &OsStr,
flags: u32,
) -> Result<FileHandle> {
+ if self.is_meta_file_name(parent_file_id, name) {
+ return Err(Errno::from(libc::EEXIST));
+ }
+
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let mut file_without_id = self
.fs
@@ -247,11 +293,19 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
}
async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()>
{
+ if self.is_meta_file(file_id) {
+ return Ok(());
+ }
+
let file_entry = self.get_file_entry(file_id).await?;
self.fs.set_attr(&file_entry.path, file_stat, true).await
}
async fn remove_file(&self, parent_file_id: u64, name: &OsStr) ->
Result<()> {
+ if self.is_meta_file_name(parent_file_id, name) {
+ return Err(Errno::from(libc::EPERM));
+ }
+
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
let path = parent_file_entry.path.join(name);
self.fs.remove_file(&path).await?;
@@ -271,6 +325,15 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
Ok(())
}
+ async fn flush_file(&self, _file_id: u64, fh: u64) -> Result<()> {
+ let opened_file = self
+ .opened_file_manager
+ .get(fh)
+ .ok_or(Errno::from(libc::EBADF))?;
+ let mut file = opened_file.lock().await;
+ file.flush().await
+ }
+
async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> {
let opened_file = self
.opened_file_manager
@@ -280,7 +343,11 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
file.close().await
}
- async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) ->
Result<Bytes> {
+ async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) ->
Result<Bytes> {
+ if self.is_meta_file(file_id) {
+ return Ok(Bytes::new());
+ }
+
let (data, file_stat) = {
let opened_file = self
.opened_file_manager
@@ -297,7 +364,11 @@ impl<T: PathFileSystem> RawFileSystem for
DefaultRawFileSystem<T> {
data
}
- async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) ->
Result<u32> {
+ async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) ->
Result<u32> {
+ if self.is_meta_file(file_id) {
+ return Err(Errno::from(libc::EPERM));
+ }
+
let (len, file_stat) = {
let opened_file = self
.opened_file_manager
@@ -368,8 +439,6 @@ impl FileEntryManager {
#[cfg(test)]
mod tests {
use super::*;
- use crate::filesystem::tests::TestRawFileSystem;
- use crate::memory_filesystem::MemoryFileSystem;
#[test]
fn test_file_entry_manager() {
@@ -389,17 +458,4 @@ mod tests {
assert!(manager.get_file_entry_by_id(2).is_none());
assert!(manager.get_file_entry_by_path(Path::new("a/b")).is_none());
}
-
- #[tokio::test]
- async fn test_default_raw_file_system() {
- let memory_fs = MemoryFileSystem::new().await;
- let raw_fs = DefaultRawFileSystem::new(
- memory_fs,
- &AppConfig::default(),
- &FileSystemContext::default(),
- );
- let _ = raw_fs.init().await;
- let mut tester = TestRawFileSystem::new(raw_fs);
- tester.test_raw_file_system().await;
- }
}
diff --git a/clients/filesystem-fuse/src/error.rs
b/clients/filesystem-fuse/src/error.rs
index ba3c037c5..7e38e4687 100644
--- a/clients/filesystem-fuse/src/error.rs
+++ b/clients/filesystem-fuse/src/error.rs
@@ -24,6 +24,7 @@ pub enum ErrorCode {
GravitinoClientError,
InvalidConfig,
ConfigNotFound,
+ OpenDalError,
}
impl ErrorCode {
@@ -39,6 +40,7 @@ impl std::fmt::Display for ErrorCode {
ErrorCode::GravitinoClientError => write!(f, "Gravitino client
error"),
ErrorCode::InvalidConfig => write!(f, "Invalid config"),
ErrorCode::ConfigNotFound => write!(f, "Config not found"),
+ ErrorCode::OpenDalError => write!(f, "OpenDal error"),
}
}
}
diff --git a/clients/filesystem-fuse/src/filesystem.rs
b/clients/filesystem-fuse/src/filesystem.rs
index 742cdd4c8..dcf35f8eb 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -36,6 +36,11 @@ pub(crate) const ROOT_DIR_NAME: &str = "";
pub(crate) const ROOT_DIR_PATH: &str = "/";
pub(crate) const INITIAL_FILE_ID: u64 = 10000;
+// The file system meta file indicates that the fuse filesystem is active.
+pub(crate) const FS_META_FILE_PATH: &str = "/.gvfs_meta";
+pub(crate) const FS_META_FILE_NAME: &str = ".gvfs_meta";
+pub(crate) const FS_META_FILE_ID: u64 = 10;
+
/// RawFileSystem interface for the file system implementation. it use by
FuseApiHandle,
/// it ues the file id to operate the file system apis
/// the `file_id` and `parent_file_id` it is the unique identifier for the
file system,
@@ -89,6 +94,9 @@ pub(crate) trait RawFileSystem: Send + Sync {
/// Remove the directory by parent file id and file name
async fn remove_dir(&self, parent_file_id: u64, name: &OsStr) ->
Result<()>;
+ /// flush the file with file id and file handle, if successful return Ok
+ async fn flush_file(&self, file_id: u64, fh: u64) -> Result<()>;
+
/// Close the file by file id and file handle, if successful
async fn close_file(&self, file_id: u64, fh: u64) -> Result<()>;
@@ -289,57 +297,53 @@ pub trait FileWriter: Sync + Send {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
+ use libc::{O_APPEND, O_CREAT, O_RDONLY};
use std::collections::HashMap;
+ use std::path::Component;
pub(crate) struct TestPathFileSystem<F: PathFileSystem> {
files: HashMap<PathBuf, FileStat>,
fs: F,
+ cwd: PathBuf,
}
impl<F: PathFileSystem> TestPathFileSystem<F> {
- pub(crate) fn new(fs: F) -> Self {
+ pub(crate) fn new(cwd: &Path, fs: F) -> Self {
Self {
files: HashMap::new(),
fs,
+ cwd: cwd.into(),
}
}
pub(crate) async fn test_path_file_system(&mut self) {
- // Test root dir
- self.test_root_dir().await;
+ // test root dir
+ let resutl = self.fs.stat(Path::new("/")).await;
+ assert!(resutl.is_ok());
+ let root_file_stat = resutl.unwrap();
+ self.assert_file_stat(&root_file_stat, Path::new("/"), Directory,
0);
- // Test stat file
- self.test_stat_file(Path::new("/.gvfs_meta"), RegularFile, 0)
- .await;
+ // test list root dir
+ let result = self.fs.read_dir(Path::new("/")).await;
+ assert!(result.is_ok());
// Test create file
- self.test_create_file(Path::new("/file1.txt")).await;
+ self.test_create_file(&self.cwd.join("file1.txt")).await;
// Test create dir
- self.test_create_dir(Path::new("/dir1")).await;
+ self.test_create_dir(&self.cwd.join("dir1")).await;
// Test list dir
- self.test_list_dir(Path::new("/")).await;
+ self.test_list_dir(&self.cwd).await;
// Test remove file
- self.test_remove_file(Path::new("/file1.txt")).await;
+ self.test_remove_file(&self.cwd.join("file1.txt")).await;
// Test remove dir
- self.test_remove_dir(Path::new("/dir1")).await;
+ self.test_remove_dir(&self.cwd.join("dir1")).await;
// Test file not found
- self.test_file_not_found(Path::new("unknown")).await;
-
- // Test list dir
- self.test_list_dir(Path::new("/")).await;
- }
-
- async fn test_root_dir(&mut self) {
- let root_dir_path = Path::new("/");
- let root_file_stat = self.fs.stat(root_dir_path).await;
- assert!(root_file_stat.is_ok());
- let root_file_stat = root_file_stat.unwrap();
- self.assert_file_stat(&root_file_stat, root_dir_path, Directory,
0);
+ self.test_file_not_found(&self.cwd.join("unknown")).await;
}
async fn test_stat_file(&mut self, path: &Path, expect_kind: FileType,
expect_size: u64) {
@@ -370,7 +374,6 @@ pub(crate) mod tests {
let list_dir = self.fs.read_dir(path).await;
assert!(list_dir.is_ok());
let list_dir = list_dir.unwrap();
- assert_eq!(list_dir.len(), self.files.len());
for file_stat in list_dir {
assert!(self.files.contains_key(&file_stat.path));
let actual_file_stat =
self.files.get(&file_stat.path).unwrap();
@@ -414,13 +417,15 @@ pub(crate) mod tests {
pub(crate) struct TestRawFileSystem<F: RawFileSystem> {
fs: F,
files: HashMap<u64, FileStat>,
+ cwd: PathBuf,
}
impl<F: RawFileSystem> TestRawFileSystem<F> {
- pub(crate) fn new(fs: F) -> Self {
+ pub(crate) fn new(cwd: &Path, fs: F) -> Self {
Self {
fs,
files: HashMap::new(),
+ cwd: cwd.into(),
}
}
@@ -431,31 +436,45 @@ pub(crate) mod tests {
// test read root dir
self.test_list_dir(ROOT_DIR_FILE_ID, false).await;
- let parent_file_id = ROOT_DIR_FILE_ID;
- // Test lookup file
+ // Test lookup meta file
let file_id = self
- .test_lookup_file(parent_file_id, ".gvfs_meta".as_ref(),
RegularFile, 0)
+ .test_lookup_file(ROOT_DIR_FILE_ID, ".gvfs_meta".as_ref(),
RegularFile, 0)
.await;
- // Test get file stat
+ // Test get meta file stat
self.test_stat_file(file_id, Path::new("/.gvfs_meta"),
RegularFile, 0)
.await;
// Test get file path
self.test_get_file_path(file_id, "/.gvfs_meta").await;
- // Test create file
- self.test_create_file(parent_file_id, "file1.txt".as_ref())
- .await;
+ // get cwd file id
+ let mut parent_file_id = ROOT_DIR_FILE_ID;
+ for child in self.cwd.components() {
+ if child == Component::RootDir {
+ continue;
+ }
+ let file_id = self.fs.create_dir(parent_file_id,
child.as_os_str()).await;
+ assert!(file_id.is_ok());
+ parent_file_id = file_id.unwrap();
+ }
- // Test open file
+ // Test create file
let file_handle = self
- .test_open_file(parent_file_id, "file1.txt".as_ref())
+ .test_create_file(parent_file_id, "file1.txt".as_ref())
.await;
// Test write file
self.test_write_file(&file_handle, "test").await;
+ // Test close file
+ self.test_close_file(&file_handle).await;
+
+ // Test open file with read
+ let file_handle = self
+ .test_open_file(parent_file_id, "file1.txt".as_ref(), O_RDONLY
as u32)
+ .await;
+
// Test read file
self.test_read_file(&file_handle, "test").await;
@@ -526,8 +545,11 @@ pub(crate) mod tests {
self.files.insert(file_stat.file_id, file_stat);
}
- async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) {
- let file = self.fs.create_file(root_file_id, name, 0).await;
+ async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr)
-> FileHandle {
+ let file = self
+ .fs
+ .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
+ .await;
assert!(file.is_ok());
let file = file.unwrap();
assert!(file.handle_id > 0);
@@ -537,11 +559,12 @@ pub(crate) mod tests {
self.test_stat_file(file.file_id, &file_stat.unwrap().path,
RegularFile, 0)
.await;
+ file
}
- async fn test_open_file(&self, root_file_id: u64, name: &OsStr) ->
FileHandle {
+ async fn test_open_file(&self, root_file_id: u64, name: &OsStr, flags:
u32) -> FileHandle {
let file = self.fs.lookup(root_file_id, name).await.unwrap();
- let file_handle = self.fs.open_file(file.file_id, 0).await;
+ let file_handle = self.fs.open_file(file.file_id, flags).await;
assert!(file_handle.is_ok());
let file_handle = file_handle.unwrap();
assert_eq!(file_handle.file_id, file.file_id);
@@ -558,9 +581,16 @@ pub(crate) mod tests {
content.as_bytes(),
)
.await;
+
assert!(write_size.is_ok());
assert_eq!(write_size.unwrap(), content.len() as u32);
+ let result = self
+ .fs
+ .flush_file(file_handle.file_id, file_handle.handle_id)
+ .await;
+ assert!(result.is_ok());
+
self.files.get_mut(&file_handle.file_id).unwrap().size =
content.len() as u64;
}
@@ -606,7 +636,6 @@ pub(crate) mod tests {
if !check_child {
return;
}
- assert_eq!(list_dir.len(), self.files.len());
for file_stat in list_dir {
assert!(self.files.contains_key(&file_stat.file_id));
let actual_file_stat =
self.files.get(&file_stat.file_id).unwrap();
@@ -652,7 +681,7 @@ pub(crate) mod tests {
assert_eq!(file_stat.path, path);
assert_eq!(file_stat.kind, kind);
assert_eq!(file_stat.size, size);
- if file_stat.file_id == 1 {
+ if file_stat.file_id == ROOT_DIR_FILE_ID || file_stat.file_id ==
FS_META_FILE_ID {
assert_eq!(file_stat.parent_file_id, 1);
} else {
assert!(file_stat.file_id >= INITIAL_FILE_ID);
diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs
b/clients/filesystem-fuse/src/fuse_api_handle.rs
index 153e32389..15679a222 100644
--- a/clients/filesystem-fuse/src/fuse_api_handle.rs
+++ b/clients/filesystem-fuse/src/fuse_api_handle.rs
@@ -227,7 +227,7 @@ impl<T: RawFileSystem> Filesystem for FuseApiHandle<T> {
async fn release(
&self,
- _eq: Request,
+ _req: Request,
inode: Inode,
fh: u64,
_flags: u32,
@@ -237,6 +237,16 @@ impl<T: RawFileSystem> Filesystem for FuseApiHandle<T> {
self.fs.close_file(inode, fh).await
}
+ async fn flush(
+ &self,
+ _req: Request,
+ inode: Inode,
+ fh: u64,
+ _lock_owner: u64,
+ ) -> fuse3::Result<()> {
+ self.fs.flush_file(inode, fh).await
+ }
+
async fn opendir(&self, _req: Request, inode: Inode, flags: u32) ->
fuse3::Result<ReplyOpen> {
let file_handle = self.fs.open_dir(inode, flags).await?;
Ok(ReplyOpen {
diff --git a/clients/filesystem-fuse/src/gravitino_client.rs
b/clients/filesystem-fuse/src/gravitino_client.rs
index e5553c9f6..9bdfbb2c2 100644
--- a/clients/filesystem-fuse/src/gravitino_client.rs
+++ b/clients/filesystem-fuse/src/gravitino_client.rs
@@ -48,6 +48,22 @@ struct FileLocationResponse {
location: String,
}
+#[derive(Debug, Deserialize)]
+pub(crate) struct Catalog {
+ pub(crate) name: String,
+ #[serde(rename = "type")]
+ pub(crate) catalog_type: String,
+ provider: String,
+ comment: String,
+ pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct CatalogResponse {
+ code: u32,
+ catalog: Catalog,
+}
+
pub(crate) struct GravitinoClient {
gravitino_uri: String,
metalake: String,
@@ -105,6 +121,26 @@ impl GravitinoClient {
Ok(res)
}
+ pub async fn get_catalog_url(&self, catalog_name: &str) -> String {
+ format!(
+ "{}/api/metalakes/{}/catalogs/{}",
+ self.gravitino_uri, self.metalake, catalog_name
+ )
+ }
+
+ pub async fn get_catalog(&self, catalog_name: &str) -> Result<Catalog,
GvfsError> {
+ let url = self.get_catalog_url(catalog_name).await;
+ let res = self.do_get::<CatalogResponse>(&url).await?;
+
+ if res.code != 0 {
+ return Err(GvfsError::Error(
+ ErrorCode::GravitinoClientError,
+ "Failed to get catalog".to_string(),
+ ));
+ }
+ Ok(res.catalog)
+ }
+
pub async fn get_fileset(
&self,
catalog_name: &str,
@@ -257,6 +293,46 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn test_get_catalog_success() {
+ let catalog_response = r#"
+ {
+ "code": 0,
+ "catalog": {
+ "name": "example_catalog",
+ "type": "example_type",
+ "provider": "example_provider",
+ "comment": "This is a test catalog",
+ "properties": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+ }"#;
+
+ let mock_server_url = &mockito::server_url();
+
+ let url = format!("/api/metalakes/{}/catalogs/{}", "test", "catalog1");
+ let _m = mock("GET", url.as_str())
+ .with_status(200)
+ .with_header("content-type", "application/json")
+ .with_body(catalog_response)
+ .create();
+
+ let config = GravitinoConfig {
+ uri: mock_server_url.to_string(),
+ metalake: "test".to_string(),
+ };
+ let client = GravitinoClient::new(&config);
+
+ let result = client.get_catalog("catalog1").await;
+
+ match result {
+ Ok(_) => {}
+ Err(e) => panic!("Expected Ok, but got Err: {:?}", e),
+ }
+ }
+
async fn get_fileset_example() {
tracing_subscriber::fmt::init();
let config = GravitinoConfig {
diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
index 98a295dbb..7da2f572d 100644
--- a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
+++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs
@@ -30,13 +30,15 @@ use std::path::{Path, PathBuf};
pub(crate) struct GravitinoFilesetFileSystem {
physical_fs: Box<dyn PathFileSystem>,
client: GravitinoClient,
- fileset_location: PathBuf,
+ // location is a absolute path in the physical filesystem that is
associated with the fileset.
+ // e.g. fileset location : s3://bucket/path/to/file the location is
/path/to/file
+ location: PathBuf,
}
impl GravitinoFilesetFileSystem {
pub async fn new(
fs: Box<dyn PathFileSystem>,
- location: &Path,
+ target_path: &Path,
client: GravitinoClient,
_config: &AppConfig,
_context: &FileSystemContext,
@@ -44,18 +46,25 @@ impl GravitinoFilesetFileSystem {
Self {
physical_fs: fs,
client: client,
- fileset_location: location.into(),
+ location: target_path.into(),
}
}
fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf {
- self.fileset_location.join(path)
+ let relation_path = path.strip_prefix("/").expect("path should start
with /");
+ if relation_path == Path::new("") {
+ return self.location.clone();
+ }
+ self.location.join(relation_path)
}
fn raw_path_to_gvfs_path(&self, path: &Path) -> Result<PathBuf> {
- path.strip_prefix(&self.fileset_location)
+ let stripped_path = path
+ .strip_prefix(&self.location)
.map_err(|_| Errno::from(libc::EBADF))?;
- Ok(path.into())
+ let mut result_path = PathBuf::from("/");
+ result_path.push(stripped_path);
+ Ok(result_path)
}
}
@@ -128,3 +137,39 @@ impl PathFileSystem for GravitinoFilesetFileSystem {
self.physical_fs.get_capacity()
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::config::GravitinoConfig;
+ use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+ use crate::memory_filesystem::MemoryFileSystem;
+ use std::path::Path;
+
+ #[tokio::test]
+ async fn test_map_fileset_path_to_raw_path() {
+ let fs = GravitinoFilesetFileSystem {
+ physical_fs: Box::new(MemoryFileSystem::new().await),
+ client: super::GravitinoClient::new(&GravitinoConfig::default()),
+ location: "/c1/fileset1".into(),
+ };
+ let path = fs.gvfs_path_to_raw_path(Path::new("/a"));
+ assert_eq!(path, Path::new("/c1/fileset1/a"));
+ let path = fs.gvfs_path_to_raw_path(Path::new("/"));
+ assert_eq!(path, Path::new("/c1/fileset1"));
+ }
+
+ #[tokio::test]
+ async fn test_map_raw_path_to_fileset_path() {
+ let fs = GravitinoFilesetFileSystem {
+ physical_fs: Box::new(MemoryFileSystem::new().await),
+ client: super::GravitinoClient::new(&GravitinoConfig::default()),
+ location: "/c1/fileset1".into(),
+ };
+ let path = fs
+ .raw_path_to_gvfs_path(Path::new("/c1/fileset1/a"))
+ .unwrap();
+ assert_eq!(path, Path::new("/a"));
+ let path =
fs.raw_path_to_gvfs_path(Path::new("/c1/fileset1")).unwrap();
+ assert_eq!(path, Path::new("/"));
+ }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_creator.rs
b/clients/filesystem-fuse/src/gvfs_creator.rs
new file mode 100644
index 000000000..aac88ad9d
--- /dev/null
+++ b/clients/filesystem-fuse/src/gvfs_creator.rs
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::filesystem::{FileSystemContext, PathFileSystem};
+use crate::gravitino_client::{Catalog, Fileset, GravitinoClient};
+use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::gvfs_fuse::{CreateFileSystemResult, FileSystemSchema};
+use crate::s3_filesystem::S3FileSystem;
+use crate::utils::{extract_root_path, parse_location, GvfsResult};
+
+const GRAVITINO_FILESET_SCHEMA: &str = "gvfs";
+
+pub async fn create_gvfs_filesystem(
+ mount_from: &str,
+ config: &AppConfig,
+ fs_context: &FileSystemContext,
+) -> GvfsResult<CreateFileSystemResult> {
+ // Gvfs-fuse filesystem structure:
+ // FuseApiHandle
+ // ├─ DefaultRawFileSystem (RawFileSystem)
+ // │ └─ FileSystemLog (PathFileSystem)
+ // │ ├─ GravitinoComposedFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ S3FileSystem (PathFileSystem)
+ // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ HDFSFileSystem (PathFileSystem)
+ // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ JuiceFileSystem (PathFileSystem)
+ // │ │ │ └─ NasFileSystem (PathFileSystem)
+ // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
+ // │ │ │ └─ XXXFileSystem (PathFileSystem)
+ //
+ // `SimpleFileSystem` is a low-level filesystem designed to communicate
with FUSE APIs.
+ // It manages file and directory relationships, as well as file mappings.
+ // It delegates file operations to the PathFileSystem
+ //
+ // `FileSystemLog` is a decorator that adds extra debug logging
functionality to file system APIs.
+ // Similar implementations include permissions, caching, and metrics.
+ //
+ // `GravitinoComposeFileSystem` is a composite file system that can
combine multiple `GravitinoFilesetFileSystem`.
+ // It use the part of catalog and schema of fileset path to a find actual
GravitinoFilesetFileSystem. delegate the operation to the real storage.
+ // If the user only mounts a fileset, this layer is not present. There
will only be one below layer.
+ //
+ // `GravitinoFilesetFileSystem` is a file system that can access a
fileset.It translates the fileset path to the real storage path.
+ // and delegate the operation to the real storage.
+ //
+ // `OpenDALFileSystem` is a file system that use the OpenDAL to access
real storage.
+ // it can assess the S3, HDFS, gcs, azblob and other storage.
+ //
+ // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access
S3 storage.
+ //
+ // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to
access HDFS storage.
+ //
+ // `NasFileSystem` is a filesystem that uses a locally accessible path
mounted by NAS tools, such as JuiceFS.
+ //
+ // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS
storage.
+ //
+ // `XXXFileSystem is a filesystem that allows you to implement file access
through your own extensions.
+
+ let client = GravitinoClient::new(&config.gravitino);
+
+ let (catalog_name, schema_name, fileset_name) =
extract_fileset(mount_from)?;
+ let catalog = client.get_catalog(&catalog_name).await?;
+ if catalog.catalog_type != "fileset" {
+ return Err(InvalidConfig.to_error(format!("Catalog {} is not a
fileset", catalog_name)));
+ }
+ let fileset = client
+ .get_fileset(&catalog_name, &schema_name, &fileset_name)
+ .await?;
+
+ let inner_fs = create_fs_with_fileset(&catalog, &fileset, config,
fs_context)?;
+
+ let target_path = extract_root_path(fileset.storage_location.as_str())?;
+ let fs =
+ GravitinoFilesetFileSystem::new(inner_fs, &target_path, client,
config, fs_context).await;
+ Ok(CreateFileSystemResult::Gvfs(fs))
+}
+
+fn create_fs_with_fileset(
+ catalog: &Catalog,
+ fileset: &Fileset,
+ config: &AppConfig,
+ fs_context: &FileSystemContext,
+) -> GvfsResult<Box<dyn PathFileSystem>> {
+ let schema = extract_filesystem_scheme(&fileset.storage_location)?;
+
+ match schema {
+ FileSystemSchema::S3 => Ok(Box::new(S3FileSystem::new(
+ catalog, fileset, config, fs_context,
+ )?)),
+ }
+}
+
+pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
+ let path = parse_location(path)?;
+
+ if path.scheme() != GRAVITINO_FILESET_SCHEMA {
+ return Err(InvalidConfig.to_error(format!("Invalid fileset schema:
{}", path)));
+ }
+
+ let split = path.path_segments();
+ if split.is_none() {
+ return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}",
path)));
+ }
+ let split = split.unwrap().collect::<Vec<&str>>();
+ if split.len() != 4 {
+ return Err(InvalidConfig.to_error(format!("Invalid fileset path: {}",
path)));
+ }
+
+ let catalog = split[1].to_string();
+ let schema = split[2].to_string();
+ let fileset = split[3].to_string();
+ Ok((catalog, schema, fileset))
+}
+
+pub fn extract_filesystem_scheme(path: &str) -> GvfsResult<FileSystemSchema> {
+ let url = parse_location(path)?;
+ let scheme = url.scheme();
+
+ match scheme {
+ "s3" => Ok(FileSystemSchema::S3),
+ "s3a" => Ok(FileSystemSchema::S3),
+ _ => Err(UnSupportedFilesystem.to_error(format!("Invalid storage
schema: {}", path))),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::gvfs_creator::extract_fileset;
+ use crate::gvfs_fuse::FileSystemSchema;
+
+ #[test]
+ fn test_extract_fileset() {
+ let location = "gvfs://fileset/test/c1/s1/fileset1";
+ let (catalog, schema, fileset) = extract_fileset(location).unwrap();
+ assert_eq!(catalog, "c1");
+ assert_eq!(schema, "s1");
+ assert_eq!(fileset, "fileset1");
+ }
+
+ #[test]
+ fn test_extract_schema() {
+ let location = "s3://bucket/path/to/file";
+ let schema = super::extract_filesystem_scheme(location).unwrap();
+ assert_eq!(schema, FileSystemSchema::S3);
+ }
+}
diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs
b/clients/filesystem-fuse/src/gvfs_fuse.rs
index d472895d2..88079e99b 100644
--- a/clients/filesystem-fuse/src/gvfs_fuse.rs
+++ b/clients/filesystem-fuse/src/gvfs_fuse.rs
@@ -18,22 +18,19 @@
*/
use crate::config::AppConfig;
use crate::default_raw_filesystem::DefaultRawFileSystem;
-use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
+use crate::error::ErrorCode::UnSupportedFilesystem;
use crate::filesystem::FileSystemContext;
use crate::fuse_api_handle::FuseApiHandle;
use crate::fuse_server::FuseServer;
-use crate::gravitino_client::GravitinoClient;
use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
+use crate::gvfs_creator::create_gvfs_filesystem;
use crate::memory_filesystem::MemoryFileSystem;
use crate::utils::GvfsResult;
use log::info;
use once_cell::sync::Lazy;
-use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
-const FILESET_PREFIX: &str = "gvfs://fileset/";
-
static SERVER: Lazy<Mutex<Option<Arc<FuseServer>>>> = Lazy::new(||
Mutex::new(None));
pub(crate) enum CreateFileSystemResult {
@@ -44,6 +41,7 @@ pub(crate) enum CreateFileSystemResult {
None,
}
+#[derive(Debug, PartialEq)]
pub enum FileSystemSchema {
S3,
}
@@ -65,7 +63,7 @@ pub async fn mount(mount_to: &str, mount_from: &str, config:
&AppConfig) -> Gvfs
}
pub async fn unmount() -> GvfsResult<()> {
- info!("Stop gvfs-fuse server...");
+ info!("Stopping gvfs-fuse server...");
let svr = {
let mut server = SERVER.lock().await;
if server.is_none() {
@@ -127,120 +125,3 @@ pub async fn create_path_fs(
create_gvfs_filesystem(mount_from, config, fs_context).await
}
}
-
-pub async fn create_gvfs_filesystem(
- mount_from: &str,
- config: &AppConfig,
- fs_context: &FileSystemContext,
-) -> GvfsResult<CreateFileSystemResult> {
- // Gvfs-fuse filesystem structure:
- // FuseApiHandle
- // ├─ DefaultRawFileSystem (RawFileSystem)
- // │ └─ FileSystemLog (PathFileSystem)
- // │ ├─ GravitinoComposedFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ S3FileSystem (PathFileSystem)
- // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ HDFSFileSystem (PathFileSystem)
- // │ │ │ └─ OpenDALFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ JuiceFileSystem (PathFileSystem)
- // │ │ │ └─ NasFileSystem (PathFileSystem)
- // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
- // │ │ │ └─ XXXFileSystem (PathFileSystem)
- //
- // `SimpleFileSystem` is a low-level filesystem designed to communicate
with FUSE APIs.
- // It manages file and directory relationships, as well as file mappings.
- // It delegates file operations to the PathFileSystem
- //
- // `FileSystemLog` is a decorator that adds extra debug logging
functionality to file system APIs.
- // Similar implementations include permissions, caching, and metrics.
- //
- // `GravitinoComposeFileSystem` is a composite file system that can
combine multiple `GravitinoFilesetFileSystem`.
- // It use the part of catalog and schema of fileset path to a find actual
GravitinoFilesetFileSystem. delegate the operation to the real storage.
- // If the user only mounts a fileset, this layer is not present. There
will only be one below layer.
- //
- // `GravitinoFilesetFileSystem` is a file system that can access a
fileset.It translates the fileset path to the real storage path.
- // and delegate the operation to the real storage.
- //
- // `OpenDALFileSystem` is a file system that use the OpenDAL to access
real storage.
- // it can assess the S3, HDFS, gcs, azblob and other storage.
- //
- // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access
S3 storage.
- //
- // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to
access HDFS storage.
- //
- // `NasFileSystem` is a filesystem that uses a locally accessible path
mounted by NAS tools, such as JuiceFS.
- //
- // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS
storage.
- //
- // `XXXFileSystem is a filesystem that allows you to implement file access
through your own extensions.
-
- let client = GravitinoClient::new(&config.gravitino);
-
- let (catalog, schema, fileset) = extract_fileset(mount_from)?;
- let location = client
- .get_fileset(&catalog, &schema, &fileset)
- .await?
- .storage_location;
- let (_schema, location) = extract_storage_filesystem(&location).unwrap();
-
- // todo need to replace the inner filesystem with the real storage
filesystem
- let inner_fs = MemoryFileSystem::new().await;
-
- let fs = GravitinoFilesetFileSystem::new(
- Box::new(inner_fs),
- Path::new(&location),
- client,
- config,
- fs_context,
- )
- .await;
- Ok(CreateFileSystemResult::Gvfs(fs))
-}
-
-pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
- if !path.starts_with(FILESET_PREFIX) {
- return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
- }
-
- let path_without_prefix = &path[FILESET_PREFIX.len()..];
-
- let parts: Vec<&str> = path_without_prefix.split('/').collect();
-
- if parts.len() != 3 {
- return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
- }
- // todo handle mount catalog or schema
-
- let catalog = parts[1].to_string();
- let schema = parts[2].to_string();
- let fileset = parts[3].to_string();
-
- Ok((catalog, schema, fileset))
-}
-
-pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema,
String)> {
- // todo need to improve the logic
- if let Some(pos) = path.find("://") {
- let protocol = &path[..pos];
- let location = &path[pos + 3..];
- let location = match location.find('/') {
- Some(index) => &location[index + 1..],
- None => "",
- };
- let location = match location.ends_with('/') {
- true => location.to_string(),
- false => format!("{}/", location),
- };
-
- match protocol {
- "s3" => Some((FileSystemSchema::S3, location.to_string())),
- "s3a" => Some((FileSystemSchema::S3, location.to_string())),
- _ => None,
- }
- } else {
- None
- }
-}
diff --git a/clients/filesystem-fuse/src/lib.rs
b/clients/filesystem-fuse/src/lib.rs
index 5532d619e..31e7c7fd8 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -27,10 +27,13 @@ mod fuse_api_handle;
mod fuse_server;
mod gravitino_client;
mod gravitino_fileset_filesystem;
+mod gvfs_creator;
mod gvfs_fuse;
mod memory_filesystem;
+mod open_dal_filesystem;
mod opened_file;
mod opened_file_manager;
+mod s3_filesystem;
mod utils;
pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig)
-> GvfsResult<()> {
diff --git a/clients/filesystem-fuse/src/main.rs
b/clients/filesystem-fuse/src/main.rs
index 8eab5ec0d..3534e0334 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -26,21 +26,37 @@ use tokio::signal;
async fn main() -> fuse3::Result<()> {
tracing_subscriber::fmt().init();
+ // todo: need to improve the args parsing
+ let args: Vec<String> = std::env::args().collect();
+ let (mount_point, mount_from, config_path) = match args.len() {
+ 4 => (args[1].clone(), args[2].clone(), args[3].clone()),
+ _ => {
+ error!("Usage: {} <mount_point> <mount_from> <config>", args[0]);
+ return Err(Errno::from(libc::EINVAL));
+ }
+ };
+
+ // config file path is taken from the third CLI argument parsed above
- let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml"));
+ let config = AppConfig::from_file(Some(&config_path));
if let Err(e) = &config {
error!("Failed to load config: {:?}", e);
return Err(Errno::from(libc::EINVAL));
}
let config = config.unwrap();
- let handle = tokio::spawn(async move { gvfs_mount("gvfs", "",
&config).await });
-
- let _ = signal::ctrl_c().await;
- info!("Received Ctrl+C, Unmounting gvfs...");
+ let handle = tokio::spawn(async move {
+ let result = gvfs_mount(&mount_point, &mount_from, &config).await;
+ if let Err(e) = result {
+ error!("Failed to mount gvfs: {:?}", e);
+ return Err(Errno::from(libc::EINVAL));
+ }
+ Ok(())
+ });
- if let Err(e) = handle.await {
- error!("Failed to mount gvfs: {:?}", e);
- return Err(Errno::from(libc::EINVAL));
+ tokio::select! {
+ _ = handle => {}
+ _ = signal::ctrl_c() => {
+ info!("Received Ctrl+C, unmounting gvfs...");
+ }
}
let _ = gvfs_unmount().await;
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs
b/clients/filesystem-fuse/src/memory_filesystem.rs
index b94d16b8d..f56e65ea3 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -42,8 +42,6 @@ pub struct MemoryFileSystem {
}
impl MemoryFileSystem {
- const FS_META_FILE_NAME: &'static str = "/.gvfs_meta";
-
pub(crate) async fn new() -> Self {
Self {
file_map: RwLock::new(Default::default()),
@@ -69,16 +67,6 @@ impl PathFileSystem for MemoryFileSystem {
};
let root_path = PathBuf::from("/");
self.file_map.write().unwrap().insert(root_path, root_file);
-
- let meta_file = MemoryFile {
- kind: RegularFile,
- data: Arc::new(Mutex::new(Vec::new())),
- };
- let meta_file_path = Path::new(Self::FS_META_FILE_NAME).to_path_buf();
- self.file_map
- .write()
- .unwrap()
- .insert(meta_file_path, meta_file);
Ok(())
}
@@ -248,7 +236,10 @@ fn path_in_dir(dir: &Path, path: &Path) -> bool {
#[cfg(test)]
mod tests {
use super::*;
- use crate::filesystem::tests::TestPathFileSystem;
+ use crate::config::AppConfig;
+ use crate::default_raw_filesystem::DefaultRawFileSystem;
+ use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
+ use crate::filesystem::{FileSystemContext, RawFileSystem};
#[test]
fn test_path_in_dir() {
@@ -281,7 +272,20 @@ mod tests {
async fn test_memory_file_system() {
let fs = MemoryFileSystem::new().await;
let _ = fs.init().await;
- let mut tester = TestPathFileSystem::new(fs);
+ let mut tester = TestPathFileSystem::new(Path::new("/ab"), fs);
tester.test_path_file_system().await;
}
+
+ #[tokio::test]
+ async fn test_memory_file_system_with_raw_file_system() {
+ let memory_fs = MemoryFileSystem::new().await;
+ let raw_fs = DefaultRawFileSystem::new(
+ memory_fs,
+ &AppConfig::default(),
+ &FileSystemContext::default(),
+ );
+ let _ = raw_fs.init().await;
+ let mut tester = TestRawFileSystem::new(Path::new("/ab"), raw_fs);
+ tester.test_raw_file_system().await;
+ }
}
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs
b/clients/filesystem-fuse/src/open_dal_filesystem.rs
new file mode 100644
index 000000000..e53fbaf60
--- /dev/null
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::filesystem::{
+ FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter,
PathFileSystem, Result,
+};
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use async_trait::async_trait;
+use bytes::Bytes;
+use fuse3::FileType::{Directory, RegularFile};
+use fuse3::{Errno, FileType, Timestamp};
+use log::error;
+use opendal::{EntryMode, ErrorKind, Metadata, Operator};
+use std::path::{Path, PathBuf};
+use std::time::SystemTime;
+
+pub(crate) struct OpenDalFileSystem {
+ op: Operator,
+}
+
+impl OpenDalFileSystem {}
+
+impl OpenDalFileSystem {
+ pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context:
&FileSystemContext) -> Self {
+ Self { op: op }
+ }
+
+ fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut
FileStat) {
+ let now = SystemTime::now();
+ let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now);
+
+ file_stat.size = meta.content_length();
+ file_stat.kind = opendal_filemode_to_filetype(meta.mode());
+ file_stat.ctime = Timestamp::from(mtime);
+ file_stat.atime = Timestamp::from(now);
+ file_stat.mtime = Timestamp::from(mtime);
+ }
+}
+
+#[async_trait]
+impl PathFileSystem for OpenDalFileSystem {
+ async fn init(&self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn stat(&self, path: &Path) -> Result<FileStat> {
+ let file_name = path.to_string_lossy().to_string();
+ let meta_result = self.op.stat(&file_name).await;
+
+ // path may be a directory, so try to stat it as a directory
+ let meta = match meta_result {
+ Ok(meta) => meta,
+ Err(err) => {
+ if err.kind() == ErrorKind::NotFound {
+ let dir_name = build_dir_path(path);
+ self.op
+ .stat(&dir_name)
+ .await
+ .map_err(opendal_error_to_errno)?
+ } else {
+ return Err(opendal_error_to_errno(err));
+ }
+ }
+ };
+
+ let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
+ self.opendal_meta_to_file_stat(&meta, &mut file_stat);
+
+ Ok(file_stat)
+ }
+
+ async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+ // dir name should end with '/' in opendal.
+ let dir_name = build_dir_path(path);
+ let entries = self
+ .op
+ .list(&dir_name)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ entries
+ .iter()
+ .map(|entry| {
+ let mut path = PathBuf::from(path);
+ path.push(entry.name());
+
+ let mut file_stat =
FileStat::new_file_filestat_with_path(&path, 0);
+ self.opendal_meta_to_file_stat(entry.metadata(), &mut
file_stat);
+ Ok(file_stat)
+ })
+ .collect()
+ }
+
+ async fn open_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let file_stat = self.stat(path).await?;
+ debug_assert!(file_stat.kind == RegularFile);
+
+ let mut file = OpenedFile::new(file_stat);
+ let file_name = path.to_string_lossy().to_string();
+ if flags.is_read() {
+ let reader = self
+ .op
+ .reader_with(&file_name)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ file.reader = Some(Box::new(FileReaderImpl { reader }));
+ }
+ if flags.is_write() || flags.is_create() || flags.is_append() ||
flags.is_truncate() {
+ let writer = self
+ .op
+ .writer_with(&file_name)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ file.writer = Some(Box::new(FileWriterImpl { writer }));
+ }
+ Ok(file)
+ }
+
+ async fn open_dir(&self, path: &Path, _flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let file_stat = self.stat(path).await?;
+ debug_assert!(file_stat.kind == Directory);
+
+ let opened_file = OpenedFile::new(file_stat);
+ Ok(opened_file)
+ }
+
+ async fn create_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ let file_name = path.to_string_lossy().to_string();
+
+ let mut writer = self
+ .op
+ .writer_with(&file_name)
+ .await
+ .map_err(opendal_error_to_errno)?;
+
+ writer.close().await.map_err(opendal_error_to_errno)?;
+
+ let file = self.open_file(path, flags).await?;
+ Ok(file)
+ }
+
+ async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+ let dir_name = build_dir_path(path);
+ self.op
+ .create_dir(&dir_name)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ let file_stat = self.stat(path).await?;
+ Ok(file_stat)
+ }
+
+ async fn set_attr(&self, _path: &Path, _file_stat: &FileStat, _flush:
bool) -> Result<()> {
+ // no need to implement
+ Ok(())
+ }
+
+ async fn remove_file(&self, path: &Path) -> Result<()> {
+ let file_name = path.to_string_lossy().to_string();
+ self.op
+ .remove(vec![file_name])
+ .await
+ .map_err(opendal_error_to_errno)
+ }
+
+ async fn remove_dir(&self, path: &Path) -> Result<()> {
+ //todo:: need to consider keeping the behavior of posix remove dir
when the dir is not empty
+ let dir_name = build_dir_path(path);
+ self.op
+ .remove(vec![dir_name])
+ .await
+ .map_err(opendal_error_to_errno)
+ }
+
+ fn get_capacity(&self) -> Result<FileSystemCapacity> {
+ Ok(FileSystemCapacity {})
+ }
+}
+
+struct FileReaderImpl {
+ reader: opendal::Reader,
+}
+
+#[async_trait]
+impl FileReader for FileReaderImpl {
+ async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
+ let end = offset + size as u64;
+ let v = self
+ .reader
+ .read(offset..end)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ Ok(v.to_bytes())
+ }
+}
+
+struct FileWriterImpl {
+ writer: opendal::Writer,
+}
+
+#[async_trait]
+impl FileWriter for FileWriterImpl {
+ async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
+ self.writer
+ .write(data.to_vec())
+ .await
+ .map_err(opendal_error_to_errno)?;
+ Ok(data.len() as u32)
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.writer.close().await.map_err(opendal_error_to_errno)?;
+ Ok(())
+ }
+}
+
+fn build_dir_path(path: &Path) -> String {
+ let mut dir_path = path.to_string_lossy().to_string();
+ if !dir_path.ends_with('/') {
+ dir_path.push('/');
+ }
+ dir_path
+}
+
+fn opendal_error_to_errno(err: opendal::Error) -> Errno {
+ error!("opendal operator error {:?}", err);
+ match err.kind() {
+ ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
+ ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
+ ErrorKind::NotFound => Errno::from(libc::ENOENT),
+ ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
+ ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
+ ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
+ ErrorKind::RateLimited => Errno::from(libc::EBUSY),
+ _ => Errno::from(libc::ENOENT),
+ }
+}
+
+fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
+ match mode {
+ EntryMode::DIR => Directory,
+ _ => RegularFile,
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::config::AppConfig;
+ use crate::s3_filesystem::extract_s3_config;
+ use opendal::layers::LoggingLayer;
+ use opendal::{services, Builder, Operator};
+
+ #[tokio::test]
+ async fn test_s3_stat() {
+ let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
+ let opendal_config = extract_s3_config(&config);
+
+ let builder = services::S3::from_map(opendal_config);
+
+ // Init an operator
+ let op = Operator::new(builder)
+ .expect("opendal create failed")
+ .layer(LoggingLayer::default())
+ .finish();
+
+ let path = "/";
+ let list = op.list(path).await;
+ if let Ok(l) = list {
+ for i in l {
+ println!("list result: {:?}", i);
+ }
+ } else {
+ println!("list error: {:?}", list.err());
+ }
+
+ let meta = op.stat_with(path).await;
+ if let Ok(m) = meta {
+ println!("stat result: {:?}", m);
+ } else {
+ println!("stat error: {:?}", meta.err());
+ }
+ }
+}
diff --git a/clients/filesystem-fuse/src/opened_file.rs
b/clients/filesystem-fuse/src/opened_file.rs
index 5bc961c9a..0c630e072 100644
--- a/clients/filesystem-fuse/src/opened_file.rs
+++ b/clients/filesystem-fuse/src/opened_file.rs
@@ -122,6 +122,32 @@ pub(crate) struct FileHandle {
// OpenFileFlags is the open file flags for the file system.
pub(crate) struct OpenFileFlags(pub(crate) u32);
+impl OpenFileFlags {
+ pub fn is_read(&self) -> bool {
+ (self.0 & libc::O_WRONLY as u32) == 0
+ }
+
+ pub fn is_write(&self) -> bool {
+ (self.0 & libc::O_WRONLY as u32) != 0 || (self.0 & libc::O_RDWR as
u32) != 0
+ }
+
+ pub fn is_append(&self) -> bool {
+ (self.0 & libc::O_APPEND as u32) != 0
+ }
+
+ pub fn is_create(&self) -> bool {
+ (self.0 & libc::O_CREAT as u32) != 0
+ }
+
+ pub fn is_truncate(&self) -> bool {
+ (self.0 & libc::O_TRUNC as u32) != 0
+ }
+
+ pub fn is_exclusive(&self) -> bool {
+ (self.0 & libc::O_EXCL as u32) != 0
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/clients/filesystem-fuse/src/s3_filesystem.rs
b/clients/filesystem-fuse/src/s3_filesystem.rs
new file mode 100644
index 000000000..e0ca69b4c
--- /dev/null
+++ b/clients/filesystem-fuse/src/s3_filesystem.rs
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::error::ErrorCode::{InvalidConfig, OpenDalError};
+use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext,
PathFileSystem, Result};
+use crate::gravitino_client::{Catalog, Fileset};
+use crate::open_dal_filesystem::OpenDalFileSystem;
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use crate::utils::{parse_location, GvfsResult};
+use async_trait::async_trait;
+use log::error;
+use opendal::layers::LoggingLayer;
+use opendal::services::S3;
+use opendal::{Builder, Operator};
+use std::collections::HashMap;
+use std::path::Path;
+
+pub(crate) struct S3FileSystem {
+ open_dal_fs: OpenDalFileSystem,
+}
+
+impl S3FileSystem {}
+
+impl S3FileSystem {
+ const S3_CONFIG_PREFIX: &'static str = "s3-";
+
+ pub(crate) fn new(
+ catalog: &Catalog,
+ fileset: &Fileset,
+ config: &AppConfig,
+ _fs_context: &FileSystemContext,
+ ) -> GvfsResult<Self> {
+ let mut opendal_config = extract_s3_config(config);
+ let bucket = extract_bucket(&fileset.storage_location)?;
+ opendal_config.insert("bucket".to_string(), bucket);
+
+ let region = Self::get_s3_region(catalog)?;
+ opendal_config.insert("region".to_string(), region);
+
+ let builder = S3::from_map(opendal_config);
+
+ let op = Operator::new(builder);
+ if let Err(e) = op {
+ error!("opendal create failed: {:?}", e);
+ return Err(OpenDalError.to_error(format!("opendal create failed:
{:?}", e)));
+ }
+ let op = op.unwrap().layer(LoggingLayer::default()).finish();
+ let open_dal_fs = OpenDalFileSystem::new(op, config, _fs_context);
+ Ok(Self {
+ open_dal_fs: open_dal_fs,
+ })
+ }
+
+ fn get_s3_region(catalog: &Catalog) -> GvfsResult<String> {
+ if let Some(region) = catalog.properties.get("s3-region") {
+ Ok(region.clone())
+ } else if let Some(endpoint) = catalog.properties.get("s3-endpoint") {
+ extract_region(endpoint)
+ } else {
+ Err(InvalidConfig.to_error(format!(
+ "Cant not retrieve region in the Catalog {}",
+ catalog.name
+ )))
+ }
+ }
+}
+
+#[async_trait]
+impl PathFileSystem for S3FileSystem {
+ async fn init(&self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn stat(&self, path: &Path) -> Result<FileStat> {
+ self.open_dal_fs.stat(path).await
+ }
+
+ async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+ self.open_dal_fs.read_dir(path).await
+ }
+
+ async fn open_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ self.open_dal_fs.open_file(path, flags).await
+ }
+
+ async fn open_dir(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ self.open_dal_fs.open_dir(path, flags).await
+ }
+
+ async fn create_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ self.open_dal_fs.create_file(path, flags).await
+ }
+
+ async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+ self.open_dal_fs.create_dir(path).await
+ }
+
+ async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool)
-> Result<()> {
+ self.open_dal_fs.set_attr(path, file_stat, flush).await
+ }
+
+ async fn remove_file(&self, path: &Path) -> Result<()> {
+ self.open_dal_fs.remove_file(path).await
+ }
+
+ async fn remove_dir(&self, path: &Path) -> Result<()> {
+ self.open_dal_fs.remove_dir(path).await
+ }
+
+ fn get_capacity(&self) -> Result<FileSystemCapacity> {
+ self.open_dal_fs.get_capacity()
+ }
+}
+
+pub(crate) fn extract_bucket(location: &str) -> GvfsResult<String> {
+ let url = parse_location(location)?;
+ match url.host_str() {
+ Some(host) => Ok(host.to_string()),
+ None => Err(InvalidConfig.to_error(format!(
+ "Invalid fileset location without bucket: {}",
+ location
+ ))),
+ }
+}
+
+pub(crate) fn extract_region(location: &str) -> GvfsResult<String> {
+ let url = parse_location(location)?;
+ match url.host_str() {
+ Some(host) => {
+ let parts: Vec<&str> = host.split('.').collect();
+ if parts.len() > 1 {
+ Ok(parts[1].to_string())
+ } else {
+ Err(InvalidConfig.to_error(format!(
+ "Invalid location: expected region in host, got {}",
+ location
+ )))
+ }
+ }
+ None => Err(InvalidConfig.to_error(format!(
+ "Invalid fileset location without bucket: {}",
+ location
+ ))),
+ }
+}
+
+pub fn extract_s3_config(config: &AppConfig) -> HashMap<String, String> {
+ config
+ .extend_config
+ .clone()
+ .into_iter()
+ .filter_map(|(k, v)| {
+ if k.starts_with(S3FileSystem::S3_CONFIG_PREFIX) {
+ Some((
+ k.strip_prefix(S3FileSystem::S3_CONFIG_PREFIX)
+ .unwrap()
+ .to_string(),
+ v,
+ ))
+ } else {
+ None
+ }
+ })
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::default_raw_filesystem::DefaultRawFileSystem;
+ use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
+ use crate::filesystem::RawFileSystem;
+ use opendal::layers::TimeoutLayer;
+ use std::time::Duration;
+
+ #[test]
+ fn test_extract_bucket() {
+ let location = "s3://bucket/path/to/file";
+ let result = extract_bucket(location);
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap(), "bucket");
+ }
+
+ #[test]
+ fn test_extract_region() {
+ let location = "http://s3.ap-southeast-2.amazonaws.com";
+ let result = extract_region(location);
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap(), "ap-southeast-2");
+ }
+
+ async fn delete_dir(op: &Operator, dir_name: &str) {
+ let childs = op.list(dir_name).await.expect("list dir failed");
+ for child in childs {
+ let child_name = dir_name.to_string() + child.name();
+ if child.metadata().is_dir() {
+ Box::pin(delete_dir(op, &child_name)).await;
+ } else {
+ op.delete(&child_name).await.expect("delete file failed");
+ }
+ }
+ op.delete(dir_name).await.expect("delete dir failed");
+ }
+
+ async fn create_s3_fs(cwd: &Path) -> S3FileSystem {
+ let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_s3.toml")).unwrap();
+ let opendal_config = extract_s3_config(&config);
+
+ let fs_context = FileSystemContext::default();
+
+ let builder = S3::from_map(opendal_config);
+ let op = Operator::new(builder)
+ .expect("opendal create failed")
+ .layer(LoggingLayer::default())
+ .layer(
+ TimeoutLayer::new()
+ .with_timeout(Duration::from_secs(300))
+ .with_io_timeout(Duration::from_secs(300)),
+ )
+ .finish();
+
+ // clean up the test directory
+ let file_name = cwd.to_string_lossy().to_string() + "/";
+ delete_dir(&op, &file_name).await;
+ op.create_dir(&file_name)
+ .await
+ .expect("create test dir failed");
+
+ let open_dal_fs = OpenDalFileSystem::new(op, &config, &fs_context);
+ S3FileSystem { open_dal_fs }
+ }
+
+ #[tokio::test]
+ async fn test_s3_file_system() {
+ if std::env::var("RUN_S3_TESTS").is_err() {
+ return;
+ }
+ let cwd = Path::new("/gvfs_test1");
+ let fs = create_s3_fs(cwd).await;
+
+ let _ = fs.init().await;
+ let mut tester = TestPathFileSystem::new(cwd, fs);
+ tester.test_path_file_system().await;
+ }
+
+ #[tokio::test]
+ async fn test_s3_file_system_with_raw_file_system() {
+ if std::env::var("RUN_S3_TESTS").is_err() {
+ return;
+ }
+
+ let cwd = Path::new("/gvfs_test2");
+ let s3_fs = create_s3_fs(cwd).await;
+ let raw_fs =
+ DefaultRawFileSystem::new(s3_fs, &AppConfig::default(),
&FileSystemContext::default());
+ let _ = raw_fs.init().await;
+ let mut tester = TestRawFileSystem::new(cwd, raw_fs);
+ tester.test_raw_file_system().await;
+ }
+}
diff --git a/clients/filesystem-fuse/src/utils.rs
b/clients/filesystem-fuse/src/utils.rs
index bbc8d7d7f..53eb9179d 100644
--- a/clients/filesystem-fuse/src/utils.rs
+++ b/clients/filesystem-fuse/src/utils.rs
@@ -16,9 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::error::ErrorCode::InvalidConfig;
use crate::error::GvfsError;
+use reqwest::Url;
+use std::path::PathBuf;
pub type GvfsResult<T> = Result<T, GvfsError>;
+pub(crate) fn parse_location(location: &str) -> GvfsResult<Url> {
+ let parsed_url = Url::parse(location);
+ if let Err(e) = parsed_url {
+ return Err(InvalidConfig.to_error(format!("Invalid fileset location:
{}", e)));
+ }
+ Ok(parsed_url.unwrap())
+}
+
+pub(crate) fn extract_root_path(location: &str) -> GvfsResult<PathBuf> {
+ let url = parse_location(location)?;
+ Ok(PathBuf::from(url.path()))
+}
+
#[cfg(test)]
-mod tests {}
+mod tests {
+ use crate::utils::extract_root_path;
+ use std::path::PathBuf;
+
+ #[test]
+ fn test_extract_root_path() {
+ let location = "s3://bucket/path/to/file";
+ let result = extract_root_path(location);
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap(), PathBuf::from("/path/to/file"));
+ }
+}
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
b/clients/filesystem-fuse/tests/conf/config_test.toml
similarity index 91%
rename from clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
rename to clients/filesystem-fuse/tests/conf/config_test.toml
index ff7c6936f..524e0aa94 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml
+++ b/clients/filesystem-fuse/tests/conf/config_test.toml
@@ -34,7 +34,7 @@ block_size = 8192
uri = "http://localhost:8090"
metalake = "test"
-# extent settings
+# extend settings
[extend_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
index 013df6cfc..0ec447cd0 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
@@ -34,7 +34,7 @@ block_size = 8192
uri = "http://localhost:8090"
metalake = "test"
-# extent settings
-[extent_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+# extend settings
+[extend_config]
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
similarity index 86%
copy from clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
copy to clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
index 013df6cfc..7d182cd40 100644
--- a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml
+++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_s3.toml
@@ -34,7 +34,10 @@ block_size = 8192
uri = "http://localhost:8090"
metalake = "test"
-# extent settings
-[extent_config]
-access_key = "XXX_access_key"
-secret_key = "XXX_secret_key"
+# extend settings
+[extend_config]
+s3-access_key_id = "XXX_access_key"
+s3-secret_access_key = "XXX_secret_key"
+s3-region = "XXX_region"
+s3-bucket = "XXX_bucket"
+
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs
b/clients/filesystem-fuse/tests/fuse_test.rs
index e761fabc5..d06199d78 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -17,15 +17,16 @@
* under the License.
*/
+use fuse3::Errno;
use gvfs_fuse::config::AppConfig;
use gvfs_fuse::{gvfs_mount, gvfs_unmount};
-use log::info;
-use std::fs;
+use log::{error, info};
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, Instant};
+use std::{fs, panic, process};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
@@ -42,8 +43,14 @@ impl FuseTest {
let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
.expect("Failed to load config");
- self.runtime
- .spawn(async move { gvfs_mount(&mount_point, "", &config).await });
+ self.runtime.spawn(async move {
+ let result = gvfs_mount(&mount_point, "", &config).await;
+ if let Err(e) = result {
+ error!("Failed to mount gvfs: {:?}", e);
+ return Err(Errno::from(libc::EINVAL));
+ }
+ Ok(())
+ });
let success = Self::wait_for_fuse_server_ready(&self.mount_point,
Duration::from_secs(15));
assert!(success, "Fuse server cannot start up at 15 seconds");
}
@@ -60,6 +67,7 @@ impl FuseTest {
while start_time.elapsed() < timeout {
if file_exists(&test_file) {
+ info!("Fuse server is ready",);
return true;
}
info!("Wait for fuse server ready",);
@@ -80,6 +88,11 @@ impl Drop for FuseTest {
fn test_fuse_system_with_auto() {
tracing_subscriber::fmt().init();
+ panic::set_hook(Box::new(|info| {
+ error!("A panic occurred: {:?}", info);
+ process::exit(1);
+ }));
+
let mount_point = "target/gvfs";
let _ = fs::create_dir_all(mount_point);