This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 54aee69  feat(fs): introduce file system tests (#42)
54aee69 is described below

commit 54aee69589e6677bf7086a2a4c9ea94cf8368700
Author: Zhang Jiawei <[email protected]>
AuthorDate: Tue Jun 2 18:15:13 2026 +0800

    feat(fs): introduce file system tests (#42)
---
 src/paimon/common/fs/external_path_provider.h      |   74 +
 .../common/fs/external_path_provider_test.cpp      |   68 +
 src/paimon/common/fs/file_system_test.cpp          | 1407 ++++++++++++++++++++
 .../common/fs/resolving_file_system_test.cpp       |  235 ++++
 4 files changed, 1784 insertions(+)

diff --git a/src/paimon/common/fs/external_path_provider.h 
b/src/paimon/common/fs/external_path_provider.h
new file mode 100644
index 0000000..568d3ac
--- /dev/null
+++ b/src/paimon/common/fs/external_path_provider.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Provider for external data paths.
+class ExternalPathProvider {
+ public:
+    static Result<std::unique_ptr<ExternalPathProvider>> Create(
+        const std::vector<std::string>& external_table_paths,
+        const std::string& relative_bucket_path) {
+        if (external_table_paths.empty()) {
+            return Status::Invalid("external table paths cannot be empty");
+        }
+        return std::unique_ptr<ExternalPathProvider>(
+            new ExternalPathProvider(external_table_paths, 
relative_bucket_path));
+    }
+
+    /// Get the next external data path.
+    ///
+    /// @return the next external data path
+    std::string GetNextExternalDataPath(const std::string& file_name) {
+        position_++;
+        if (position_ == external_table_paths_.size()) {
+            position_ = 0;
+        }
+        return PathUtil::JoinPath(
+            PathUtil::JoinPath(external_table_paths_[position_], 
relative_bucket_path_), file_name);
+    }
+
+ private:
+    ExternalPathProvider(const std::vector<std::string>& external_table_paths,
+                         const std::string& relative_bucket_path)
+        : external_table_paths_(external_table_paths), 
relative_bucket_path_(relative_bucket_path) {
+        std::random_device rd;
+        std::mt19937 gen(rd());
+        std::uniform_int_distribution<> dist(0, external_table_paths_.size() - 
1);
+        position_ = dist(gen);
+    }
+
+ private:
+    std::vector<std::string> external_table_paths_;
+    std::string relative_bucket_path_;
+    size_t position_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/fs/external_path_provider_test.cpp 
b/src/paimon/common/fs/external_path_provider_test.cpp
new file mode 100644
index 0000000..6893bfb
--- /dev/null
+++ b/src/paimon/common/fs/external_path_provider_test.cpp
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/fs/external_path_provider.h"
+
+#include <set>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(ExternalPathProviderTest, TestInvalidExternalDataPath) {
+    std::vector<std::string> external_table_paths;
+    std::string relative_bucket_path = "p0=1/p1=0/bucket-0";
+
+    auto provider = ExternalPathProvider::Create(external_table_paths, 
relative_bucket_path);
+    ASSERT_NOK_WITH_MSG(provider.status(), "external table paths cannot be 
empty");
+}
+
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPath) {
+    std::vector<std::string> external_table_paths;
+    external_table_paths.emplace_back("/tmp/external_path/");
+    std::string relative_bucket_path = "p0=1/p1=0/bucket-0";
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+                         ExternalPathProvider::Create(external_table_paths, 
relative_bucket_path));
+    ASSERT_EQ(provider->GetNextExternalDataPath("file.orc"),
+              "/tmp/external_path/p0=1/p1=0/bucket-0/file.orc");
+}
+
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) {
+    std::vector<std::string> external_table_paths;
+    external_table_paths.emplace_back("/tmp/external_path_a/");
+    external_table_paths.emplace_back("/tmp/external_path_b/");
+    external_table_paths.emplace_back("/tmp/external_path_c/");
+    std::string relative_bucket_path = "p0=1/p1=0/bucket-0";
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+                         ExternalPathProvider::Create(external_table_paths, 
relative_bucket_path));
+
+    std::set<std::string> result_data_paths;
+    result_data_paths.insert(provider->GetNextExternalDataPath("file.orc"));
+    result_data_paths.insert(provider->GetNextExternalDataPath("file.orc"));
+    result_data_paths.insert(provider->GetNextExternalDataPath("file.orc"));
+
+    ASSERT_EQ(result_data_paths, std::set<std::string>({
+                                     
"/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc",
+                                     
"/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc",
+                                     
"/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
+                                 }));
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/fs/file_system_test.cpp 
b/src/paimon/common/fs/file_system_test.cpp
new file mode 100644
index 0000000..815b983
--- /dev/null
+++ b/src/paimon/common/fs/file_system_test.cpp
@@ -0,0 +1,1407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/fs/file_system.h"
+
+#include <cassert>
+#include <cstdlib>
+#include <future>
+#include <map>
+#include <set>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/executor.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class FileSystemTest : public ::testing::Test, public 
::testing::WithParamInterface<std::string> {
+ public:
+    void SetUp() override {
+        std::string file_system = GetParam();
+        dir_ = paimon::test::UniqueTestDirectory::Create(file_system);
+        ASSERT_TRUE(dir_);
+        test_root_ = dir_->Str();
+        fs_ = dir_->GetFileSystem();
+    }
+
+    void TearDown() override {
+        dir_.reset();
+        fs_.reset();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+    /// Creates a random string with a length within the given interval. The 
string contains
+    /// only characters that can be represented as a single code point.
+    ///
+    /// @param min_length The minimum string length.
+    /// @param max_length The maximum string length (inclusive).
+    /// @param min_value The minimum character value to occur.
+    /// @param max_value The maximum character value to occur.
+    /// @return A random String.
+    static std::string GetRandomString(int32_t min_length, int32_t max_length, 
char min_value,
+                                       char max_value) {
+        int32_t len = std::rand() % (max_length - min_length + 1) + min_length;
+
+        std::vector<char> data;
+        data.resize(len);
+        int32_t diff = max_value - min_value + 1;
+
+        for (int32_t i = 0; i < len; i++) {
+            data[i] = static_cast<char>(std::rand() % diff + min_value);
+        }
+        return std::string(data.data(), data.size());
+    }
+
+    static std::string RandomName() {
+        return GetRandomString(16, 16, 'a', 'z');
+    }
+
+    std::string MakeDir(const std::string& test_root) {
+        EXPECT_OK_AND_ASSIGN(bool exist, fs_->Exists(test_root));
+        if (!exist) {
+            EXPECT_OK(fs_->Mkdirs(test_root));
+        }
+        return test_root;
+    }
+
+    void CreateFile(const std::string& file) {
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out,
+                             fs_->Create(file, /*overwrite=*/true));
+        std::string input = "paimon";
+        char chars[8] = {1, 2, 3, 4, 5, 6, 7, 8};
+        ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(chars, input.size()));
+        ASSERT_EQ(size, input.size());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    std::string CreateRandomFileInDirectory(const std::string& test_root) {
+        std::string directory = MakeDir(test_root);
+        std::string file_path = PathUtil::JoinPath(directory, RandomName());
+        CreateFile(file_path);
+        return file_path;
+    }
+
+    std::string RemoveLastSlashInPath(const std::string& path) const {
+        std::string new_path = path;
+        PathUtil::TrimLastDelim(&new_path);
+        return new_path;
+    }
+
+    void CheckFileStatus(const std::vector<std::unique_ptr<FileStatus>>& 
actual_statuses,
+                         const std::set<std::string>& expected_files,
+                         const std::set<std::string>& expected_dirs) const {
+        ASSERT_EQ(actual_statuses.size(), expected_files.size() + 
expected_dirs.size());
+        std::set<std::string> actual_files;
+        std::set<std::string> actual_dirs;
+        for (const auto& file_status : actual_statuses) {
+            if (file_status->IsDir()) {
+                
actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath()));
+            } else {
+                actual_files.insert(file_status->GetPath());
+                ASSERT_GT(file_status->GetLen(), 0);
+                int64_t modification_time = file_status->GetModificationTime();
+                ASSERT_GT(modification_time, 10000000000L);     // 
MIN_VALID_FILE_MODIFICATION_MS
+                ASSERT_LT(modification_time, 10000000000000L);  // 
MAX_VALID_FILE_MODIFICATION_MS
+            }
+        }
+        std::set<std::string> normalized_expected_dirs;
+        for (const auto& path : expected_dirs) {
+            normalized_expected_dirs.insert(RemoveLastSlashInPath(path));
+        }
+        ASSERT_EQ(actual_files, expected_files);
+        ASSERT_EQ(actual_dirs, normalized_expected_dirs);
+    }
+
+    void CheckBasicFileStatus(const 
std::vector<std::unique_ptr<BasicFileStatus>>& actual_statuses,
+                              const std::set<std::string>& expected_files,
+                              const std::set<std::string>& expected_dirs) 
const {
+        ASSERT_EQ(actual_statuses.size(), expected_files.size() + 
expected_dirs.size());
+        std::set<std::string> actual_files;
+        std::set<std::string> actual_dirs;
+        for (const auto& file_status : actual_statuses) {
+            if (file_status->IsDir()) {
+                
actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath()));
+            } else {
+                actual_files.insert(file_status->GetPath());
+            }
+        }
+        std::set<std::string> normalized_expected_dirs;
+        for (const auto& path : expected_dirs) {
+            normalized_expected_dirs.insert(RemoveLastSlashInPath(path));
+        }
+        ASSERT_EQ(actual_files, expected_files);
+        ASSERT_EQ(actual_dirs, normalized_expected_dirs);
+    }
+
+    std::string GetTestDir() const {
+        std::string file_system = GetParam();
+        if (file_system == "local") {
+            return paimon::test::GetDataDir();
+        } else if (file_system == "jindo") {
+            return "oss://paimon-unittest/test_data/";
+        }
+        assert(false);
+        return "";
+    }
+
+ private:
+    std::shared_ptr<FileSystem> fs_;
+    std::unique_ptr<UniqueTestDirectory> dir_;
+    std::string test_root_;
+};
+
+TEST(FileSystemStaticTest, TestNoneFileSystemFactory) {
+    std::map<std::string, std::string> fs_options;
+    Result<std::unique_ptr<FileSystem>> fs =
+        FileSystemFactory::Get("not exist", "/tmp", fs_options);
+    ASSERT_TRUE(fs.status().IsInvalid());
+}
+
+TEST_P(FileSystemTest, TestFactoryCreator) {
+    std::vector<std::string> factory_registered_type =
+        FactoryCreator::GetInstance()->GetRegisteredType();
+
+    auto test_registered = [&](const std::string& identifier) {
+        bool is_exist = false;
+        for (auto registered_type : factory_registered_type) {
+            if (registered_type == identifier) {
+                is_exist = true;
+            }
+        }
+        ASSERT_TRUE(is_exist);
+    };
+    test_registered(GetParam());
+}
+
+// --- create
+TEST_P(FileSystemTest, TestCreate) {
+    std::string path = PathUtil::JoinPath(test_root_, "/test_file");
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out, fs_->Create(path, 
/*overwrite=*/true));
+    ASSERT_TRUE(out);
+    std::string input = "paimon";
+    ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size()));
+    ASSERT_EQ(size, input.size());
+    ASSERT_OK(out->Close());
+
+    ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already 
exists");
+}
+
+// --- write&read
+TEST_P(FileSystemTest, TestSimpleWriteAndRead) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), 
content.size()));
+    ASSERT_EQ(write_len, content.size());
+
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK_AND_ASSIGN(int64_t pos, out_stream->GetPos());
+    ASSERT_EQ(pos, content.size());
+
+    ASSERT_OK_AND_ASSIGN(std::string uri, out_stream->GetUri());
+    ASSERT_EQ(uri, file_path);
+    ASSERT_OK(out_stream->Close());
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 0);
+
+    // read from cur pos
+    std::string read_content(content.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ(content, read_content);
+
+    // read from offset
+    ASSERT_OK_AND_ASSIGN(read_len, in_stream->Read(read_content.data(), 
read_content.size(), 0));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ(content, read_content);
+
+    ASSERT_OK_AND_ASSIGN(uri, in_stream->GetUri());
+    ASSERT_EQ(uri, file_path);
+    ASSERT_OK_AND_ASSIGN(uint64_t file_len, in_stream->Length());
+    ASSERT_EQ(file_len, content.size());
+
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, content.size());
+    ASSERT_OK(in_stream->Close());
+}
+
+TEST_P(FileSystemTest, TestWriteMultipleTimes) {
+    std::vector<std::string> content_vec = {"abc", "defg", "hi", "j", "k"};
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    for (const auto& str : content_vec) {
+        ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(str.data(), 
str.size()));
+        ASSERT_EQ(write_len, str.size());
+    }
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK_AND_ASSIGN(int64_t pos, out_stream->GetPos());
+    ASSERT_EQ(pos, content.size());
+    ASSERT_OK(out_stream->Close());
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    std::string read_content(content.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ(content, read_content);
+}
+
+TEST_P(FileSystemTest, TestWriteInNotExistDir) {
+    std::string file_path = test_root_ + "/no_exist/file.data";
+    // write process
+    std::string content = "abcdefghijk";
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len,
+                         out_stream->Write(content.data(), content.size()));
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    std::string read_content(content.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ(content, read_content);
+
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(test_root_ + 
"/no_exist/"));
+    ASSERT_TRUE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestWriteEmptyFile) {
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    std::string content = "";
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), 
content.size()));
+    ASSERT_EQ(write_len, 0);
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    // get file status
+    ASSERT_OK_AND_ASSIGN(auto st, fs_->GetFileStatus(file_path));
+    ASSERT_EQ(st->GetPath(), file_path);
+    ASSERT_FALSE(st->IsDir());
+    ASSERT_EQ(st->GetLen(), 0);
+    auto modification_time = st->GetModificationTime();
+    ASSERT_GT(modification_time, 10000000000L);
+    ASSERT_LT(modification_time, 10000000000000L);
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    std::string read_content(content.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ(content, read_content);
+}
+
+TEST_P(FileSystemTest, TestWriteWithOverwrite) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), 
content.size()));
+    ASSERT_EQ(write_len, content.size());
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    // write file which already exist
+    {
+        std::string new_content = "helloworld";
+        ASSERT_OK_AND_ASSIGN(auto out_stream2, fs_->Create(file_path, 
/*overwrite=*/true));
+        ASSERT_OK_AND_ASSIGN(write_len, out_stream2->Write(new_content.data(), 
new_content.size()));
+        ASSERT_EQ(write_len, new_content.size());
+        ASSERT_OK(out_stream2->Flush());
+        ASSERT_OK(out_stream2->Close());
+
+        // read process
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        std::string read_content(new_content.size(), '\0');
+        ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                             in_stream->Read(read_content.data(), 
read_content.size()));
+        ASSERT_EQ(read_len, read_content.size());
+        ASSERT_EQ(new_content, read_content);
+    }
+    {
+        // test file exist and overwrite = false
+        ASSERT_NOK_WITH_MSG(fs_->Create(file_path, /*overwrite=*/false), "do 
not allow overwrite");
+    }
+}
+
+TEST_P(FileSystemTest, TestAsyncRead) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len,
+                         out_stream->Write(content.data(), content.size()));
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    std::string read_content(content.size(), '\0');
+    bool read_finished = false;
+    std::promise<int32_t> promise;
+    std::future<int32_t> future = promise.get_future();
+    auto callback = [&](Status status) {
+        EXPECT_OK(status);
+        if (status.ok()) {
+            read_finished = true;
+            promise.set_value(10);
+        } else {
+            read_finished = false;
+            promise.set_value(20);
+        }
+    };
+    in_stream->ReadAsync(read_content.data(), read_content.size(), 
/*offset=*/0, callback);
+    ASSERT_EQ(future.get(), 10);
+    ASSERT_TRUE(read_finished);
+    ASSERT_EQ(content, read_content);
+    ASSERT_OK(in_stream->Close());
+}
+
+TEST_P(FileSystemTest, TestInvalidRead) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len,
+                         out_stream->Write(content.data(), content.size()));
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+    {
+        // seek to end, then read
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        ASSERT_OK(in_stream->Seek(/*offset=*/11, SeekOrigin::FS_SEEK_SET));
+        ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos());
+        ASSERT_EQ(pos, 11);
+        // read from cur pos
+        std::string read_content(3, '\0');
+        ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size()));
+        ASSERT_OK_AND_ASSIGN(size_t actual_read, 
in_stream->Read(read_content.data(), 0));
+        ASSERT_EQ(actual_read, 0);
+    }
+    {
+        // read invalid bulk data
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        std::string read_content(20, '\0');
+        ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size()));
+        ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos());
+        ASSERT_EQ(pos, 11);
+    }
+    {
+        // read invalid with oversize offset
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        std::string read_content(4, '\0');
+        ASSERT_NOK(in_stream->Read(read_content.data(), read_content.size(), 
/*offset=*/20));
+    }
+}
+
+TEST_P(FileSystemTest, TestInvalidAsyncRead) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t write_len,
+                         out_stream->Write(content.data(), content.size()));
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    {
+        // test read overflow
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        std::string read_content(20, '\0');
+        bool read_finished = false;
+        std::promise<int> promise;
+        std::future<int> future = promise.get_future();
+        auto callback = [&](Status status) {
+            EXPECT_NOK(status);
+            if (status.ok()) {
+                read_finished = true;
+                promise.set_value(10);
+            } else {
+                read_finished = false;
+                promise.set_value(20);
+            }
+        };
+        // invalid async read
+        in_stream->ReadAsync(read_content.data(), read_content.size(), 
/*offset=*/0, callback);
+        ASSERT_EQ(future.get(), 20);
+        ASSERT_FALSE(read_finished);
+        ASSERT_NE(read_content, content);
+        ASSERT_OK(in_stream->Close());
+    }
+    {
+        // test read with invalid offset
+        ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+        std::string read_content(content.size(), '\0');
+        bool read_finished = false;
+        std::promise<int> promise;
+        std::future<int> future = promise.get_future();
+        auto callback = [&](Status status) {
+            EXPECT_NOK(status);
+            if (status.ok()) {
+                read_finished = true;
+                promise.set_value(10);
+            } else {
+                read_finished = false;
+                promise.set_value(20);
+            }
+        };
+        // invalid async read
+        in_stream->ReadAsync(read_content.data(), read_content.size(), 
/*offset=*/20, callback);
+        ASSERT_EQ(future.get(), 20);
+        ASSERT_FALSE(read_finished);
+        ASSERT_NE(read_content, content);
+        ASSERT_OK(in_stream->Close());
+    }
+}
+
+TEST_P(FileSystemTest, TestReadAndWriteAndAtomicStoreFile) {
+    std::string file_path = test_root_ + "/file.data";
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_FALSE(is_exist);
+
+    std::string content = "content";
+    ASSERT_OK(fs_->AtomicStore(file_path, content));
+    std::string read_content;
+    ASSERT_OK(fs_->ReadFile(file_path, &read_content));
+    ASSERT_EQ(read_content, content);
+
+    std::string new_content = "hello world";
+    ASSERT_OK(fs_->WriteFile(file_path, new_content, /*overwrite=*/true));
+    ASSERT_OK(fs_->ReadFile(file_path, &read_content));
+    ASSERT_EQ(read_content, new_content);
+
+    ASSERT_NOK_WITH_MSG(fs_->WriteFile(file_path, content, 
/*overwrite=*/false),
+                        "do not allow overwrite");
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+
+    ASSERT_NOK_WITH_MSG(fs_->AtomicStore(file_path, content), "dst file 
already exist");
+    ASSERT_OK(fs_->ReadFile(file_path, &read_content));
+    ASSERT_EQ(read_content, new_content);
+}
+
+TEST(FileSystemStaticTest, TestIsObjectStore) {
+    ASSERT_OK_AND_ASSIGN(bool is_object_store, 
FileSystem::IsObjectStore("file:///tmp/test.txt"));
+    ASSERT_FALSE(is_object_store);
+    ASSERT_OK_AND_ASSIGN(is_object_store, 
FileSystem::IsObjectStore("/tmp/test.txt"));
+    ASSERT_FALSE(is_object_store);
+    ASSERT_OK_AND_ASSIGN(is_object_store, 
FileSystem::IsObjectStore("dfs://tmp/test.txt"));
+    ASSERT_FALSE(is_object_store);
+    ASSERT_OK_AND_ASSIGN(is_object_store, 
FileSystem::IsObjectStore("hdfs://tmp/test.txt"));
+    ASSERT_FALSE(is_object_store);
+
+    ASSERT_OK_AND_ASSIGN(is_object_store, 
FileSystem::IsObjectStore("oss://bucket/test.txt"));
+    ASSERT_TRUE(is_object_store);
+    ASSERT_OK_AND_ASSIGN(is_object_store, 
FileSystem::IsObjectStore("s3://bucket/test.txt"));
+    ASSERT_TRUE(is_object_store);
+}
+
+// --- seek
+TEST_P(FileSystemTest, TestSeek) {
+    std::string path = PathUtil::JoinPath(test_root_, "/test_file");
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out, fs_->Create(path, 
/*overwrite=*/true));
+    std::string input = "paimon";
+    ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size()));
+    ASSERT_EQ(size, input.size());
+    ASSERT_OK(out->Close());
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<InputStream> in, fs_->Open(path));
+    ASSERT_OK(in->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(int64_t pos, in->GetPos());
+    ASSERT_EQ(pos, 1);
+
+    ASSERT_OK(in->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(int64_t pos2, in->GetPos());
+    ASSERT_EQ(pos2, 2);
+
+    ASSERT_OK(in->Seek(/*offset=*/-5, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(int64_t pos3, in->GetPos());
+    ASSERT_EQ(pos3, 1);
+
+    std::string read_content(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, in->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ("aim", read_content);
+
+    // read from offset
+    ASSERT_OK_AND_ASSIGN(read_len,
+                         in->Read(read_content.data(), read_content.size(), 
/*offset=*/2));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ("imo", read_content);
+
+    ASSERT_OK_AND_ASSIGN(pos, in->GetPos());
+    ASSERT_EQ(pos, 4);
+}
+
+TEST_P(FileSystemTest, TestSeek2) {
+    std::string content = "abcdefghijk";
+    std::string file_path = test_root_ + "/file.data";
+    // write process
+    ASSERT_OK_AND_ASSIGN(auto out_stream, fs_->Create(file_path, 
/*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t write_len, out_stream->Write(content.data(), 
content.size()));
+    ASSERT_EQ(write_len, content.size());
+    ASSERT_OK(out_stream->Flush());
+    ASSERT_OK(out_stream->Close());
+
+    // read process
+    ASSERT_OK_AND_ASSIGN(auto in_stream, fs_->Open(file_path));
+    ASSERT_OK_AND_ASSIGN(auto pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 0);
+
+    // valid seek
+    ASSERT_OK(in_stream->Seek(/*offset=*/2, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 2);
+
+    ASSERT_OK(in_stream->Seek(/*offset=*/4, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 6);
+
+    ASSERT_OK(in_stream->Seek(/*offset=*/-3, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 8);
+
+    // read from cur pos
+    std::string read_content(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size()));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ("ijk", read_content);
+
+    // read from offset
+    ASSERT_OK_AND_ASSIGN(read_len,
+                         in_stream->Read(read_content.data(), 
read_content.size(), /*offset=*/4));
+    ASSERT_EQ(read_len, read_content.size());
+    ASSERT_EQ("efg", read_content);
+
+    ASSERT_OK_AND_ASSIGN(pos, in_stream->GetPos());
+    ASSERT_EQ(pos, 11);
+    ASSERT_OK(in_stream->Close());
+}
+
+// --- rename
+TEST_P(FileSystemTest, TestRename) {
+    std::string path = PathUtil::JoinPath(test_root_, "/test_file");
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out, fs_->Create(path, 
/*overwrite=*/true));
+    ASSERT_TRUE(out);
+    std::string input = "paimon";
+    ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size()));
+    ASSERT_EQ(size, input.size());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<InputStream> in, fs_->Open(path));
+    ASSERT_TRUE(in);
+    char* data = new char[input.size() * 2];
+    ASSERT_OK_AND_ASSIGN(int32_t size_read, in->Read(data, input.size(), 
/*offset=*/0));
+    ASSERT_EQ(size_read, input.size());
+    std::string read_data(data, input.size());
+    ASSERT_EQ(read_data, input);
+    delete[] data;
+    ASSERT_OK(in->Close());
+
+    std::string path2 = PathUtil::JoinPath(test_root_, "/test_file_renamed");
+    ASSERT_OK(fs_->Rename(path, path2));
+
+    std::string no_exist_path = PathUtil::JoinPath(test_root_, 
"/no_exist_file");
+    ASSERT_NOK_WITH_MSG(
+        fs_->Rename(no_exist_path, PathUtil::JoinPath(test_root_, 
"/no_exist_file_renamed")),
+        "src file not exist");
+
+    ASSERT_NOK_WITH_MSG(
+        fs_->Rename(path2, PathUtil::JoinPath(test_root_, 
"/wrong_path_file_renamed/")),
+        "src file is not a dir");
+}
+
+TEST_P(FileSystemTest, TestRename2) {
+    {
+        // test rename dir
+        std::string dir_path = test_root_ + "/file_dir/";
+        std::string dir_path2 = test_root_ + "/file_dir2/";
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test rename itself
+        std::string dir_path = test_root_ + "/file_dir_itself/";
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path),
+                            "dst file already exist");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test rename from non-exist dir
+        std::string dir_path = test_root_ + "/non_exist_file_dir/";
+        std::string dir_path2 = test_root_ + "/non_exist_file_dir2/";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2), 
"src file not exist");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        // test rename to exist dir
+        std::string dir_path = test_root_ + "/file_src_dir/";
+        std::string dir_path2 = test_root_ + "/file_dst_dir/";
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK(fs_->Mkdirs(dir_path2));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2),
+                            "dst file already exist");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test rename file
+        std::string file_path = test_root_ + "/file1/file2/file3";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+        std::string file_path2 = test_root_ + "/file1/file4";
+        ASSERT_OK(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+
+        std::string data;
+        ASSERT_OK(fs_->ReadFile(file_path2, &data));
+        ASSERT_EQ(data, "content");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file1/file2/"));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test rename file to exist file
+        std::string file_path = test_root_ + "/file11/file12/file13";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+        std::string file_path2 = test_root_ + "/file15/file16/file13";
+        ASSERT_OK(fs_->WriteFile(file_path2, "HelloWorld", 
/*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2),
+                            "dst file already exist");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+        std::string data;
+        ASSERT_OK(fs_->ReadFile(file_path2, &data));
+        ASSERT_EQ(data, "HelloWorld");
+    }
+    {
+        // test rename from non-exist file
+        std::string file_path = test_root_ + "/non_exist_file";
+        std::string file_path2 = test_root_ + "/non_exist_file2";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2),
+                            "src file not exist");
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+        ASSERT_FALSE(is_exist);
+    }
+}
+
+// --- exists
+TEST_P(FileSystemTest, TestFileExists) {
+    std::string file_path = CreateRandomFileInDirectory(test_root_);
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestFileDoesNotExist) {
+    std::string path = PathUtil::JoinPath(test_root_, RandomName());
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path));
+    ASSERT_FALSE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestExists) {
+    std::string file_path = test_root_ + "/file.data";
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(fs_->WriteFile(file_path, "/content", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+
+    std::string dir_path = test_root_ + "/file_dir/";
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(fs_->WriteFile(dir_path + "/file.data", "content", 
/*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+    ASSERT_TRUE(is_exist);
+}
+
+// --- delete
+TEST_P(FileSystemTest, TestExistingFileDeletion) {
+    auto check = [&](bool recursive) {
+        std::string file_path = CreateRandomFileInDirectory(test_root_);
+        ASSERT_OK(fs_->Delete(file_path, recursive));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+    };
+    check(true);
+    check(false);
+}
+
+TEST_P(FileSystemTest, TestNotExistingFileDeletion) {
+    auto check = [&](bool recursive) {
+        std::string path = PathUtil::JoinPath(test_root_, RandomName());
+        ASSERT_TRUE(fs_->Delete(path, recursive).IsIOError());
+    };
+    check(true);
+    check(false);
+}
+
+TEST_P(FileSystemTest, TestExistingEmptyDirectoryDeletion) {
+    auto check = [&](bool recursive) {
+        std::string path = PathUtil::JoinPath(test_root_, RandomName());
+        ASSERT_OK(fs_->Mkdirs(path));
+        ASSERT_OK(fs_->Delete(path, recursive));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path));
+        ASSERT_FALSE(is_exist);
+    };
+    check(true);
+    check(false);
+}
+
+TEST_P(FileSystemTest, TestExistingNonEmptyDirectoryRecursiveDeletion) {
+    {
+        std::string file_path = CreateRandomFileInDirectory(test_root_);
+        ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        std::string file_path = CreateRandomFileInDirectory(test_root_);
+        ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+    }
+}
+
+TEST_P(FileSystemTest, 
TestExistingNonEmptyDirectoryWithSubDirRecursiveDeletion) {
+    {
+        std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, 
RandomName());
+        const std::string file_in_level1_subdir =
+            CreateRandomFileInDirectory(level1_subdir_with_file);
+        std::string level2_subdir_with_file =
+            PathUtil::JoinPath(level1_subdir_with_file, RandomName());
+        const std::string file_in_level2_subdir =
+            CreateRandomFileInDirectory(level2_subdir_with_file);
+        ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true));
+        ASSERT_FALSE(fs_->Exists(file_in_level1_subdir).value());
+        ASSERT_FALSE(fs_->Exists(level2_subdir_with_file).value());
+        ASSERT_FALSE(fs_->Exists(file_in_level2_subdir).value());
+    }
+    {
+        std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, 
RandomName());
+        const std::string file_in_level1_subdir =
+            CreateRandomFileInDirectory(level1_subdir_with_file);
+        std::string level2_subdir_with_file =
+            PathUtil::JoinPath(level1_subdir_with_file, RandomName());
+        const std::string file_in_level2_subdir =
+            CreateRandomFileInDirectory(level2_subdir_with_file);
+        ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false));
+        ASSERT_TRUE(fs_->Exists(file_in_level1_subdir).value());
+        ASSERT_TRUE(fs_->Exists(level2_subdir_with_file).value());
+        ASSERT_TRUE(fs_->Exists(file_in_level2_subdir).value());
+    }
+}
+
+TEST_P(FileSystemTest, TestDelete) {
+    {
+        std::string dir_path = test_root_ + "/file_dir/";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        // test recursive delete
+        std::string dir_path = test_root_ + "/file_dir1/file_dir2/file_dir3";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+
+        // rm file_dir1/file_dir2/file_dir3/
+        ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/"));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/file_dir2"));
+        ASSERT_TRUE(is_exist);
+
+        // rm file_dir1/
+        ASSERT_OK(fs_->Delete(test_root_ + "/file_dir1/", /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/"));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/file_dir2"));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        // test delete file
+        std::string file_path = test_root_ + "/file1/file2/file3";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_OK(fs_->Delete(file_path, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file1/file2/"));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file1", 
/*recursive=*/false),
+                            "is not empty");
+    }
+    {
+        // test recursive delete
+        std::string file_path = test_root_ + "/file1/file2/file3";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_OK(fs_->Delete(test_root_ + "/file1/", /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file1/file2/"));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/"));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        // test recursive delete
+        std::string dir_path = test_root_ + 
"/file_recursive_dir1/file_dir2/file_dir3";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file_recursive_dir1", 
/*recursive=*/false),
+                            "is not empty");
+    }
+    {
+        // test delete non-exist file
+        std::string file_path = test_root_ + "/file_non_exist";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_NOK(fs_->Delete(file_path, /*recursive=*/false));
+    }
+}
+
+// --- mkdirs
+TEST_P(FileSystemTest, TestMkdirsReturnsTrueWhenCreatingDirectory) {
+    ASSERT_OK(fs_->Mkdirs(test_root_));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(test_root_));
+    ASSERT_TRUE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestMkdirsCreatesParentDirectories) {
+    std::string deep_path = PathUtil::JoinPath(test_root_, RandomName());
+    ASSERT_OK(fs_->Mkdirs(deep_path));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(deep_path));
+    ASSERT_TRUE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestMkdirsReturnsTrueForExistingDirectory) {
+    std::string file_path = CreateRandomFileInDirectory(test_root_);
+    ASSERT_OK(fs_->Mkdirs(test_root_));
+}
+
+TEST_P(FileSystemTest, TestMkdirsFailsForExistingFile) {
+    std::string file_path = CreateRandomFileInDirectory(test_root_);
+    auto status = fs_->Mkdirs(file_path);
+    ASSERT_TRUE(status.IsIOError());
+}
+
+TEST_P(FileSystemTest, TestMkdirsFailsWithExistingParentFile) {
+    std::string file_path = CreateRandomFileInDirectory(test_root_);
+    std::string dir_under_file = PathUtil::JoinPath(file_path, RandomName());
+    ASSERT_TRUE(fs_->Mkdirs(dir_under_file).IsIOError());
+}
+
+TEST_P(FileSystemTest, TestMkdir) {
+    ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp.txt/tmpB"));
+    ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmpA/tmpB/"));
+
+    ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1"));
+    ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1"));
+    ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/"));
+    ASSERT_OK(fs_->Mkdirs("/"));
+    ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string.");
+}
+
+TEST_P(FileSystemTest, TestMkdir2) {
+    {
+        std::string dir_path = test_root_ + "/file_dir/";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test recursive mkdir
+        std::string dir_path = test_root_ + "/file_dir1/file_dir2/file_dir3";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/"));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + 
"/file_dir1/file_dir2"));
+        ASSERT_TRUE(is_exist);
+    }
+}
+
+// test for create multi dir such as "/table/partition1/bucket1" and 
"/table/partition1/bucket2"
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNonExistParentDir) {
+    uint32_t runs_count = 10;
+    uint32_t thread_count = 10;
+    auto executor = CreateDefaultExecutor(thread_count);
+
+    for (uint32_t i = 0; i < runs_count; i++) {
+        std::string uuid;
+        ASSERT_TRUE(UUID::Generate(&uuid));
+        std::vector<std::future<void>> futures;
+        for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) 
{
+            futures.push_back(Via(executor.get(), [this, &uuid, thread_idx]() 
-> void {
+                std::string dir_path =
+                    PathUtil::JoinPath(test_root_, uuid + "/" + 
std::to_string(thread_idx));
+                ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+                ASSERT_FALSE(is_exist);
+                ASSERT_OK(fs_->Mkdirs(dir_path));
+                ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+                ASSERT_TRUE(is_exist);
+            }));
+        }
+        Wait(futures);
+    }
+}
+
+// test for create multi dir such as "/table/partition1" and 
"/table/partition1"
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameName) {
+    uint32_t runs_count = 10;
+    uint32_t thread_count = 10;
+    auto executor = CreateDefaultExecutor(thread_count);
+
+    for (uint32_t i = 0; i < runs_count; i++) {
+        std::string uuid;
+        ASSERT_TRUE(UUID::Generate(&uuid));
+        std::vector<std::future<void>> futures;
+        for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) 
{
+            futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
+                std::string dir_path = PathUtil::JoinPath(test_root_, uuid);
+                ASSERT_OK(fs_->Mkdirs(dir_path));
+                ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+                ASSERT_TRUE(is_exist);
+            }));
+        }
+        Wait(futures);
+    }
+}
+
+// test for create multi dir such as "partition1" and "partition1" (relative 
path)
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNameWithRelativePath) {
+    uint32_t runs_count = 10;
+    uint32_t thread_count = 10;
+    auto executor = CreateDefaultExecutor(thread_count);
+
+    for (uint32_t i = 0; i < runs_count; i++) {
+        std::string uuid;
+        ASSERT_TRUE(UUID::Generate(&uuid));
+        std::vector<std::future<void>> futures;
+        for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) 
{
+            futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
+                std::string dir_path = uuid;
+                ASSERT_OK(fs_->Mkdirs(dir_path));
+                ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+                ASSERT_TRUE(is_exist);
+            }));
+        }
+        Wait(futures);
+    }
+}
+
+TEST_P(FileSystemTest, TestInvalidMkdir) {
+    {
+        // test mkdir with one exist dir
+        std::string dir_path = test_root_ + "/file_dir/";
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+    }
+    {
+        // test mkdir with one exist file with same name
+        std::string file_path = test_root_ + "/file_dir/file.data";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_NOK_WITH_MSG(fs_->Mkdirs(file_path), "already exists");
+    }
+}
+
+// --- file status
+TEST_P(FileSystemTest, TestGetFileStatus1) {
+    {
+        // test dir simple
+        std::string dir_path = test_root_ + "/file_dir/";
+        ASSERT_OK(fs_->Mkdirs(dir_path));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> st, 
fs_->GetFileStatus(dir_path));
+        ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()), 
RemoveLastSlashInPath(dir_path));
+        ASSERT_TRUE(st->IsDir());
+        auto modification_time = st->GetModificationTime();
+        ASSERT_GT(modification_time, 10000000000L);
+        ASSERT_LT(modification_time, 10000000000000L);
+
+        std::string file_path = test_root_ + "/file_dir/file.data";
+        ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+
+        // check meta in dir
+        ASSERT_OK_AND_ASSIGN(st, fs_->GetFileStatus(dir_path));
+        ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()), 
RemoveLastSlashInPath(dir_path));
+        ASSERT_TRUE(st->IsDir());
+        modification_time = st->GetModificationTime();
+        ASSERT_GT(modification_time, 10000000000L);
+        ASSERT_LT(modification_time, 10000000000000L);
+    }
+    {
+        // test non-exist dir
+        std::string dir_path = test_root_ + "/non_exist_file_dir/";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_NOK(fs_->GetFileStatus(dir_path));
+    }
+    {
+        // test file simple
+        std::string content = "content";
+        std::string file_path = test_root_ + "/file.data";
+        ASSERT_OK(fs_->WriteFile(file_path, content, /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> st, 
fs_->GetFileStatus(file_path));
+        ASSERT_EQ(st->GetPath(), file_path);
+        ASSERT_FALSE(st->IsDir());
+        ASSERT_EQ(st->GetLen(), content.size());
+        auto modification_time = st->GetModificationTime();
+        ASSERT_GT(modification_time, 10000000000L);
+        ASSERT_LT(modification_time, 10000000000000L);
+    }
+    {
+        // test non-exist file
+        std::string dir_path = test_root_ + "/non_exist_file";
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+        ASSERT_FALSE(is_exist);
+        ASSERT_NOK(fs_->GetFileStatus(dir_path));
+    }
+}
+
+TEST_P(FileSystemTest, TestGetFileStatus2) {
+    const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+    {
+        // input is a dir, with a trailing '/'
+        std::string dir_name = test_path + "/";
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status, 
fs_->GetFileStatus(dir_name));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{}, 
/*expected_dirs=*/{dir_name});
+    }
+    {
+        // input is a dir, without a trailing '/'
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status,
+                             fs_->GetFileStatus(test_path));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{}, 
/*expected_dirs=*/{test_path});
+    }
+    {
+        // input is a file
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        std::string file_name = PathUtil::JoinPath(test_path, "README");
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status,
+                             fs_->GetFileStatus(file_name));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{file_name}, 
/*expected_dirs=*/{});
+    }
+    {
+        // input is not exist
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_NOK(fs_->GetFileStatus(PathUtil::JoinPath(test_path, 
"NOT_EXIST")));
+    }
+}
+
+TEST_P(FileSystemTest, TestInvalidListFileStatus) {
+    {
+        // list non exist dir will return ok
+        std::vector<std::unique_ptr<FileStatus>> file_status_list;
+        ASSERT_OK(fs_->ListFileStatus(test_root_ + "/non-exist/", 
&file_status_list));
+        ASSERT_TRUE(file_status_list.empty());
+    }
+    {
+        // test data path
+        std::string file_path = test_root_ + "/file.data";
+        ASSERT_OK(fs_->WriteFile(file_path, "hello", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+        std::vector<std::unique_ptr<FileStatus>> file_status_list;
+        ASSERT_OK(fs_->ListFileStatus(file_path, &file_status_list));
+        ASSERT_EQ(file_status_list[0]->GetPath(), file_path);
+        ASSERT_EQ(file_status_list[0]->GetLen(), 5);
+        ASSERT_FALSE(file_status_list[0]->IsDir());
+    }
+}
+
+TEST_P(FileSystemTest, TestListFileStatus1) {
+    std::string file_path = test_root_ + "/file_dir1/file.data";
+    ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+
+    std::vector<std::unique_ptr<FileStatus>> file_status_list;
+    ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list));
+    ASSERT_EQ(file_status_list.size(), 1);
+    std::set<std::string> expected_dirs = {test_root_ + "/file_dir1/"};
+    CheckFileStatus(file_status_list, /*expected_files=*/{}, expected_dirs);
+
+    auto dir_path2 = test_root_ + "/file_dir2/";
+    ASSERT_OK(fs_->Mkdirs(dir_path2));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+    ASSERT_TRUE(is_exist);
+
+    auto dir_path3 = test_root_ + "/file_dir1/file_dir3/";
+    ASSERT_OK(fs_->Mkdirs(dir_path3));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3));
+    ASSERT_TRUE(is_exist);
+
+    std::string file_path2 = test_root_ + "/file_dir1/second_file.data";
+    ASSERT_OK(fs_->WriteFile(file_path2, "hello!", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+    ASSERT_TRUE(is_exist);
+
+    std::vector<std::unique_ptr<FileStatus>> file_status_list2;
+    ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list2));
+    ASSERT_EQ(file_status_list2.size(), 2);
+    expected_dirs = {test_root_ + "/file_dir1/", dir_path2};
+    CheckFileStatus(file_status_list2, /*expected_files=*/{}, expected_dirs);
+
+    std::vector<std::unique_ptr<FileStatus>> file_status_list3;
+    ASSERT_OK(fs_->ListFileStatus(test_root_ + "/file_dir1/", 
&file_status_list3));
+    ASSERT_EQ(file_status_list3.size(), 3);
+    expected_dirs = {dir_path3};
+    std::set<std::string> expected_files = {file_path, file_path2};
+    CheckFileStatus(file_status_list3, expected_files, expected_dirs);
+}
+
+TEST_P(FileSystemTest, TestListFileStatus2) {
+    const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+    const std::set<std::string> expected_files = 
{PathUtil::JoinPath(test_path, "README")};
+    const std::set<std::string> expected_dirs = {
+        PathUtil::JoinPath(test_path, "f1=10"),    
PathUtil::JoinPath(test_path, "commit_messages"),
+        PathUtil::JoinPath(test_path, "f1=20"),    
PathUtil::JoinPath(test_path, "data_splits"),
+        PathUtil::JoinPath(test_path, "manifest"), 
PathUtil::JoinPath(test_path, "schema"),
+        PathUtil::JoinPath(test_path, "snapshot")};
+    {
+        // input is a dir, with a trailing '/'
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK(fs_->ListFileStatus(test_path + "/", &status_list));
+        CheckFileStatus(status_list, expected_files, expected_dirs);
+    }
+    {
+        // input is a dir, without a trailing '/'
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK(fs_->ListFileStatus(test_path, &status_list));
+        CheckFileStatus(status_list, expected_files, expected_dirs);
+    }
+    {
+        // input is a file
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path, "README"), 
&status_list));
+        CheckFileStatus(status_list, expected_files, /*expected_dirs=*/{});
+    }
+    {
+        // input is not exist
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path, 
"NOT_EXIST"), &status_list));
+        CheckFileStatus(status_list, /*expected_files=*/{}, 
/*expected_dirs=*/{});
+    }
+}
+
+TEST_P(FileSystemTest, TestListDir1) {
+    std::string file_path = test_root_ + "/file_dir1/file.data";
+    ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+
+    auto dir_path1 = test_root_ + "/file_dir1/";
+    std::vector<std::unique_ptr<BasicFileStatus>> file_status_list;
+    ASSERT_OK(fs_->ListDir(test_root_, &file_status_list));
+    ASSERT_EQ(file_status_list.size(), 1);
+    std::set<std::string> expected_dirs = {dir_path1};
+    CheckBasicFileStatus(file_status_list, std::set<std::string>(), 
expected_dirs);
+
+    auto dir_path2 = test_root_ + "/file_dir2/";
+    ASSERT_OK(fs_->Mkdirs(dir_path2));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+    ASSERT_TRUE(is_exist);
+
+    auto dir_path3 = test_root_ + "/file_dir1/file_dir3/";
+    ASSERT_OK(fs_->Mkdirs(dir_path3));
+    ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3));
+    ASSERT_TRUE(is_exist);
+
+    std::vector<std::unique_ptr<BasicFileStatus>> file_status_list2;
+    ASSERT_OK(fs_->ListDir(test_root_, &file_status_list2));
+    ASSERT_EQ(file_status_list2.size(), 2);
+    expected_dirs = {dir_path1, dir_path2};
+    CheckBasicFileStatus(file_status_list2, std::set<std::string>(), 
expected_dirs);
+
+    std::vector<std::unique_ptr<BasicFileStatus>> file_status_list3;
+    ASSERT_OK(fs_->ListDir(dir_path1, &file_status_list3));
+    ASSERT_EQ(file_status_list3.size(), 2);
+    std::set<std::string> expected_files = {file_path};
+    expected_dirs = {dir_path3};
+    CheckBasicFileStatus(file_status_list3, expected_files, expected_dirs);
+
+    // list non exist dir will return ok
+    std::vector<std::unique_ptr<BasicFileStatus>> file_status_list4;
+    ASSERT_OK(fs_->ListDir(test_root_ + "/non-exist/", &file_status_list4));
+    ASSERT_TRUE(file_status_list4.empty());
+
+    // list invalid path, a data file path
+    std::vector<std::unique_ptr<BasicFileStatus>> file_status_list5;
+    ASSERT_NOK_WITH_MSG(fs_->ListDir(file_path, &file_status_list5), "is not a 
directory");
+}
+
+TEST_P(FileSystemTest, TestListDir2) {
+    const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+    const std::set<std::string> expected_files = 
{PathUtil::JoinPath(test_path, "README")};
+    const std::set<std::string> expected_dirs = {
+        PathUtil::JoinPath(test_path, "f1=10"),    
PathUtil::JoinPath(test_path, "commit_messages"),
+        PathUtil::JoinPath(test_path, "f1=20"),    
PathUtil::JoinPath(test_path, "data_splits"),
+        PathUtil::JoinPath(test_path, "manifest"), 
PathUtil::JoinPath(test_path, "schema"),
+        PathUtil::JoinPath(test_path, "snapshot")};
+    {
+        // input is a dir, with a trailing '/'
+        std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+        ASSERT_OK(fs_->ListDir(test_path + "/", &status_list));
+        CheckBasicFileStatus(status_list, expected_files, expected_dirs);
+    }
+    {
+        // input is a dir, without a trailing '/'
+        std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+        ASSERT_OK(fs_->ListDir(test_path, &status_list));
+        CheckBasicFileStatus(status_list, expected_files, expected_dirs);
+    }
+    {
+        // input is a file
+        std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+        ASSERT_NOK_WITH_MSG(fs_->ListDir(PathUtil::JoinPath(test_path, 
"README"), &status_list),
+                            "file " + PathUtil::JoinPath(test_path, "README") +
+                                " already exists and is not a directory");
+    }
+    {
+        // input is not exist
+        std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+        ASSERT_OK(fs_->ListDir(PathUtil::JoinPath(test_path, "NOT_EXIST"), 
&status_list));
+        CheckBasicFileStatus(status_list, /*expected_files=*/{},
+                             /*expected_dirs=*/{});
+    }
+}
+
+// --- exception
+TEST_P(FileSystemTest, TestGetNotExistFileStatus) {
+    std::string path = PathUtil::JoinPath(test_root_, RandomName());
+    ASSERT_NOK(fs_->GetFileStatus(path));
+}
+
+// --- atomic store
+TEST_P(FileSystemTest, TestAtomicStore) {
+    std::string path = PathUtil::JoinPath(test_root_, RandomName());
+    std::string content = "test_content";
+    ASSERT_OK(fs_->AtomicStore(path, content));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path));
+    ASSERT_TRUE(is_exist);
+}
+
+TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) {
+    std::string file_path = CreateRandomFileInDirectory(test_root_);
+    std::string content = "test_content";
+    ASSERT_NOK(fs_->AtomicStore(file_path, content));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+    ASSERT_TRUE(is_exist);
+}
+
+INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local" 
/*, "jindo"*/));
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/fs/resolving_file_system_test.cpp 
b/src/paimon/common/fs/resolving_file_system_test.cpp
new file mode 100644
index 0000000..c459c00
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system_test.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/fs/resolving_file_system.h"
+
+#include <future>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/executor.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class GmockFileSystem : public FileSystem {
+ public:
+    GmockFileSystem(const std::string& identifier, const std::string& 
authority)
+        : identifier_(identifier), authority_(authority) {}
+
+    MOCK_METHOD(Result<std::unique_ptr<InputStream>>, Open, (const 
std::string& path),
+                (const, override));
+    MOCK_METHOD(Result<std::unique_ptr<OutputStream>>, Create,
+                (const std::string& path, bool overwrite), (const, override));
+    MOCK_METHOD(Status, Mkdirs, (const std::string& path), (const, override));
+    MOCK_METHOD(Status, Rename, (const std::string& src, const std::string& 
dst),
+                (const, override));
+    MOCK_METHOD(Status, Delete, (const std::string& path, bool recursive), 
(const, override));
+    MOCK_METHOD(Result<std::unique_ptr<FileStatus>>, GetFileStatus, (const 
std::string& path),
+                (const, override));
+    MOCK_METHOD(Status, ListDir,
+                (const std::string& directory,
+                 std::vector<std::unique_ptr<BasicFileStatus>>* 
file_status_list),
+                (const, override));
+    MOCK_METHOD(Status, ListFileStatus,
+                (const std::string& path,
+                 std::vector<std::unique_ptr<FileStatus>>* file_status_list),
+                (const, override));
+    MOCK_METHOD(Result<bool>, Exists, (const std::string& path), (const, 
override));
+
+    const std::string& GetIdentifier() const {
+        return identifier_;
+    }
+    const std::string& GetAuthority() const {
+        return authority_;
+    }
+
+ private:
+    std::string identifier_;
+    std::string authority_;
+};
+
+class GmockFileSystemFactory : public FileSystemFactory {
+ public:
+    explicit GmockFileSystemFactory(const std::string& identifier) : 
identifier_(identifier) {}
+
+    const char* Identifier() const override {
+        return identifier_.c_str();
+    }
+
+    Result<std::unique_ptr<FileSystem>> Create(
+        const std::string& path_str,
+        const std::map<std::string, std::string>& options) const override {
+        PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_str));
+        return 
std::make_unique<testing::NiceMock<GmockFileSystem>>(identifier_, 
path.authority);
+    }
+
+ private:
+    std::string identifier_;
+};
+
+class ResolvingFileSystemTest : public testing::Test {
+ public:
+    void SetUp() override {
+        auto factory_creator = FactoryCreator::GetInstance();
+
+        // Register mock factories
+        factory_creator->Register("mock_local", new 
GmockFileSystemFactory("mock_local"));
+        factory_creator->Register("mock_hdfs", new 
GmockFileSystemFactory("mock_hdfs"));
+        factory_creator->Register("mock_oss", new 
GmockFileSystemFactory("mock_oss"));
+    }
+
+    void TearDown() override {
+        auto factory_creator = FactoryCreator::GetInstance();
+        factory_creator->TEST_Unregister("mock_local");
+        factory_creator->TEST_Unregister("mock_hdfs");
+        factory_creator->TEST_Unregister("mock_oss");
+    }
+};
+
+TEST_F(ResolvingFileSystemTest, GetRealFileSystemWithFileScheme) {
+    std::map<std::string, std::string> scheme_mapping = {
+        {"file", "mock_local"}, {"hdfs", "mock_hdfs"}, {"oss", "mock_oss"}};
+    ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", 
/*options=*/{});
+
+    auto check_result = [&](const std::string& uri, const std::string& 
expected_identifier) {
+        ASSERT_OK_AND_ASSIGN(auto real_fs, 
resolving_fs.GetRealFileSystem(uri));
+        auto casted_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
+        ASSERT_TRUE(casted_fs);
+        ASSERT_EQ(casted_fs->GetIdentifier(), expected_identifier);
+    };
+    check_result("file:///tmp/test.txt", "mock_local");
+    check_result("hdfs://node:9000/tmp/test.txt", "mock_hdfs");
+    check_result("oss://bucket/tmp/test.txt", "mock_oss");
+    // Empty scheme should map to "file" scheme
+    check_result("/tmp/test.txt", "mock_local");
+    check_result("tmp/test.txt", "mock_local");
+    // Unknown scheme should use default file system
+    check_result("s3://bucket/tmp/test.txt", "mock_local");
+}
+
+TEST_F(ResolvingFileSystemTest, EmptySchemeMapping) {
+    std::map<std::string, std::string> empty_scheme_mapping;
+    ResolvingFileSystem resolving_fs(empty_scheme_mapping, "mock_local", 
/*options=*/{});
+
+    // Should use default file system for all schemes
+    ASSERT_OK_AND_ASSIGN(auto real_fs, 
resolving_fs.GetRealFileSystem("any_scheme://path/test"));
+    auto casted_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
+    ASSERT_TRUE(casted_fs);
+    ASSERT_EQ(casted_fs->GetIdentifier(), "mock_local");
+}
+
+TEST_F(ResolvingFileSystemTest, FileSystemCaching) {
+    std::map<std::string, std::string> scheme_mapping = {{"file", 
"mock_local"}};
+    ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", 
/*options=*/{});
+
+    // First call should create and cache the file system
+    ASSERT_OK_AND_ASSIGN(auto fs1, 
resolving_fs.GetRealFileSystem("file:///tmp/test1.txt"));
+    // Second call with same scheme should use cached file system
+    ASSERT_OK_AND_ASSIGN(auto fs2, 
resolving_fs.GetRealFileSystem("file:///tmp/test2.txt"));
+    ASSERT_EQ(fs1, fs2);
+}
+
+TEST_F(ResolvingFileSystemTest, DifferentAuthoritiesCacheSeparately) {
+    std::map<std::string, std::string> scheme_mapping = {{"hdfs", 
"mock_hdfs"}};
+    ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", 
/*options=*/{});
+
+    // Different authorities should create separate file system instances
+    ASSERT_OK_AND_ASSIGN(auto fs1, 
resolving_fs.GetRealFileSystem("hdfs://node1:9000/test.txt"));
+    ASSERT_OK_AND_ASSIGN(auto fs2, 
resolving_fs.GetRealFileSystem("hdfs://node2:9000/test.txt"));
+    ASSERT_NE(fs1, fs2);
+}
+
+TEST_F(ResolvingFileSystemTest, ThreadSafety) {
+    std::map<std::string, std::string> scheme_mapping = {{"file", 
"mock_local"}};
+    ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local", 
/*options=*/{});
+    const int32_t num_threads = 100;
+    const int32_t operations_per_thread = 10;
+
+    auto executor = CreateDefaultExecutor();
+    std::vector<std::future<std::vector<std::shared_ptr<FileSystem>>>> futures;
+    for (int32_t i = 0; i < num_threads; ++i) {
+        futures.push_back(
+            Via(executor.get(), [&resolving_fs, i]() -> 
std::vector<std::shared_ptr<FileSystem>> {
+                std::vector<std::shared_ptr<FileSystem>> fs_list;
+                for (int32_t j = 0; j < operations_per_thread; ++j) {
+                    std::string path = "file:///tmp/thread_" + 
std::to_string(i) + "_" +
+                                       std::to_string(j) + ".txt";
+                    EXPECT_OK_AND_ASSIGN(auto fs, 
resolving_fs.GetRealFileSystem(path));
+                    EXPECT_TRUE(fs);
+                    fs_list.push_back(fs);
+                }
+                return fs_list;
+            }));
+    }
+
+    // Verify that all threads should get the same file system instance
+    auto results = CollectAll(futures);
+    auto fs0 = results[0][0];
+    for (const auto& fs_list : results) {
+        for (const auto& fs : fs_list) {
+            ASSERT_EQ(fs, fs0);
+        }
+    }
+}
+
+TEST_F(ResolvingFileSystemTest, ThreadSafety2) {
+    std::map<std::string, std::string> scheme_mapping = {{"oss", "mock_oss"}};
+    ResolvingFileSystem resolving_fs(scheme_mapping, "mock_oss", 
/*options=*/{});
+    const int32_t num_threads = 100;
+    const int32_t operations_per_thread = 10;
+
+    auto executor = CreateDefaultExecutor();
+    std::vector<std::future<std::vector<std::shared_ptr<FileSystem>>>> futures;
+    for (int32_t i = 0; i < num_threads; ++i) {
+        futures.push_back(
+            Via(executor.get(), [&resolving_fs, i]() -> 
std::vector<std::shared_ptr<FileSystem>> {
+                std::vector<std::shared_ptr<FileSystem>> fs_list;
+                for (int32_t j = 0; j < operations_per_thread; ++j) {
+                    std::string path =
+                        "oss://bucket_" + std::to_string(i) + "/" + 
std::to_string(j) + ".txt";
+                    EXPECT_OK_AND_ASSIGN(auto fs, 
resolving_fs.GetRealFileSystem(path));
+                    EXPECT_TRUE(fs);
+                    fs_list.push_back(fs);
+                }
+                return fs_list;
+            }));
+    }
+
+    auto results = CollectAll(futures);
+    for (size_t i = 0; i < results.size(); ++i) {
+        for (size_t j = 0; j < results[i].size(); ++j) {
+            auto casted_fs = 
dynamic_cast<GmockFileSystem*>(results[i][j].get());
+            ASSERT_TRUE(casted_fs);
+            ASSERT_EQ(casted_fs->GetAuthority(), "bucket_" + 
std::to_string(i));
+        }
+    }
+}
+
+}  // namespace paimon::test

Reply via email to