This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 71998d9df4 [#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests
for big files and open-file flag test cases (#6280)
71998d9df4 is described below
commit 71998d9df4f7af0c5cedcb8ee845721a86d8455b
Author: Yuhui <[email protected]>
AuthorDate: Wed Feb 5 16:02:19 2025 +0800
[#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests for big files and
open-file flag test cases (#6280)
### What changes were proposed in this pull request?
Add gvfs-fuse integration tests for big files and open-file flag test
cases
### Why are the changes needed?
Fix: #6279
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
IT
---
clients/filesystem-fuse/Makefile | 2 +
clients/filesystem-fuse/src/filesystem.rs | 20 +-
clients/filesystem-fuse/src/main.rs | 2 +-
clients/filesystem-fuse/src/memory_filesystem.rs | 35 +-
clients/filesystem-fuse/src/open_dal_filesystem.rs | 142 +++++-
.../filesystem-fuse/tests/bin/run_fuse_testers.sh | 11 +-
.../filesystem-fuse/tests/bin/run_s3fs_testers.sh | 9 +
clients/filesystem-fuse/tests/fuse_test.rs | 487 +++++++++++++++++----
8 files changed, 587 insertions(+), 121 deletions(-)
diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile
index 86dd2f2215..21a97ceec7 100644
--- a/clients/filesystem-fuse/Makefile
+++ b/clients/filesystem-fuse/Makefile
@@ -71,5 +71,7 @@ test-s3:
test: doc-test
cargo test --no-fail-fast --all-targets --all-features --workspace
+test-all: test test-s3 test-fuse-it
+
clean:
cargo clean
diff --git a/clients/filesystem-fuse/src/filesystem.rs
b/clients/filesystem-fuse/src/filesystem.rs
index dcf35f8ebc..c0c27a5fbe 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
- use libc::{O_APPEND, O_CREAT, O_RDONLY};
+ use libc::{O_CREAT, O_RDONLY, O_WRONLY};
use std::collections::HashMap;
use std::path::Component;
@@ -461,7 +461,11 @@ pub(crate) mod tests {
// Test create file
let file_handle = self
- .test_create_file(parent_file_id, "file1.txt".as_ref())
+ .test_create_file(
+ parent_file_id,
+ "file1.txt".as_ref(),
+ (O_CREAT | O_WRONLY) as u32,
+ )
.await;
// Test write file
@@ -545,11 +549,13 @@ pub(crate) mod tests {
self.files.insert(file_stat.file_id, file_stat);
}
- async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr)
-> FileHandle {
- let file = self
- .fs
- .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
- .await;
+ async fn test_create_file(
+ &mut self,
+ root_file_id: u64,
+ name: &OsStr,
+ flags: u32,
+ ) -> FileHandle {
+ let file = self.fs.create_file(root_file_id, name, flags).await;
assert!(file.is_ok());
let file = file.unwrap();
assert!(file.handle_id > 0);
diff --git a/clients/filesystem-fuse/src/main.rs
b/clients/filesystem-fuse/src/main.rs
index 3534e03346..4e517c76b3 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -24,7 +24,7 @@ use tokio::signal;
#[tokio::main]
async fn main() -> fuse3::Result<()> {
- tracing_subscriber::fmt().init();
+ tracing_subscriber::fmt::init();
// todo need inmprove the args parsing
let args: Vec<String> = std::env::args().collect();
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs
b/clients/filesystem-fuse/src/memory_filesystem.rs
index f56e65ea33..d038539072 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -91,7 +91,7 @@ impl PathFileSystem for MemoryFileSystem {
Ok(results)
}
- async fn open_file(&self, path: &Path, _flags: OpenFileFlags) ->
Result<OpenedFile> {
+ async fn open_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
let file_stat = self.stat(path).await?;
let mut opened_file = OpenedFile::new(file_stat);
match opened_file.file_stat.kind {
@@ -105,8 +105,17 @@ impl PathFileSystem for MemoryFileSystem {
.unwrap()
.data
.clone();
- opened_file.reader = Some(Box::new(MemoryFileReader { data:
data.clone() }));
- opened_file.writer = Some(Box::new(MemoryFileWriter { data:
data }));
+ if flags.is_read() {
+ opened_file.reader = Some(Box::new(MemoryFileReader {
data: data.clone() }));
+ }
+ if flags.is_write() || flags.is_append() ||
flags.is_truncate() {
+ opened_file.writer = Some(Box::new(MemoryFileWriter {
data: data.clone() }));
+ }
+
+ if flags.is_truncate() {
+ let mut data = data.lock().unwrap();
+ data.clear();
+ }
Ok(opened_file)
}
_ => Err(Errno::from(libc::EBADF)),
@@ -117,27 +126,19 @@ impl PathFileSystem for MemoryFileSystem {
self.open_file(path, flags).await
}
- async fn create_file(&self, path: &Path, _flags: OpenFileFlags) ->
Result<OpenedFile> {
- let mut file_map = self.file_map.write().unwrap();
- if file_map.contains_key(path) {
+ async fn create_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
+ if self.file_map.read().unwrap().contains_key(path) &&
flags.is_exclusive() {
return Err(Errno::from(libc::EEXIST));
}
- let mut opened_file =
OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0));
-
- let data = Arc::new(Mutex::new(Vec::new()));
- file_map.insert(
- opened_file.file_stat.path.clone(),
+ self.file_map.write().unwrap().insert(
+ path.to_path_buf(),
MemoryFile {
kind: RegularFile,
- data: data.clone(),
+ data: Arc::new(Mutex::new(Vec::new())),
},
);
-
- opened_file.reader = Some(Box::new(MemoryFileReader { data:
data.clone() }));
- opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));
-
- Ok(opened_file)
+ self.open_file(path, flags).await
}
async fn create_dir(&self, path: &Path) -> Result<FileStat> {
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs
b/clients/filesystem-fuse/src/open_dal_filesystem.rs
index d32b014d1f..9e094873f5 100644
--- a/clients/filesystem-fuse/src/open_dal_filesystem.rs
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -26,19 +26,26 @@ use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
use fuse3::{Errno, FileType, Timestamp};
use log::error;
-use opendal::{EntryMode, ErrorKind, Metadata, Operator};
+use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator};
+use std::mem::swap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
pub(crate) struct OpenDalFileSystem {
op: Operator,
+ block_size: u32,
}
impl OpenDalFileSystem {}
impl OpenDalFileSystem {
- pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context:
&FileSystemContext) -> Self {
- Self { op: op }
+ const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024;
+
+ pub(crate) fn new(op: Operator, config: &AppConfig, _fs_context:
&FileSystemContext) -> Self {
+ Self {
+ op: op,
+ block_size: config.filesystem.block_size,
+ }
}
fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut
FileStat) {
@@ -120,14 +127,30 @@ impl PathFileSystem for OpenDalFileSystem {
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
- if flags.is_write() || flags.is_create() || flags.is_append() ||
flags.is_truncate() {
+ if !flags.is_create() && flags.is_append() {
+ error!("The file system does not support open a exists file with
the append mode");
+ return Err(Errno::from(libc::EINVAL));
+ }
+
+ if flags.is_truncate() {
+ self.op
+ .write(&file_name, Buffer::new())
+ .await
+ .map_err(opendal_error_to_errno)?;
+ }
+
+ if flags.is_write() || flags.is_append() || flags.is_truncate() {
let writer = self
.op
.writer_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
- file.writer = Some(Box::new(FileWriterImpl { writer }));
+ file.writer = Some(Box::new(FileWriterImpl::new(
+ writer,
+ Self::WRITE_BUFFER_SIZE + self.block_size as usize,
+ )));
}
+
Ok(file)
}
@@ -141,15 +164,17 @@ impl PathFileSystem for OpenDalFileSystem {
async fn create_file(&self, path: &Path, flags: OpenFileFlags) ->
Result<OpenedFile> {
let file_name = path.to_string_lossy().to_string();
+ if flags.is_exclusive() {
+ let meta = self.op.stat(&file_name).await;
+ if meta.is_ok() {
+ return Err(Errno::from(libc::EEXIST));
+ }
+ }
- let mut writer = self
- .op
- .writer_with(&file_name)
+ self.op
+ .write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;
-
- writer.close().await.map_err(opendal_error_to_errno)?;
-
let file = self.open_file(path, flags).await?;
Ok(file)
}
@@ -210,19 +235,45 @@ impl FileReader for FileReaderImpl {
struct FileWriterImpl {
writer: opendal::Writer,
+ buffer: Vec<u8>,
+ buffer_size: usize,
+}
+
+impl FileWriterImpl {
+ fn new(writer: opendal::Writer, buffer_size: usize) -> Self {
+ Self {
+ writer,
+ buffer_size: buffer_size,
+ buffer: Vec::with_capacity(buffer_size),
+ }
+ }
}
#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
- self.writer
- .write(data.to_vec())
- .await
- .map_err(opendal_error_to_errno)?;
+ if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
+ let mut new_buffer: Vec<u8> = Vec::with_capacity(self.buffer_size);
+ swap(&mut new_buffer, &mut self.buffer);
+
+ self.writer
+ .write(new_buffer)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ }
+ self.buffer.extend(data);
Ok(data.len() as u32)
}
async fn close(&mut self) -> Result<()> {
+ if !self.buffer.is_empty() {
+ let mut new_buffer: Vec<u8> = vec![];
+ swap(&mut new_buffer, &mut self.buffer);
+ self.writer
+ .write(new_buffer)
+ .await
+ .map_err(opendal_error_to_errno)?;
+ }
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
@@ -260,10 +311,12 @@ fn opendal_filemode_to_filetype(mode: EntryMode) ->
FileType {
#[cfg(test)]
mod test {
use crate::config::AppConfig;
+ use crate::open_dal_filesystem::OpenDalFileSystem;
use crate::s3_filesystem::extract_s3_config;
use crate::s3_filesystem::tests::s3_test_config;
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
+ use bytes::Buf;
use opendal::layers::LoggingLayer;
use opendal::{services, Builder, Operator};
@@ -327,4 +380,63 @@ mod test {
}
}
}
+
+ #[tokio::test]
+ async fn s3_ut_test_s3_write() {
+ test_enable_with!(RUN_TEST_WITH_S3);
+ let config = s3_test_config();
+
+ let op = create_opendal(&config);
+ let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
+ let mut writer = op.writer_with(path).await.unwrap();
+
+ let mut buffer: Vec<u8> = vec![];
+ let num_batch = 10 * 1024;
+ for i in 0..num_batch {
+ let data = vec![i as u8; num_batch];
+ buffer.extend(&data);
+
+ if buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
+ writer.write(buffer).await.unwrap();
+ buffer = vec![];
+ };
+ }
+
+ if !buffer.is_empty() {
+ writer.write(buffer).await.unwrap();
+ }
+ writer.close().await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn s3_ut_test_s3_read() {
+ test_enable_with!(RUN_TEST_WITH_S3);
+ let config = s3_test_config();
+
+ let op = create_opendal(&config);
+ let path = "/s1/fileset1/test_dir/test_big_file";
+ let meta = op.stat(path).await;
+ if meta.is_err() {
+ println!("stat error: {:?}", meta.err());
+ return;
+ }
+ let reader = op.reader(path).await.unwrap();
+
+ let mut buffer = Vec::new();
+
+ let batch_size = 1024;
+ let mut start = 0;
+ let mut end = batch_size;
+ loop {
+ let buf = reader.read(start..end).await.unwrap();
+ if buf.is_empty() {
+ break;
+ }
+ buffer.extend_from_slice(buf.chunk());
+ start = end;
+ end += batch_size;
+ }
+
+ println!("Read {} bytes.", buffer.len());
+ }
}
diff --git a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
index 6dc38c48f0..7088a310b5 100755
--- a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
+++ b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
@@ -50,13 +50,22 @@ if [ "$1" == "test" ]; then
echo "Running tests..."
cd $CLIENT_FUSE_DIR
export RUN_TEST_WITH_FUSE=1
- cargo test --test fuse_test fuse_it_
+ cargo test --test fuse_test fuse_it_ -- weak_consistency
elif [ "$1" == "start" ]; then
# Start the servers
echo "Starting servers..."
start_servers
+elif [ "$1" == "restart" ]; then
+ # Stop the servers
+ echo "Stopping servers..."
+ stop_servers
+
+ # Start the servers
+ echo "Starting servers..."
+ start_servers
+
elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
diff --git a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
index ac5f9812c9..8f25c0b395 100644
--- a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
+++ b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
@@ -51,6 +51,15 @@ elif [ "$1" == "start" ]; then
echo "Starting servers..."
start_servers
+elif [ "$1" == "restart" ]; then
+ # Stop the servers
+ echo "Stopping servers..."
+ stop_servers
+
+ # Start the servers
+ echo "Starting servers..."
+ start_servers
+
elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs
b/clients/filesystem-fuse/tests/fuse_test.rs
index 41e385c49f..1d1ef80b78 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -22,29 +22,32 @@ use gvfs_fuse::config::AppConfig;
use gvfs_fuse::RUN_TEST_WITH_FUSE;
use gvfs_fuse::{gvfs_mount, gvfs_unmount, test_enable_with};
use log::{error, info};
-use std::fs::File;
-use std::path::Path;
-use std::sync::Arc;
-use std::thread::sleep;
+use once_cell::sync::Lazy;
+use std::collections::HashSet;
+use std::fs::{File, OpenOptions};
+use std::io::{Read, Write};
+use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
-use std::{fs, panic, process};
+use std::{env, fs};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
+use tokio::time::interval;
-struct FuseTest {
- runtime: Arc<Runtime>,
+static ASYNC_RUNTIME: Lazy<Runtime> = Lazy::new(|| Runtime::new().unwrap());
+
+struct FuseTestEnv {
mount_point: String,
gvfs_mount: Option<JoinHandle<fuse3::Result<()>>>,
}
-impl FuseTest {
+impl FuseTestEnv {
pub fn setup(&mut self) {
info!("Start gvfs fuse server");
let mount_point = self.mount_point.clone();
let config =
AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
.expect("Failed to load config");
- self.runtime.spawn(async move {
+ ASYNC_RUNTIME.spawn(async move {
let result = gvfs_mount(&mount_point, "", &config).await;
if let Err(e) = result {
error!("Failed to mount gvfs: {:?}", e);
@@ -57,116 +60,440 @@ impl FuseTest {
}
pub fn shutdown(&mut self) {
- self.runtime.block_on(async {
+ ASYNC_RUNTIME.block_on(async {
let _ = gvfs_unmount().await;
});
}
fn wait_for_fuse_server_ready(path: &str, timeout: Duration) -> bool {
let test_file = format!("{}/.gvfs_meta", path);
- let start_time = Instant::now();
+ AwaitUtil::wait(timeout, Duration::from_millis(500), || {
+ file_exists(&test_file)
+ })
+ }
+}
+
+struct AwaitUtil();
- while start_time.elapsed() < timeout {
- if file_exists(&test_file) {
- info!("Fuse server is ready",);
- return true;
+impl AwaitUtil {
+ pub(crate) fn wait(
+ max_wait: Duration,
+ poll_interval: Duration,
+ check_fn: impl Fn() -> bool + Send,
+ ) -> bool {
+ ASYNC_RUNTIME.block_on(async {
+ let start = Instant::now();
+ let mut interval = interval(poll_interval);
+
+ while start.elapsed() < max_wait {
+ interval.tick().await;
+ if check_fn() {
+ return true;
+ }
}
- info!("Wait for fuse server ready",);
- sleep(Duration::from_secs(1));
- }
- false
+ false
+ })
}
}
-impl Drop for FuseTest {
+impl Drop for FuseTestEnv {
fn drop(&mut self) {
info!("Shutdown fuse server");
self.shutdown();
}
}
-#[test]
-fn test_fuse_with_memory_fs() {
- tracing_subscriber::fmt().init();
+struct SequenceFileOperationTest {
+ test_dir: PathBuf,
+ weak_consistency: bool,
+}
- panic::set_hook(Box::new(|info| {
- error!("A panic occurred: {:?}", info);
- process::exit(1);
- }));
+impl SequenceFileOperationTest {
+ fn new(test_dir: &Path) -> Self {
+ let args: Vec<String> = env::args().collect();
+ let weak_consistency = args.contains(&"weak_consistency".to_string());
- let mount_point = "target/gvfs";
- let _ = fs::create_dir_all(mount_point);
+ SequenceFileOperationTest {
+ test_dir: test_dir.to_path_buf(),
+ weak_consistency: weak_consistency,
+ }
+ }
+ fn test_create_file(&self, name: &str, open_options: Option<&OpenOptions>)
-> File {
+ let path = self.test_dir.join(name);
+ let file = {
+ match open_options {
+ None => File::create(&path)
+ .unwrap_or_else(|_| panic!("Failed to create file: {:?}",
path)),
+ Some(options) => options.open(&path).unwrap_or_else(|_| {
+ panic!(
+ "Failed to create file: {:?},
+ options {:?}",
+ path, options
+ )
+ }),
+ }
+ };
+ let file_metadata = file
+ .metadata()
+ .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}",
path));
+ assert!(file_exists(path));
+ if !self.weak_consistency {
+ assert_eq!(file_metadata.len(), 0);
+ }
+ file
+ }
- let mut test = FuseTest {
- runtime: Arc::new(Runtime::new().unwrap()),
- mount_point: mount_point.to_string(),
- gvfs_mount: None,
- };
+ fn test_open_file(&self, name: &str, open_options: Option<&OpenOptions>)
-> File {
+ let path = self.test_dir.join(name);
+ let file = {
+ match open_options {
+ None => {
+ File::open(&path).unwrap_or_else(|_| panic!("Failed to
open file: {:?}", path))
+ }
+ Some(options) => options.open(&path).unwrap_or_else(|_| {
+ panic!(
+ "Failed to open file: {:?},
+ options {:?}",
+ path, options
+ )
+ }),
+ }
+ };
+ let file_metadata = file
+ .metadata()
+ .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}",
path));
+ assert!(file_metadata.is_file());
+ assert!(file_exists(path));
+ file
+ }
- test.setup();
- test_fuse_filesystem(mount_point);
-}
+ fn test_read_file(&self, file: &mut File, expect: &[u8]) {
+ let mut content = vec![0; expect.len()];
+ file.read_exact(&mut content).expect("Failed to read file");
+ assert_eq!(content, *expect, "File content mismatch");
+ }
-#[test]
-fn fuse_it_test_fuse() {
- test_enable_with!(RUN_TEST_WITH_FUSE);
+ fn test_read_data(&self, file: &mut File, len: usize) -> Vec<u8> {
+ let mut content = vec![0; len];
+ file.read_exact(&mut content).expect("Failed to read file");
+ content
+ }
- test_fuse_filesystem("target/gvfs/gvfs_test");
-}
+ fn test_append_file(&self, file: &mut File, content: &[u8]) {
+ let old_len = file.metadata().unwrap().len();
+ let size = content.len();
+ file.write_all(content).expect("Failed to write file");
+
+ if !self.weak_consistency {
+ let new_len = file.metadata().unwrap().len();
+ assert_eq!(new_len, old_len + size as u64, "File size mismatch");
+ }
+ }
+
+ fn test_remove_file(&self, name: &str) {
+ let path = self.test_dir.join(name);
+ fs::remove_file(&path).unwrap_or_else(|_| panic!("Failed to remove
file: {:?}", path));
+ assert!(!file_exists(path));
+ }
+
+ fn test_create_dir(&self, name: &str) {
+ let path = self.test_dir.join(name);
+ fs::create_dir(&path).unwrap_or_else(|_| panic!("Failed to create
directory: {:?}", path));
+ assert!(file_exists(path));
+ }
+
+ fn test_list_dir_with_expect(&self, name: &str, expect_childs: &Vec<&str>)
{
+ self.test_list_dir(name, expect_childs, &vec![]);
+ }
+
+ fn test_list_dir_with_unexpected(&self, name: &str, unexpected_childs:
&Vec<&str>) {
+ self.test_list_dir(name, &vec![], unexpected_childs);
+ }
+
+ fn test_list_dir(&self, name: &str, expect_childs: &Vec<&str>,
unexpected_childs: &Vec<&str>) {
+ let path = self.test_dir.join(name);
+ let dir_childs =
+ fs::read_dir(&path).unwrap_or_else(|_| panic!("Failed to list
directory: {:?}", path));
+ let mut childs_set: HashSet<String> = HashSet::default();
+ for child in dir_childs {
+ let entry = child.expect("Failed to get entry");
+ childs_set.insert(entry.file_name().to_string_lossy().to_string());
+ }
+ for expect_child in expect_childs {
+ assert!(
+ childs_set.contains(*expect_child),
+ "Expect child not found: {}",
+ expect_child
+ );
+ }
+
+ for unexpected_child in unexpected_childs {
+ assert!(
+ !childs_set.contains(*unexpected_child),
+ "Unexpected child found: {}",
+ unexpected_child
+ );
+ }
+ }
+
+ fn test_remove_dir(&self, name: &str) {
+ let path = self.test_dir.join(name);
+ fs::remove_dir(&path).unwrap_or_else(|_| panic!("Failed to remove
directory: {:?}", path));
+ assert!(!file_exists(path));
+ }
-fn test_fuse_filesystem(mount_point: &str) {
- info!("Test startup");
- let base_path = Path::new(mount_point);
+ // some file storage can't sync file immediately, so we need to sync file
to make sure the file is written to disk
+ fn sync_file(&self, file: File, name: &str, expect_len: u64) -> Result<(),
()> {
+ if !self.weak_consistency {
+ return Ok(());
+ }
+ drop(file);
- if !file_exists(base_path) {
- fs::create_dir_all(base_path).expect("Failed to create test dir");
+ let path = self.test_dir.join(name);
+ let success = AwaitUtil::wait(Duration::from_secs(3),
Duration::from_millis(200), || {
+ let file =
+ File::open(&path).unwrap_or_else(|_| panic!("Failed to open
file: {:?}", path));
+ let file_len = file.metadata().unwrap().len();
+ file_len >= expect_len
+ });
+ if !success {
+ return Err(());
+ }
+ Ok(())
}
- //test create file
- let test_file = base_path.join("test_create");
- let file = File::create(&test_file).expect("Failed to create file");
- assert!(file.metadata().is_ok(), "Failed to get file metadata");
- assert!(file_exists(&test_file));
+ fn test_basic_filesystem(fs_test: &SequenceFileOperationTest) {
+ let file_name1 = "test_create";
+ //test create file
+ let mut file1 = fs_test.test_create_file(file_name1, None);
+
+ //test write file
+ let content = "write test".as_bytes();
+ fs_test.test_append_file(&mut file1, content);
+ fs_test
+ .sync_file(file1, file_name1, content.len() as u64)
+ .expect("Failed to sync file");
+
+ //test read file
+ let mut file1 = fs_test.test_open_file(file_name1, None);
+ fs_test.test_read_file(&mut file1, content);
+
+ //test delete file
+ fs_test.test_remove_file(file_name1);
+
+ //test create directory
+ let dir_name1 = "test_dir";
+ fs_test.test_create_dir(dir_name1);
- //test write file
- fs::write(&test_file, "read test").expect("Failed to write file");
+ //test create file in directory
+ let test_file2 = "test_dir/test_file";
+ let mut file2 = fs_test.test_create_file(test_file2, None);
- //test read file
- let content = fs::read_to_string(&test_file).expect("Failed to read file");
- assert_eq!(content, "read test", "File content mismatch");
+ //test write file in directory
+ fs_test.test_append_file(&mut file2, content);
+ fs_test
+ .sync_file(file2, test_file2, content.len() as u64)
+ .expect("Failed to sync file");
- //test delete file
- fs::remove_file(&test_file).expect("Failed to delete file");
- assert!(!file_exists(&test_file));
+ //test read file in directory
+ let mut file2 = fs_test.test_open_file(test_file2, None);
+ fs_test.test_read_file(&mut file2, content);
- //test create directory
- let test_dir = base_path.join("test_dir");
- fs::create_dir(&test_dir).expect("Failed to create directory");
+ //test list directory
+ fs_test.test_list_dir_with_expect(dir_name1, &vec!["test_file"]);
- //test create file in directory
- let test_file = base_path.join("test_dir/test_file");
- let file = File::create(&test_file).expect("Failed to create file");
- assert!(file.metadata().is_ok(), "Failed to get file metadata");
+ //test delete file in directory
+ fs_test.test_remove_file(test_file2);
- //test write file in directory
- let test_file = base_path.join("test_dir/test_read");
- fs::write(&test_file, "read test").expect("Failed to write file");
+ //test list directory after delete file
+ fs_test.test_list_dir_with_unexpected(dir_name1, &vec!["test_file"]);
- //test read file in directory
- let content = fs::read_to_string(&test_file).expect("Failed to read file");
- assert_eq!(content, "read test", "File content mismatch");
+ //test delete directory
+ fs_test.test_remove_dir(dir_name1);
+ }
+
+ #[allow(clippy::needless_range_loop)]
+ fn test_big_file(fs_test: &SequenceFileOperationTest) {
+ let test_file = "test_big_file";
+ let round_size: usize = 1024 * 1024;
+ let round: u8 = 1;
+
+ //test write big file
+ {
+ let mut file = fs_test.test_create_file(test_file, None);
+
+ for i in 0..round {
+ let mut content = vec![0; round_size];
+ for j in 0..round_size {
+ content[j] = (i as usize + j) as u8;
+ }
- //test delete file in directory
- fs::remove_file(&test_file).expect("Failed to delete file");
- assert!(!file_exists(&test_file));
+ fs_test.test_append_file(&mut file, &content);
+ }
+ fs_test
+ .sync_file(file, test_file, round_size as u64 * round as u64)
+ .expect("Failed to sync file");
+ }
- //test delete directory
- fs::remove_dir_all(&test_dir).expect("Failed to delete directory");
- assert!(!file_exists(&test_dir));
+ //test read big file
+ {
+ let mut file = fs_test.test_open_file(test_file, None);
+ for i in 0..round {
+ let buffer = fs_test.test_read_data(&mut file, round_size);
- info!("Success test");
+ for j in 0..round_size {
+ assert_eq!(buffer[j], (i as usize + j) as u8, "File
content mismatch");
+ }
+ }
+ }
+
+ fs_test.test_remove_file(test_file);
+ }
+
+ fn test_open_file_flag(fs_test: &SequenceFileOperationTest) {
+ let write_content = "write content";
+ {
+ // test open file with read and write create flag
+ let file_name = "test_open_file";
+ let mut file = fs_test.test_create_file(
+ file_name,
+ Some(OpenOptions::new().read(true).write(true).create(true)),
+ );
+
+ // test write can be done
+ fs_test.test_append_file(&mut file, write_content.as_bytes());
+
+ // test read end of file
+ let result = file.read_exact(&mut [1]);
+ assert!(result.is_err());
+ if let Err(e) = result {
+ assert_eq!(e.to_string(), "failed to fill whole buffer");
+ }
+ }
+
+ {
+ // test open file with write flag
+ let file_name = "test_open_file2";
+ let mut file = fs_test
+ .test_create_file(file_name,
Some(OpenOptions::new().write(true).create(true)));
+
+ // test write can be done
+ fs_test.test_append_file(&mut file, write_content.as_bytes());
+
+ // test read can be have error
+ let result = file.read(&mut [0; 10]);
+ assert!(result.is_err());
+ if let Err(e) = result {
+ assert_eq!(e.to_string(), "Bad file descriptor (os error 9)");
+ }
+ }
+
+ {
+ // test open file with read flag
+ let file_name = "test_open_file2";
+ let mut file = fs_test.test_open_file(file_name,
Some(OpenOptions::new().read(true)));
+
+ // test read can be done
+ fs_test.test_read_file(&mut file, write_content.as_bytes());
+
+ // test write can be have error
+ let result = file.write_all(write_content.as_bytes());
+ assert!(result.is_err());
+ if let Err(e) = result {
+ assert_eq!(e.to_string(), "Bad file descriptor (os error 9)");
+ }
+ }
+
+ {
+ // test open file with truncate file
+ let file_name = "test_open_file2";
+ let file = fs_test.test_open_file(
+ file_name,
+ Some(OpenOptions::new().write(true).truncate(true)),
+ );
+
+ // test file size is 0
+ assert_eq!(file.metadata().unwrap().len(), 0);
+ }
+
+ {
+ // test open file with append flag
+ let file_name = "test_open_file";
+
+ // opendal_fs does not support open and appand
+ let result = OpenOptions::new()
+ .append(true)
+ .open(fs_test.test_dir.join(file_name));
+ if let Err(e) = result {
+ assert_eq!(e.to_string(), "Invalid argument (os error 22)");
+ return;
+ }
+
+ let mut file = fs_test.test_open_file(file_name,
Some(OpenOptions::new().append(true)));
+
+ assert_eq!(file.metadata().unwrap().len(), write_content.len() as
u64);
+
+ // test append
+ fs_test.test_append_file(&mut file, write_content.as_bytes());
+ let file_len = file.metadata().unwrap().len();
+ assert_eq!(file_len, 2 * write_content.len() as u64);
+ }
+ }
}
fn file_exists<P: AsRef<Path>>(path: P) -> bool {
fs::metadata(path).is_ok()
}
+
+fn run_tests(test_dir: &Path) {
+ fs::create_dir_all(test_dir).expect("Failed to create test dir");
+
+ let fs_test = SequenceFileOperationTest::new(test_dir);
+
+ info!("test_fuse_filesystem started");
+ SequenceFileOperationTest::test_basic_filesystem(&fs_test);
+ info!("testtest_fuse_filesystem finished");
+
+ info!("test_big_file started");
+ SequenceFileOperationTest::test_big_file(&fs_test);
+ info!("test_big_file finished");
+
+ info!("test_open_file_flag started");
+ SequenceFileOperationTest::test_open_file_flag(&fs_test);
+ info!("test_open_file_flag finished");
+}
+
+fn test_manually() {
+ let mount_point = Path::new("target/gvfs");
+ let test_dir = mount_point.join("test_dir");
+ run_tests(&test_dir);
+}
+
+#[test]
+fn fuse_it_test_fuse() {
+ test_enable_with!(RUN_TEST_WITH_FUSE);
+ tracing_subscriber::fmt().init();
+
+ let mount_point = Path::new("target/gvfs");
+ let test_dir = mount_point.join("test_dir");
+
+ run_tests(&test_dir);
+}
+
+#[test]
+fn test_fuse_with_memory_fs() {
+ tracing_subscriber::fmt().init();
+
+ let mount_point = "target/gvfs";
+ let _ = fs::create_dir_all(mount_point);
+
+ let mut test = FuseTestEnv {
+ mount_point: mount_point.to_string(),
+ gvfs_mount: None,
+ };
+
+ test.setup();
+
+ let test_dir = Path::new(&test.mount_point).join("test_dir");
+ run_tests(&test_dir);
+}