This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 54aee69 feat(fs): introduce file system tests (#42)
54aee69 is described below
commit 54aee69589e6677bf7086a2a4c9ea94cf8368700
Author: Zhang Jiawei <[email protected]>
AuthorDate: Tue Jun 2 18:15:13 2026 +0800
feat(fs): introduce file system tests (#42)
---
src/paimon/common/fs/external_path_provider.h | 74 +
.../common/fs/external_path_provider_test.cpp | 68 +
src/paimon/common/fs/file_system_test.cpp | 1407 ++++++++++++++++++++
.../common/fs/resolving_file_system_test.cpp | 235 ++++
4 files changed, 1784 insertions(+)
diff --git a/src/paimon/common/fs/external_path_provider.h
b/src/paimon/common/fs/external_path_provider.h
new file mode 100644
index 0000000..568d3ac
--- /dev/null
+++ b/src/paimon/common/fs/external_path_provider.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Provider for external data paths.
+///
+/// Hands out data file paths by rotating over the configured external table paths. The
+/// rotation starts at a randomly chosen position.
+class ExternalPathProvider {
+ public:
+    /// Creates a provider over the given external table paths.
+    ///
+    /// @param external_table_paths root paths to rotate over; must not be empty
+    /// @param relative_bucket_path path joined beneath the selected external table path
+    /// @return the provider, or an Invalid status when `external_table_paths` is empty
+    static Result<std::unique_ptr<ExternalPathProvider>> Create(
+        const std::vector<std::string>& external_table_paths,
+        const std::string& relative_bucket_path) {
+        if (external_table_paths.empty()) {
+            return Status::Invalid("external table paths cannot be empty");
+        }
+        // The constructor is private, so std::make_unique cannot be used here.
+        return std::unique_ptr<ExternalPathProvider>(
+            new ExternalPathProvider(external_table_paths, relative_bucket_path));
+    }
+
+    /// Get the next external data path.
+    ///
+    /// Advances the rotation by one entry (wrapping to the first entry after the last one) and
+    /// joins the selected external table path, the relative bucket path and `file_name`.
+    /// The rotation state is updated without any synchronization.
+    ///
+    /// @param file_name name of the data file
+    /// @return the next external data path
+    std::string GetNextExternalDataPath(const std::string& file_name) {
+        position_ = (position_ + 1) % external_table_paths_.size();
+        const std::string bucket_dir =
+            PathUtil::JoinPath(external_table_paths_[position_], relative_bucket_path_);
+        return PathUtil::JoinPath(bucket_dir, file_name);
+    }
+
+ private:
+    ExternalPathProvider(const std::vector<std::string>& external_table_paths,
+                         const std::string& relative_bucket_path)
+        : external_table_paths_(external_table_paths),
+          relative_bucket_path_(relative_bucket_path) {
+        // Create() rejects an empty list, so `size() - 1` cannot wrap around here.
+        // The distribution is instantiated for size_t so neither the bound nor the drawn
+        // index goes through an int conversion.
+        std::random_device rd;
+        std::mt19937 gen(rd());
+        std::uniform_int_distribution<size_t> dist(0, external_table_paths_.size() - 1);
+        position_ = dist(gen);
+    }
+
+    std::vector<std::string> external_table_paths_;
+    std::string relative_bucket_path_;
+    /// Index of the most recently returned external table path.
+    size_t position_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/fs/external_path_provider_test.cpp
b/src/paimon/common/fs/external_path_provider_test.cpp
new file mode 100644
index 0000000..6893bfb
--- /dev/null
+++ b/src/paimon/common/fs/external_path_provider_test.cpp
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/fs/external_path_provider.h"
+
+#include <set>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Creating a provider from an empty list of external table paths must be rejected.
+TEST(ExternalPathProviderTest, TestInvalidExternalDataPath) {
+    const std::string bucket_path = "p0=1/p1=0/bucket-0";
+    const std::vector<std::string> no_paths;
+
+    auto created = ExternalPathProvider::Create(no_paths, bucket_path);
+    ASSERT_NOK_WITH_MSG(created.status(), "external table paths cannot be empty");
+}
+
+// With a single external table path, the returned path is that root joined with the bucket
+// path and the file name.
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPath) {
+    const std::vector<std::string> roots = {"/tmp/external_path/"};
+    const std::string bucket_path = "p0=1/p1=0/bucket-0";
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+                         ExternalPathProvider::Create(roots, bucket_path));
+    const std::string data_path = provider->GetNextExternalDataPath("file.orc");
+    ASSERT_EQ(data_path, "/tmp/external_path/p0=1/p1=0/bucket-0/file.orc");
+}
+
+// With three external table paths, three consecutive calls must visit each root exactly once.
+// The starting root is random, so the results are compared as a set.
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) {
+    const std::vector<std::string> roots = {"/tmp/external_path_a/", "/tmp/external_path_b/",
+                                            "/tmp/external_path_c/"};
+    const std::string bucket_path = "p0=1/p1=0/bucket-0";
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+                         ExternalPathProvider::Create(roots, bucket_path));
+
+    std::set<std::string> seen_paths;
+    for (size_t i = 0; i < roots.size(); ++i) {
+        seen_paths.insert(provider->GetNextExternalDataPath("file.orc"));
+    }
+
+    const std::set<std::string> expected_paths = {
+        "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc",
+        "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc",
+        "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
+    };
+    ASSERT_EQ(seen_paths, expected_paths);
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/fs/file_system_test.cpp
b/src/paimon/common/fs/file_system_test.cpp
new file mode 100644
index 0000000..815b983
--- /dev/null
+++ b/src/paimon/common/fs/file_system_test.cpp
@@ -0,0 +1,1407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/fs/file_system.h"
+
+#include <cassert>
+#include <cstdlib>
+#include <future>
+#include <map>
+#include <set>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/executor.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Parameterized fixture for file system tests. The test parameter is the file system
+/// identifier handed to UniqueTestDirectory::Create.
+class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterface<std::string> {
+ public:
+    /// Creates the unique test directory for the file system under test and caches its path
+    /// and file system handle.
+    void SetUp() override {
+        std::string file_system = GetParam();
+        dir_ = paimon::test::UniqueTestDirectory::Create(file_system);
+        ASSERT_TRUE(dir_);
+        test_root_ = dir_->Str();
+        fs_ = dir_->GetFileSystem();
+    }
+
+    /// Releases the test directory first, then the file system handle.
+    void TearDown() override {
+        dir_.reset();
+        fs_.reset();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+    /// Creates a random string with a length within the given interval. The string contains
+    /// only characters that can be represented as a single code point.
+    ///
+    /// Uses std::rand(), which is not seeded here.
+    ///
+    /// @param min_length The minimum string length.
+    /// @param max_length The maximum string length (inclusive).
+    /// @param min_value The minimum character value to occur.
+    /// @param max_value The maximum character value to occur.
+    /// @return A random String.
+    static std::string GetRandomString(int32_t min_length, int32_t max_length, char min_value,
+                                       char max_value) {
+        const int32_t len = std::rand() % (max_length - min_length + 1) + min_length;
+        const int32_t diff = max_value - min_value + 1;
+
+        std::string data(len, '\0');
+        for (char& ch : data) {
+            ch = static_cast<char>(std::rand() % diff + min_value);
+        }
+        return data;
+    }
+
+    /// @return a random name made of 16 lowercase ASCII letters.
+    static std::string RandomName() {
+        return GetRandomString(16, 16, 'a', 'z');
+    }
+
+    /// Creates `test_root` (including parents) unless it already exists.
+    ///
+    /// @return `test_root` unchanged, for call chaining.
+    std::string MakeDir(const std::string& test_root) {
+        EXPECT_OK_AND_ASSIGN(bool exist, fs_->Exists(test_root));
+        if (!exist) {
+            EXPECT_OK(fs_->Mkdirs(test_root));
+        }
+        return test_root;
+    }
+
+    /// Creates (or overwrites) `file` with a small, non-empty binary payload.
+    void CreateFile(const std::string& file) {
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out,
+                             fs_->Create(file, /*overwrite=*/true));
+        // `input` only supplies the payload length (6); the bytes written are the first six
+        // entries of `chars`.
+        std::string input = "paimon";
+        char chars[8] = {1, 2, 3, 4, 5, 6, 7, 8};
+        ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(chars, input.size()));
+        ASSERT_EQ(size, input.size());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    /// Ensures `test_root` exists and creates a randomly named file inside it.
+    ///
+    /// @return the path of the created file.
+    std::string CreateRandomFileInDirectory(const std::string& test_root) {
+        std::string directory = MakeDir(test_root);
+        std::string file_path = PathUtil::JoinPath(directory, RandomName());
+        CreateFile(file_path);
+        return file_path;
+    }
+
+    /// @return a copy of `path` after PathUtil::TrimLastDelim has been applied to it.
+    std::string RemoveLastSlashInPath(const std::string& path) const {
+        std::string new_path = path;
+        PathUtil::TrimLastDelim(&new_path);
+        return new_path;
+    }
+
+    /// Verifies that `actual_statuses` describes exactly `expected_files` and `expected_dirs`.
+    ///
+    /// Directory paths are compared after trimming the trailing delimiter on both sides. Every
+    /// file must additionally have a positive length and a plausible modification time.
+    void CheckFileStatus(const std::vector<std::unique_ptr<FileStatus>>& actual_statuses,
+                         const std::set<std::string>& expected_files,
+                         const std::set<std::string>& expected_dirs) const {
+        ASSERT_EQ(actual_statuses.size(), expected_files.size() + expected_dirs.size());
+        std::set<std::string> actual_files;
+        std::set<std::string> actual_dirs;
+        for (const auto& file_status : actual_statuses) {
+            if (file_status->IsDir()) {
+                actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath()));
+            } else {
+                actual_files.insert(file_status->GetPath());
+                ASSERT_GT(file_status->GetLen(), 0);
+                int64_t modification_time = file_status->GetModificationTime();
+                ASSERT_GT(modification_time, 10000000000L);     // MIN_VALID_FILE_MODIFICATION_MS
+                ASSERT_LT(modification_time, 10000000000000L);  // MAX_VALID_FILE_MODIFICATION_MS
+            }
+        }
+        std::set<std::string> normalized_expected_dirs;
+        for (const auto& path : expected_dirs) {
+            normalized_expected_dirs.insert(RemoveLastSlashInPath(path));
+        }
+        ASSERT_EQ(actual_files, expected_files);
+        ASSERT_EQ(actual_dirs, normalized_expected_dirs);
+    }
+
+    /// Same as CheckFileStatus, but for BasicFileStatus entries: only the path and the
+    /// file/directory kind are checked.
+    void CheckBasicFileStatus(const std::vector<std::unique_ptr<BasicFileStatus>>& actual_statuses,
+                              const std::set<std::string>& expected_files,
+                              const std::set<std::string>& expected_dirs) const {
+        ASSERT_EQ(actual_statuses.size(), expected_files.size() + expected_dirs.size());
+        std::set<std::string> actual_files;
+        std::set<std::string> actual_dirs;
+        for (const auto& file_status : actual_statuses) {
+            if (file_status->IsDir()) {
+                actual_dirs.insert(RemoveLastSlashInPath(file_status->GetPath()));
+            } else {
+                actual_files.insert(file_status->GetPath());
+            }
+        }
+        std::set<std::string> normalized_expected_dirs;
+        for (const auto& path : expected_dirs) {
+            normalized_expected_dirs.insert(RemoveLastSlashInPath(path));
+        }
+        ASSERT_EQ(actual_files, expected_files);
+        ASSERT_EQ(actual_dirs, normalized_expected_dirs);
+    }
+
+    /// @return the directory holding pre-built test data for the file system under test.
+    ///         Only the "local" and "jindo" parameters are supported.
+    std::string GetTestDir() const {
+        std::string file_system = GetParam();
+        if (file_system == "local") {
+            return paimon::test::GetDataDir();
+        } else if (file_system == "jindo") {
+            return "oss://paimon-unittest/test_data/";
+        }
+        assert(false);
+        return "";
+    }
+
+    // Protected rather than private: TEST_P bodies are compiled as members of classes derived
+    // from this fixture and access these members directly.
+ protected:
+    std::shared_ptr<FileSystem> fs_;
+    std::unique_ptr<UniqueTestDirectory> dir_;
+    std::string test_root_;
+};
+
+// Requesting a file system with an unknown identifier must yield an Invalid status.
+TEST(FileSystemStaticTest, TestNoneFileSystemFactory) {
+    std::map<std::string, std::string> options;
+    Result<std::unique_ptr<FileSystem>> created =
+        FileSystemFactory::Get("not exist", "/tmp", options);
+    const bool rejected_as_invalid = created.status().IsInvalid();
+    ASSERT_TRUE(rejected_as_invalid);
+}
+
+// The file system identifier under test must be among the types registered with
+// FactoryCreator.
+TEST_P(FileSystemTest, TestFactoryCreator) {
+    const std::vector<std::string> registered_types =
+        FactoryCreator::GetInstance()->GetRegisteredType();
+    const std::string identifier = GetParam();
+
+    bool is_registered = false;
+    // Iterate by const reference (no per-element string copy) and stop at the first match.
+    for (const auto& registered_type : registered_types) {
+        if (registered_type == identifier) {
+            is_registered = true;
+            break;
+        }
+    }
+    ASSERT_TRUE(is_registered) << "file system '" << identifier << "' is not registered";
+}
+
+// --- create
+// Create() with overwrite succeeds; a second Create() without overwrite on the same path fails.
+TEST_P(FileSystemTest, TestCreate) {
+    std::string payload = "paimon";
+    std::string file_path = PathUtil::JoinPath(test_root_, "/test_file");
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> writer,
+                         fs_->Create(file_path, /*overwrite=*/true));
+    ASSERT_TRUE(writer);
+    ASSERT_OK_AND_ASSIGN(int32_t written, writer->Write(payload.data(), payload.size()));
+    ASSERT_EQ(written, payload.size());
+    ASSERT_OK(writer->Close());
+
+    // The file exists now, so creating it again without overwrite must be refused.
+    ASSERT_NOK_WITH_MSG(fs_->Create(file_path, /*overwrite=*/false), "already exists");
+}
+
+// --- write&read
+// Round-trips a small payload and checks position, URI and length reporting on both streams.
+TEST_P(FileSystemTest, TestSimpleWriteAndRead) {
+    std::string expected = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Write the payload, then check the writer's position and URI before closing it.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t written, writer->Write(expected.data(), expected.size()));
+    ASSERT_EQ(written, expected.size());
+
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK_AND_ASSIGN(int64_t position, writer->GetPos());
+    ASSERT_EQ(position, expected.size());
+
+    ASSERT_OK_AND_ASSIGN(std::string uri, writer->GetUri());
+    ASSERT_EQ(uri, data_file);
+    ASSERT_OK(writer->Close());
+
+    // A freshly opened reader starts at position 0.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 0);
+
+    // Sequential read from the current position.
+    std::string buffer(expected.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ(expected, buffer);
+
+    // Positional read from an explicit offset.
+    ASSERT_OK_AND_ASSIGN(read_len, reader->Read(buffer.data(), buffer.size(), 0));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ(expected, buffer);
+
+    ASSERT_OK_AND_ASSIGN(uri, reader->GetUri());
+    ASSERT_EQ(uri, data_file);
+    ASSERT_OK_AND_ASSIGN(uint64_t file_len, reader->Length());
+    ASSERT_EQ(file_len, expected.size());
+
+    // The position is expected to still be where the sequential read left it.
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, expected.size());
+    ASSERT_OK(reader->Close());
+}
+
+// Several consecutive Write() calls on one stream must produce the concatenated content.
+TEST_P(FileSystemTest, TestWriteMultipleTimes) {
+    std::vector<std::string> chunks = {"abc", "defg", "hi", "j", "k"};
+    std::string expected = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Write the chunks one after another through a single output stream.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    for (const auto& chunk : chunks) {
+        ASSERT_OK_AND_ASSIGN(int32_t written, writer->Write(chunk.data(), chunk.size()));
+        ASSERT_EQ(written, chunk.size());
+    }
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK_AND_ASSIGN(int64_t position, writer->GetPos());
+    ASSERT_EQ(position, expected.size());
+    ASSERT_OK(writer->Close());
+
+    // Read everything back in one call and compare with the concatenation.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    std::string buffer(expected.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ(expected, buffer);
+}
+
+// Creating a file below a missing directory must succeed and leave that directory in place.
+TEST_P(FileSystemTest, TestWriteInNotExistDir) {
+    std::string expected = "abcdefghijk";
+    std::string data_file = test_root_ + "/no_exist/file.data";
+
+    // Write into a directory that has not been created beforehand.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t written,
+                         writer->Write(expected.data(), expected.size()));
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+
+    // The content must be readable afterwards.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    std::string buffer(expected.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ(expected, buffer);
+
+    // The parent directory must exist now.
+    ASSERT_OK_AND_ASSIGN(bool parent_exists, fs_->Exists(test_root_ + "/no_exist/"));
+    ASSERT_TRUE(parent_exists);
+}
+
+// A zero-byte write yields an empty regular file with a plausible modification time.
+TEST_P(FileSystemTest, TestWriteEmptyFile) {
+    std::string expected = "";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Write nothing.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t written, writer->Write(expected.data(), expected.size()));
+    ASSERT_EQ(written, 0);
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+
+    // The status must describe an empty, non-directory entry at the same path.
+    ASSERT_OK_AND_ASSIGN(auto file_status, fs_->GetFileStatus(data_file));
+    ASSERT_EQ(file_status->GetPath(), data_file);
+    ASSERT_FALSE(file_status->IsDir());
+    ASSERT_EQ(file_status->GetLen(), 0);
+    auto modified_at = file_status->GetModificationTime();
+    ASSERT_GT(modified_at, 10000000000L);
+    ASSERT_LT(modified_at, 10000000000000L);
+
+    // Reading zero bytes from the empty file must succeed.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    std::string buffer(expected.size(), '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ(expected, buffer);
+}
+
+// Create() with overwrite replaces existing content; without overwrite it is refused.
+TEST_P(FileSystemTest, TestWriteWithOverwrite) {
+    std::string first_content = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Initial write.
+    ASSERT_OK_AND_ASSIGN(auto first_writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t written,
+                         first_writer->Write(first_content.data(), first_content.size()));
+    ASSERT_EQ(written, first_content.size());
+    ASSERT_OK(first_writer->Flush());
+    ASSERT_OK(first_writer->Close());
+
+    {
+        // Overwrite the existing file with different content.
+        std::string second_content = "helloworld";
+        ASSERT_OK_AND_ASSIGN(auto second_writer, fs_->Create(data_file, /*overwrite=*/true));
+        ASSERT_OK_AND_ASSIGN(written,
+                             second_writer->Write(second_content.data(), second_content.size()));
+        ASSERT_EQ(written, second_content.size());
+        ASSERT_OK(second_writer->Flush());
+        ASSERT_OK(second_writer->Close());
+
+        // Only the new content must be readable.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        std::string buffer(second_content.size(), '\0');
+        ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+        ASSERT_EQ(read_len, buffer.size());
+        ASSERT_EQ(second_content, buffer);
+    }
+    {
+        // The file exists, so creating it with overwrite = false must fail.
+        ASSERT_NOK_WITH_MSG(fs_->Create(data_file, /*overwrite=*/false), "do not allow overwrite");
+    }
+}
+
+// ReadAsync() of the whole file must invoke the callback with an OK status and fill the buffer.
+TEST_P(FileSystemTest, TestAsyncRead) {
+    std::string expected = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Prepare the file.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t written,
+                         writer->Write(expected.data(), expected.size()));
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+
+    // The callback reports the outcome through the promise: 10 on success, 20 on failure.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    std::string buffer(expected.size(), '\0');
+    bool read_succeeded = false;
+    std::promise<int32_t> outcome;
+    std::future<int32_t> outcome_future = outcome.get_future();
+    auto on_done = [&](Status status) {
+        EXPECT_OK(status);
+        read_succeeded = status.ok();
+        outcome.set_value(status.ok() ? 10 : 20);
+    };
+    reader->ReadAsync(buffer.data(), buffer.size(), /*offset=*/0, on_done);
+
+    // future.get() blocks until the callback has run.
+    ASSERT_EQ(outcome_future.get(), 10);
+    ASSERT_TRUE(read_succeeded);
+    ASSERT_EQ(expected, buffer);
+    ASSERT_OK(reader->Close());
+}
+
+// Reads that would go past the end of an 11-byte file must fail.
+TEST_P(FileSystemTest, TestInvalidRead) {
+    std::string file_content = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Prepare the file.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t written,
+                         writer->Write(file_content.data(), file_content.size()));
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+    {
+        // Seek to the end of the file: a non-empty read fails, a zero-length read succeeds.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        ASSERT_OK(reader->Seek(/*offset=*/11, SeekOrigin::FS_SEEK_SET));
+        ASSERT_OK_AND_ASSIGN(auto position, reader->GetPos());
+        ASSERT_EQ(position, 11);
+
+        std::string buffer(3, '\0');
+        ASSERT_NOK(reader->Read(buffer.data(), buffer.size()));
+        ASSERT_OK_AND_ASSIGN(size_t actual_read, reader->Read(buffer.data(), 0));
+        ASSERT_EQ(actual_read, 0);
+    }
+    {
+        // Requesting more bytes than the file holds fails; the position is expected at 11.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        std::string buffer(20, '\0');
+        ASSERT_NOK(reader->Read(buffer.data(), buffer.size()));
+        ASSERT_OK_AND_ASSIGN(auto position, reader->GetPos());
+        ASSERT_EQ(position, 11);
+    }
+    {
+        // A positional read starting beyond the end of the file fails.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        std::string buffer(4, '\0');
+        ASSERT_NOK(reader->Read(buffer.data(), buffer.size(), /*offset=*/20));
+    }
+}
+
+// ReadAsync() requests that go past the end of the file must report a failed status.
+TEST_P(FileSystemTest, TestInvalidAsyncRead) {
+    std::string file_content = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Prepare the file.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t written,
+                         writer->Write(file_content.data(), file_content.size()));
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+
+    {
+        // Buffer larger than the file: the read overflows.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        std::string buffer(20, '\0');
+        bool read_succeeded = false;
+        std::promise<int> outcome;
+        std::future<int> outcome_future = outcome.get_future();
+        // The callback reports 10 on success and 20 on failure.
+        auto on_done = [&](Status status) {
+            EXPECT_NOK(status);
+            read_succeeded = status.ok();
+            outcome.set_value(status.ok() ? 10 : 20);
+        };
+        reader->ReadAsync(buffer.data(), buffer.size(), /*offset=*/0, on_done);
+        ASSERT_EQ(outcome_future.get(), 20);
+        ASSERT_FALSE(read_succeeded);
+        ASSERT_NE(buffer, file_content);
+        ASSERT_OK(reader->Close());
+    }
+    {
+        // Offset beyond the end of the file.
+        ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+        std::string buffer(file_content.size(), '\0');
+        bool read_succeeded = false;
+        std::promise<int> outcome;
+        std::future<int> outcome_future = outcome.get_future();
+        // The callback reports 10 on success and 20 on failure.
+        auto on_done = [&](Status status) {
+            EXPECT_NOK(status);
+            read_succeeded = status.ok();
+            outcome.set_value(status.ok() ? 10 : 20);
+        };
+        reader->ReadAsync(buffer.data(), buffer.size(), /*offset=*/20, on_done);
+        ASSERT_EQ(outcome_future.get(), 20);
+        ASSERT_FALSE(read_succeeded);
+        ASSERT_NE(buffer, file_content);
+        ASSERT_OK(reader->Close());
+    }
+}
+
+// Exercises the whole-file helpers AtomicStore, WriteFile and ReadFile on one path.
+TEST_P(FileSystemTest, TestReadAndWriteAndAtomicStoreFile) {
+    std::string data_file = test_root_ + "/file.data";
+    ASSERT_OK_AND_ASSIGN(bool exists, fs_->Exists(data_file));
+    ASSERT_FALSE(exists);
+
+    // AtomicStore creates the file with the given content.
+    std::string stored = "content";
+    std::string loaded;
+    ASSERT_OK(fs_->AtomicStore(data_file, stored));
+    ASSERT_OK(fs_->ReadFile(data_file, &loaded));
+    ASSERT_EQ(loaded, stored);
+
+    // WriteFile with overwrite replaces the content.
+    std::string replacement = "hello world";
+    ASSERT_OK(fs_->WriteFile(data_file, replacement, /*overwrite=*/true));
+    ASSERT_OK(fs_->ReadFile(data_file, &loaded));
+    ASSERT_EQ(loaded, replacement);
+
+    // WriteFile without overwrite is refused and the file stays in place.
+    ASSERT_NOK_WITH_MSG(fs_->WriteFile(data_file, stored, /*overwrite=*/false),
+                        "do not allow overwrite");
+    ASSERT_OK_AND_ASSIGN(exists, fs_->Exists(data_file));
+    ASSERT_TRUE(exists);
+
+    // AtomicStore onto an existing file is refused and the content is unchanged.
+    ASSERT_NOK_WITH_MSG(fs_->AtomicStore(data_file, stored), "dst file already exist");
+    ASSERT_OK(fs_->ReadFile(data_file, &loaded));
+    ASSERT_EQ(loaded, replacement);
+}
+
+// FileSystem::IsObjectStore must classify paths by their scheme.
+TEST(FileSystemStaticTest, TestIsObjectStore) {
+    // Each entry pairs a path with the expected classification.
+    const std::vector<std::pair<std::string, bool>> cases = {
+        {"file:///tmp/test.txt", false}, {"/tmp/test.txt", false},
+        {"dfs://tmp/test.txt", false},   {"hdfs://tmp/test.txt", false},
+        {"oss://bucket/test.txt", true}, {"s3://bucket/test.txt", true},
+    };
+    for (const auto& [path, expected] : cases) {
+        ASSERT_OK_AND_ASSIGN(bool is_object_store, FileSystem::IsObjectStore(path));
+        ASSERT_EQ(is_object_store, expected) << path;
+    }
+}
+
+// --- seek
+// Seeks relative to start, current position and end, then reads sequentially and by offset.
+TEST_P(FileSystemTest, TestSeek) {
+    std::string payload = "paimon";
+    std::string file_path = PathUtil::JoinPath(test_root_, "/test_file");
+
+    // Prepare a 6-byte file.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> writer,
+                         fs_->Create(file_path, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t written, writer->Write(payload.data(), payload.size()));
+    ASSERT_EQ(written, payload.size());
+    ASSERT_OK(writer->Close());
+
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<InputStream> reader, fs_->Open(file_path));
+
+    // Absolute seek.
+    ASSERT_OK(reader->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(int64_t position, reader->GetPos());
+    ASSERT_EQ(position, 1);
+
+    // Seek relative to the current position.
+    ASSERT_OK(reader->Seek(/*offset=*/1, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 2);
+
+    // Seek relative to the end: 6 - 5 = 1.
+    ASSERT_OK(reader->Seek(/*offset=*/-5, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 1);
+
+    // Sequential read of three bytes starting at position 1.
+    std::string buffer(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ("aim", buffer);
+
+    // Positional read from offset 2.
+    ASSERT_OK_AND_ASSIGN(read_len, reader->Read(buffer.data(), buffer.size(), /*offset=*/2));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ("imo", buffer);
+
+    // The position is expected to still be where the sequential read left it (1 + 3).
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 4);
+}
+
+// Same seek scenarios as TestSeek on an 11-byte file, including reading up to the last byte.
+TEST_P(FileSystemTest, TestSeek2) {
+    std::string file_content = "abcdefghijk";
+    std::string data_file = test_root_ + "/file.data";
+
+    // Prepare the file.
+    ASSERT_OK_AND_ASSIGN(auto writer, fs_->Create(data_file, /*overwrite=*/true));
+    ASSERT_OK_AND_ASSIGN(int32_t written,
+                         writer->Write(file_content.data(), file_content.size()));
+    ASSERT_EQ(written, file_content.size());
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
+
+    // A freshly opened reader starts at position 0.
+    ASSERT_OK_AND_ASSIGN(auto reader, fs_->Open(data_file));
+    ASSERT_OK_AND_ASSIGN(auto position, reader->GetPos());
+    ASSERT_EQ(position, 0);
+
+    // Absolute seek.
+    ASSERT_OK(reader->Seek(/*offset=*/2, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 2);
+
+    // Seek relative to the current position: 2 + 4 = 6.
+    ASSERT_OK(reader->Seek(/*offset=*/4, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 6);
+
+    // Seek relative to the end: 11 - 3 = 8.
+    ASSERT_OK(reader->Seek(/*offset=*/-3, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 8);
+
+    // Sequential read of the last three bytes.
+    std::string buffer(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, reader->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ("ijk", buffer);
+
+    // Positional read from offset 4.
+    ASSERT_OK_AND_ASSIGN(read_len, reader->Read(buffer.data(), buffer.size(), /*offset=*/4));
+    ASSERT_EQ(read_len, buffer.size());
+    ASSERT_EQ("efg", buffer);
+
+    // The position is expected to still be where the sequential read left it (8 + 3).
+    ASSERT_OK_AND_ASSIGN(position, reader->GetPos());
+    ASSERT_EQ(position, 11);
+    ASSERT_OK(reader->Close());
+}
+
+// --- rename
+TEST_P(FileSystemTest, TestRename) {
+ std::string path = PathUtil::JoinPath(test_root_, "/test_file");
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<OutputStream> out, fs_->Create(path,
/*overwrite=*/true));
+ ASSERT_TRUE(out);
+ std::string input = "paimon";
+ ASSERT_OK_AND_ASSIGN(int32_t size, out->Write(input.data(), input.size()));
+ ASSERT_EQ(size, input.size());
+ ASSERT_OK(out->Flush());
+ ASSERT_OK(out->Close());
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<InputStream> in, fs_->Open(path));
+ ASSERT_TRUE(in);
+ char* data = new char[input.size() * 2];
+ ASSERT_OK_AND_ASSIGN(int32_t size_read, in->Read(data, input.size(),
/*offset=*/0));
+ ASSERT_EQ(size_read, input.size());
+ std::string read_data(data, input.size());
+ ASSERT_EQ(read_data, input);
+ delete[] data;
+ ASSERT_OK(in->Close());
+
+ std::string path2 = PathUtil::JoinPath(test_root_, "/test_file_renamed");
+ ASSERT_OK(fs_->Rename(path, path2));
+
+ std::string no_exist_path = PathUtil::JoinPath(test_root_,
"/no_exist_file");
+ ASSERT_NOK_WITH_MSG(
+ fs_->Rename(no_exist_path, PathUtil::JoinPath(test_root_,
"/no_exist_file_renamed")),
+ "src file not exist");
+
+ ASSERT_NOK_WITH_MSG(
+ fs_->Rename(path2, PathUtil::JoinPath(test_root_,
"/wrong_path_file_renamed/")),
+ "src file is not a dir");
+}
+
+// Covers Rename() for directories and regular files: successful moves, renaming a path onto
+// itself, a missing source and an already-existing destination. Every rejected rename is
+// followed by checks that source and destination are still in their previous state.
+TEST_P(FileSystemTest, TestRename2) {
+ {
+ // test rename dir
+ std::string dir_path = test_root_ + "/file_dir/";
+ std::string dir_path2 = test_root_ + "/file_dir2/";
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2));
+ // after a successful rename only the destination may exist
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_TRUE(is_exist);
+ }
+ {
+ // test rename itself
+ std::string dir_path = test_root_ + "/file_dir_itself/";
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ // src == dst is reported as an existing destination, and the directory must survive
+ ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path),
+ "dst file already exist");
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ }
+ {
+ // test rename from non-exist dir
+ std::string dir_path = test_root_ + "/non_exist_file_dir/";
+ std::string dir_path2 = test_root_ + "/non_exist_file_dir2/";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2),
"src file not exist");
+ // the failed rename must not have created the destination
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_FALSE(is_exist);
+ }
+ {
+ // test rename to exist dir
+ std::string dir_path = test_root_ + "/file_src_dir/";
+ std::string dir_path2 = test_root_ + "/file_dst_dir/";
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK(fs_->Mkdirs(dir_path2));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_TRUE(is_exist);
+
+ ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/dir_path, /*dst=*/dir_path2),
+ "dst file already exist");
+ // both directories must still be present after the rejected rename
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_TRUE(is_exist);
+ }
+ {
+ // test rename file
+ // no Mkdirs call precedes the write, so WriteFile is expected to create file1/file2
+ std::string file_path = test_root_ + "/file1/file2/file3";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+ std::string file_path2 = test_root_ + "/file1/file4";
+ ASSERT_OK(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+ ASSERT_FALSE(is_exist);
+
+ // the content must have moved with the file
+ std::string data;
+ ASSERT_OK(fs_->ReadFile(file_path2, &data));
+ ASSERT_EQ(data, "content");
+ // the source's parent directory stays in place even though the file left it
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file1/file2/"));
+ ASSERT_TRUE(is_exist);
+ }
+ {
+ // test rename file to exist file
+ std::string file_path = test_root_ + "/file11/file12/file13";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+ std::string file_path2 = test_root_ + "/file15/file16/file13";
+ ASSERT_OK(fs_->WriteFile(file_path2, "HelloWorld",
/*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+ ASSERT_TRUE(is_exist);
+
+ ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2),
+ "dst file already exist");
+ // the source is kept and the destination content is not overwritten
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+ std::string data;
+ ASSERT_OK(fs_->ReadFile(file_path2, &data));
+ ASSERT_EQ(data, "HelloWorld");
+ }
+ {
+ // test rename from non-exist file
+ std::string file_path = test_root_ + "/non_exist_file";
+ std::string file_path2 = test_root_ + "/non_exist_file2";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_NOK_WITH_MSG(fs_->Rename(/*src=*/file_path, /*dst=*/file_path2),
+ "src file not exist");
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+ ASSERT_FALSE(is_exist);
+ }
+}
+
+// --- exists
+// The path handed back by CreateRandomFileInDirectory() must be reported as existing.
+TEST_P(FileSystemTest, TestFileExists) {
+    const std::string created_file = CreateRandomFileInDirectory(test_root_);
+    ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(created_file));
+    ASSERT_TRUE(found);
+}
+
+// Exists() on a path that was never written must succeed and answer "false".
+TEST_P(FileSystemTest, TestFileDoesNotExist) {
+    const std::string never_created = PathUtil::JoinPath(test_root_, RandomName());
+    ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(never_created));
+    ASSERT_FALSE(found);
+}
+
+// Exists() must flip from false to true for a file once it is written, and for a directory
+// once a file has been written beneath it.
+TEST_P(FileSystemTest, TestExists) {
+    // Regular file: absent before the write, present afterwards.
+    const std::string data_file = test_root_ + "/file.data";
+    ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(data_file));
+    ASSERT_FALSE(found);
+    ASSERT_OK(fs_->WriteFile(data_file, "/content", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(found, fs_->Exists(data_file));
+    ASSERT_TRUE(found);
+
+    // Directory: absent until a file is written somewhere inside it.
+    const std::string parent_dir = test_root_ + "/file_dir/";
+    ASSERT_OK_AND_ASSIGN(found, fs_->Exists(parent_dir));
+    ASSERT_FALSE(found);
+    ASSERT_OK(fs_->WriteFile(parent_dir + "/file.data", "content", /*overwrite=*/false));
+    ASSERT_OK_AND_ASSIGN(found, fs_->Exists(parent_dir));
+    ASSERT_TRUE(found);
+}
+
+// --- delete
+// Deleting an existing regular file must succeed with either value of the recursive flag.
+TEST_P(FileSystemTest, TestExistingFileDeletion) {
+    auto delete_and_verify = [&](bool recursive) {
+        const std::string target = CreateRandomFileInDirectory(test_root_);
+        ASSERT_OK(fs_->Delete(target, recursive));
+        ASSERT_OK_AND_ASSIGN(bool still_there, fs_->Exists(target));
+        ASSERT_FALSE(still_there);
+    };
+    delete_and_verify(/*recursive=*/true);
+    delete_and_verify(/*recursive=*/false);
+}
+
+TEST_P(FileSystemTest, TestNotExistingFileDeletion) {
+ auto check = [&](bool recursive) {
+ std::string path = PathUtil::JoinPath(test_root_, RandomName());
+ ASSERT_TRUE(fs_->Delete(path, recursive).IsIOError());
+ };
+ check(true);
+ check(false);
+}
+
+// An empty directory can be removed with either value of the recursive flag.
+TEST_P(FileSystemTest, TestExistingEmptyDirectoryDeletion) {
+    auto create_delete_verify = [&](bool recursive) {
+        const std::string empty_dir = PathUtil::JoinPath(test_root_, RandomName());
+        ASSERT_OK(fs_->Mkdirs(empty_dir));
+        ASSERT_OK(fs_->Delete(empty_dir, recursive));
+        ASSERT_OK_AND_ASSIGN(bool still_there, fs_->Exists(empty_dir));
+        ASSERT_FALSE(still_there);
+    };
+    create_delete_verify(/*recursive=*/true);
+    create_delete_verify(/*recursive=*/false);
+}
+
+// A directory that contains a file is removed by a recursive delete and rejected by a
+// non-recursive one.
+TEST_P(FileSystemTest, TestExistingNonEmptyDirectoryRecursiveDeletion) {
+    {
+        // recursive: the contained file disappears together with the directory
+        const std::string inner_file = CreateRandomFileInDirectory(test_root_);
+        ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(inner_file));
+        ASSERT_FALSE(found);
+    }
+    {
+        // non-recursive: the delete fails and the contained file is untouched
+        const std::string inner_file = CreateRandomFileInDirectory(test_root_);
+        ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false));
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(inner_file));
+        ASSERT_TRUE(found);
+    }
+}
+
+// Builds a two-level directory tree with one file per level under test_root_ and checks that
+// a recursive delete of test_root_ removes all of it, while a non-recursive delete is
+// rejected and leaves the tree intact.
+TEST_P(FileSystemTest, TestExistingNonEmptyDirectoryWithSubDirRecursiveDeletion) {
+    {
+        // recursive delete: nothing of the tree may remain
+        const std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, RandomName());
+        const std::string file_in_level1_subdir =
+            CreateRandomFileInDirectory(level1_subdir_with_file);
+        const std::string level2_subdir_with_file =
+            PathUtil::JoinPath(level1_subdir_with_file, RandomName());
+        const std::string file_in_level2_subdir =
+            CreateRandomFileInDirectory(level2_subdir_with_file);
+        ASSERT_OK(fs_->Delete(test_root_, /*recursive=*/true));
+        // Check the status of every Exists() call instead of calling value() on it blindly,
+        // so a failing lookup is reported as a test failure with its status.
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_in_level1_subdir));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(level2_subdir_with_file));
+        ASSERT_FALSE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_in_level2_subdir));
+        ASSERT_FALSE(is_exist);
+    }
+    {
+        // non-recursive delete: must fail and keep every entry of the tree
+        const std::string level1_subdir_with_file = PathUtil::JoinPath(test_root_, RandomName());
+        const std::string file_in_level1_subdir =
+            CreateRandomFileInDirectory(level1_subdir_with_file);
+        const std::string level2_subdir_with_file =
+            PathUtil::JoinPath(level1_subdir_with_file, RandomName());
+        const std::string file_in_level2_subdir =
+            CreateRandomFileInDirectory(level2_subdir_with_file);
+        ASSERT_NOK(fs_->Delete(test_root_, /*recursive=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_in_level1_subdir));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(level2_subdir_with_file));
+        ASSERT_TRUE(is_exist);
+        ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_in_level2_subdir));
+        ASSERT_TRUE(is_exist);
+    }
+}
+
+// Scenario test for Delete(): single directories, nested directories, single files,
+// recursive removal of a populated tree, rejection of a non-recursive delete on a non-empty
+// directory, and deletion of a missing path. The sub-cases run in order on the same
+// test_root_, and some reuse paths left behind by earlier ones (e.g. "/file1").
+TEST_P(FileSystemTest, TestDelete) {
+ {
+ std::string dir_path = test_root_ + "/file_dir/";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ }
+ {
+ // test recursive delete
+ std::string dir_path = test_root_ + "/file_dir1/file_dir2/file_dir3";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+
+ // rm file_dir1/file_dir2/file_dir3/
+ // only the leaf goes away; its ancestors must remain
+ ASSERT_OK(fs_->Delete(dir_path, /*recursive=*/true));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file_dir1/"));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file_dir1/file_dir2"));
+ ASSERT_TRUE(is_exist);
+
+ // rm file_dir1/
+ ASSERT_OK(fs_->Delete(test_root_ + "/file_dir1/", /*recursive=*/true));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file_dir1/"));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file_dir1/file_dir2"));
+ ASSERT_FALSE(is_exist);
+ }
+ {
+ // test delete file
+ std::string file_path = test_root_ + "/file1/file2/file3";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ // deleting the file leaves its parent directory in place
+ ASSERT_OK(fs_->Delete(file_path, /*recursive=*/true));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file1/file2/"));
+ ASSERT_TRUE(is_exist);
+
+ // "/file1" still holds the "file2" directory, so a non-recursive delete is rejected
+ ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file1",
/*recursive=*/false),
+ "is not empty");
+ }
+ {
+ // test recursive delete
+ // re-creates the file removed by the previous sub-case, then removes the whole tree
+ std::string file_path = test_root_ + "/file1/file2/file3";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ ASSERT_OK(fs_->Delete(test_root_ + "/file1/", /*recursive=*/true));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ +
"/file1/file2/"));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(test_root_ + "/file1/"));
+ ASSERT_FALSE(is_exist);
+ }
+ {
+ // test recursive delete
+ // a directory that only contains sub-directories also counts as non-empty
+ std::string dir_path = test_root_ +
"/file_recursive_dir1/file_dir2/file_dir3";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+
+ ASSERT_NOK_WITH_MSG(fs_->Delete(test_root_ + "/file_recursive_dir1",
/*recursive=*/false),
+ "is not empty");
+ }
+ {
+ // test delete non-exist file
+ std::string file_path = test_root_ + "/file_non_exist";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_NOK(fs_->Delete(file_path, /*recursive=*/false));
+ }
+}
+
+// --- mkdirs
+// Mkdirs() on the test root must succeed and the root must exist afterwards.
+TEST_P(FileSystemTest, TestMkdirsReturnsTrueWhenCreatingDirectory) {
+    ASSERT_OK(fs_->Mkdirs(test_root_));
+    ASSERT_OK_AND_ASSIGN(bool root_present, fs_->Exists(test_root_));
+    ASSERT_TRUE(root_present);
+}
+
+// Mkdirs() on a randomly named directory below the test root must make it exist.
+TEST_P(FileSystemTest, TestMkdirsCreatesParentDirectories) {
+    const std::string nested_dir = PathUtil::JoinPath(test_root_, RandomName());
+    ASSERT_OK(fs_->Mkdirs(nested_dir));
+    ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(nested_dir));
+    ASSERT_TRUE(found);
+}
+
+// Mkdirs() must succeed when called on test_root_ after a file has been created inside it.
+TEST_P(FileSystemTest, TestMkdirsReturnsTrueForExistingDirectory) {
+    // The returned path is not needed; the call is made for the file it creates.
+    const std::string placeholder_file = CreateRandomFileInDirectory(test_root_);
+    ASSERT_OK(fs_->Mkdirs(test_root_));
+}
+
+TEST_P(FileSystemTest, TestMkdirsFailsForExistingFile) {
+ std::string file_path = CreateRandomFileInDirectory(test_root_);
+ auto status = fs_->Mkdirs(file_path);
+ ASSERT_TRUE(status.IsIOError());
+}
+
+TEST_P(FileSystemTest, TestMkdirsFailsWithExistingParentFile) {
+ std::string file_path = CreateRandomFileInDirectory(test_root_);
+ std::string dir_under_file = PathUtil::JoinPath(file_path, RandomName());
+ ASSERT_TRUE(fs_->Mkdirs(dir_under_file).IsIOError());
+}
+
+// Mkdirs() must accept nested paths, names containing a dot, trailing slashes and the root
+// directory, and must reject an empty path with a descriptive message.
+TEST_P(FileSystemTest, TestMkdir) {
+    // Created in this order; the first failure aborts the test.
+    const std::vector<std::string> accepted_paths = {
+        test_root_ + "/tmp.txt/tmpB",
+        test_root_ + "/tmpA/tmpB/",
+        test_root_ + "/tmp/local/f/1",
+        test_root_ + "/tmp1",
+        test_root_ + "/tmp1/f2/",
+        "/",
+    };
+    for (const std::string& dir : accepted_paths) {
+        ASSERT_OK(fs_->Mkdirs(dir));
+    }
+    ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string.");
+}
+
+// Mkdirs() must create a single directory and, for a nested path, every missing ancestor.
+TEST_P(FileSystemTest, TestMkdir2) {
+    {
+        // single level
+        const std::string single_dir = test_root_ + "/file_dir/";
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(single_dir));
+        ASSERT_FALSE(found);
+        ASSERT_OK(fs_->Mkdirs(single_dir));
+        ASSERT_OK_AND_ASSIGN(found, fs_->Exists(single_dir));
+        ASSERT_TRUE(found);
+    }
+    {
+        // three levels at once: the leaf and both ancestors must exist afterwards
+        const std::string leaf_dir = test_root_ + "/file_dir1/file_dir2/file_dir3";
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(leaf_dir));
+        ASSERT_FALSE(found);
+        ASSERT_OK(fs_->Mkdirs(leaf_dir));
+        ASSERT_OK_AND_ASSIGN(found, fs_->Exists(leaf_dir));
+        ASSERT_TRUE(found);
+        ASSERT_OK_AND_ASSIGN(found, fs_->Exists(test_root_ + "/file_dir1/"));
+        ASSERT_TRUE(found);
+        ASSERT_OK_AND_ASSIGN(found, fs_->Exists(test_root_ + "/file_dir1/file_dir2"));
+        ASSERT_TRUE(found);
+    }
+}
+
+// Concurrently creates sibling directories below one parent that does not exist yet,
+// e.g. "/table/partition1/bucket1" and "/table/partition1/bucket2".
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNonExistParentDir) {
+    uint32_t round_count = 10;
+    uint32_t worker_count = 10;
+    auto executor = CreateDefaultExecutor(worker_count);
+
+    for (uint32_t round = 0; round < round_count; round++) {
+        // A fresh parent name per round, shared by all tasks of that round.
+        std::string parent_name;
+        ASSERT_TRUE(UUID::Generate(&parent_name));
+        std::vector<std::future<void>> pending;
+        for (uint32_t worker = 0; worker < worker_count; worker++) {
+            // parent_name is captured by reference; Wait() below is relied on to finish the
+            // tasks before it goes out of scope.
+            pending.push_back(Via(executor.get(), [this, &parent_name, worker]() -> void {
+                std::string child_dir =
+                    PathUtil::JoinPath(test_root_, parent_name + "/" + std::to_string(worker));
+                ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(child_dir));
+                ASSERT_FALSE(found);
+                ASSERT_OK(fs_->Mkdirs(child_dir));
+                ASSERT_OK_AND_ASSIGN(found, fs_->Exists(child_dir));
+                ASSERT_TRUE(found);
+            }));
+        }
+        Wait(pending);
+    }
+}
+
+// Concurrently creates the very same directory from several tasks,
+// e.g. "/table/partition1" requested by every task; each call must succeed.
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameName) {
+    uint32_t round_count = 10;
+    uint32_t worker_count = 10;
+    auto executor = CreateDefaultExecutor(worker_count);
+
+    for (uint32_t round = 0; round < round_count; round++) {
+        std::string shared_name;
+        ASSERT_TRUE(UUID::Generate(&shared_name));
+        std::vector<std::future<void>> pending;
+        for (uint32_t worker = 0; worker < worker_count; worker++) {
+            // Every task targets the identical path below test_root_.
+            pending.push_back(Via(executor.get(), [this, &shared_name]() -> void {
+                std::string shared_dir = PathUtil::JoinPath(test_root_, shared_name);
+                ASSERT_OK(fs_->Mkdirs(shared_dir));
+                ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(shared_dir));
+                ASSERT_TRUE(found);
+            }));
+        }
+        Wait(pending);
+    }
+}
+
+// Concurrently creates the very same directory through a relative path,
+// e.g. "partition1" requested by every task; each call must succeed.
+// The directory is not placed under test_root_, so the test removes it itself at the end
+// of every round instead of leaving UUID-named directories behind.
+TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNameWithRelativePath) {
+    uint32_t runs_count = 10;
+    uint32_t thread_count = 10;
+    auto executor = CreateDefaultExecutor(thread_count);
+
+    for (uint32_t i = 0; i < runs_count; i++) {
+        std::string uuid;
+        ASSERT_TRUE(UUID::Generate(&uuid));
+        std::vector<std::future<void>> futures;
+        for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) {
+            futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
+                // Relative on purpose: the bare UUID is used as the directory path.
+                std::string dir_path = uuid;
+                ASSERT_OK(fs_->Mkdirs(dir_path));
+                ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+                ASSERT_TRUE(is_exist);
+            }));
+        }
+        Wait(futures);
+
+        // Clean up the relative directory created by this round.
+        ASSERT_OK(fs_->Delete(uuid, /*recursive=*/true));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(uuid));
+        ASSERT_FALSE(is_exist);
+    }
+}
+
+// Mkdirs() on an already existing directory succeeds again; Mkdirs() on the path of an
+// existing regular file fails with an "already exists" message.
+TEST_P(FileSystemTest, TestInvalidMkdir) {
+    {
+        // repeat Mkdirs on the same directory
+        const std::string existing_dir = test_root_ + "/file_dir/";
+        ASSERT_OK(fs_->Mkdirs(existing_dir));
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(existing_dir));
+        ASSERT_TRUE(found);
+        ASSERT_OK(fs_->Mkdirs(existing_dir));
+        ASSERT_OK_AND_ASSIGN(found, fs_->Exists(existing_dir));
+        ASSERT_TRUE(found);
+    }
+    {
+        // a regular file occupies the requested directory name
+        const std::string occupied_path = test_root_ + "/file_dir/file.data";
+        ASSERT_OK(fs_->WriteFile(occupied_path, "content", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool found, fs_->Exists(occupied_path));
+        ASSERT_TRUE(found);
+
+        ASSERT_NOK_WITH_MSG(fs_->Mkdirs(occupied_path), "already exists");
+    }
+}
+
+// --- file status
+// Checks GetFileStatus() for an existing directory (before and after a file is written into
+// it), a missing directory, an existing regular file and a missing file.
+// NOTE(review): the modification-time bounds 10000000000 / 10000000000000 look like a sanity
+// range for epoch milliseconds - confirm the unit against FileStatus::GetModificationTime().
+TEST_P(FileSystemTest, TestGetFileStatus1) {
+ {
+ // test dir simple
+ std::string dir_path = test_root_ + "/file_dir/";
+ ASSERT_OK(fs_->Mkdirs(dir_path));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> st,
fs_->GetFileStatus(dir_path));
+ // paths are compared after RemoveLastSlashInPath() on both sides, so a trailing '/'
+ // in either of them does not matter
+ ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()),
RemoveLastSlashInPath(dir_path));
+ ASSERT_TRUE(st->IsDir());
+ auto modification_time = st->GetModificationTime();
+ ASSERT_GT(modification_time, 10000000000L);
+ ASSERT_LT(modification_time, 10000000000000L);
+
+ std::string file_path = test_root_ + "/file_dir/file.data";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ // check meta in dir
+ ASSERT_OK_AND_ASSIGN(st, fs_->GetFileStatus(dir_path));
+ ASSERT_EQ(RemoveLastSlashInPath(st->GetPath()),
RemoveLastSlashInPath(dir_path));
+ ASSERT_TRUE(st->IsDir());
+ modification_time = st->GetModificationTime();
+ ASSERT_GT(modification_time, 10000000000L);
+ ASSERT_LT(modification_time, 10000000000000L);
+ }
+ {
+ // test non-exist dir
+ std::string dir_path = test_root_ + "/non_exist_file_dir/";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_NOK(fs_->GetFileStatus(dir_path));
+ }
+ {
+ // test file simple
+ std::string content = "content";
+ std::string file_path = test_root_ + "/file.data";
+ ASSERT_OK(fs_->WriteFile(file_path, content, /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> st,
fs_->GetFileStatus(file_path));
+ // for a regular file the reported path must match exactly and the length must equal
+ // the number of bytes written
+ ASSERT_EQ(st->GetPath(), file_path);
+ ASSERT_FALSE(st->IsDir());
+ ASSERT_EQ(st->GetLen(), content.size());
+ auto modification_time = st->GetModificationTime();
+ ASSERT_GT(modification_time, 10000000000L);
+ ASSERT_LT(modification_time, 10000000000000L);
+ }
+ {
+ // test non-exist file
+ std::string dir_path = test_root_ + "/non_exist_file";
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
+ ASSERT_FALSE(is_exist);
+ ASSERT_NOK(fs_->GetFileStatus(dir_path));
+ }
+}
+
+// Checks GetFileStatus() against the "orc/append_09.db/append_09" directory below
+// GetTestDir(): the directory with and without a trailing '/', a file inside it, and a
+// missing entry. Results are validated through CheckFileStatus().
+TEST_P(FileSystemTest, TestGetFileStatus2) {
+    const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+    {
+        // input is a dir, with a trailing '/'
+        std::string dir_name = test_path + "/";
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status,
+                             fs_->GetFileStatus(dir_name));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{}, /*expected_dirs=*/{dir_name});
+    }
+    {
+        // input is a dir, without a trailing '/'
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status,
+                             fs_->GetFileStatus(test_path));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{}, /*expected_dirs=*/{test_path});
+    }
+    {
+        // input is a file
+        std::vector<std::unique_ptr<FileStatus>> status_list;
+        std::string file_name = PathUtil::JoinPath(test_path, "README");
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileStatus> file_status,
+                             fs_->GetFileStatus(file_name));
+        status_list.emplace_back(std::move(file_status));
+        CheckFileStatus(status_list, /*expected_files=*/{file_name}, /*expected_dirs=*/{});
+    }
+    {
+        // input is not exist: the lookup itself must fail, so there is no status to collect
+        ASSERT_NOK(fs_->GetFileStatus(PathUtil::JoinPath(test_path, "NOT_EXIST")));
+    }
+}
+
+// ListFileStatus() on unusual inputs: a missing directory yields OK with an empty list, and
+// a regular file yields a single entry describing that file.
+TEST_P(FileSystemTest, TestInvalidListFileStatus) {
+    {
+        // list non exist dir will return ok
+        std::vector<std::unique_ptr<FileStatus>> file_status_list;
+        ASSERT_OK(fs_->ListFileStatus(test_root_ + "/non-exist/", &file_status_list));
+        ASSERT_TRUE(file_status_list.empty());
+    }
+    {
+        // test data path
+        std::string file_path = test_root_ + "/file.data";
+        ASSERT_OK(fs_->WriteFile(file_path, "hello", /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+        ASSERT_TRUE(is_exist);
+        std::vector<std::unique_ptr<FileStatus>> file_status_list;
+        ASSERT_OK(fs_->ListFileStatus(file_path, &file_status_list));
+        // Guard the element access below: without this, an empty result would index out of
+        // bounds instead of failing the test cleanly.
+        ASSERT_EQ(file_status_list.size(), 1);
+        ASSERT_EQ(file_status_list[0]->GetPath(), file_path);
+        ASSERT_EQ(file_status_list[0]->GetLen(), 5);
+        ASSERT_FALSE(file_status_list[0]->IsDir());
+    }
+}
+
+// Builds a small tree under test_root_ step by step and checks ListFileStatus() after each
+// step. The expected results only ever contain direct children of the listed directory.
+TEST_P(FileSystemTest, TestListFileStatus1) {
+ std::string file_path = test_root_ + "/file_dir1/file.data";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ // listing the root reports only "file_dir1", not the file inside it
+ std::vector<std::unique_ptr<FileStatus>> file_status_list;
+ ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list));
+ ASSERT_EQ(file_status_list.size(), 1);
+ std::set<std::string> expected_dirs = {test_root_ + "/file_dir1/"};
+ CheckFileStatus(file_status_list, /*expected_files=*/{}, expected_dirs);
+
+ auto dir_path2 = test_root_ + "/file_dir2/";
+ ASSERT_OK(fs_->Mkdirs(dir_path2));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_TRUE(is_exist);
+
+ auto dir_path3 = test_root_ + "/file_dir1/file_dir3/";
+ ASSERT_OK(fs_->Mkdirs(dir_path3));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3));
+ ASSERT_TRUE(is_exist);
+
+ std::string file_path2 = test_root_ + "/file_dir1/second_file.data";
+ ASSERT_OK(fs_->WriteFile(file_path2, "hello!", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(file_path2));
+ ASSERT_TRUE(is_exist);
+
+ // the root now has two sub-directories; entries added below "file_dir1" are not listed
+ std::vector<std::unique_ptr<FileStatus>> file_status_list2;
+ ASSERT_OK(fs_->ListFileStatus(test_root_, &file_status_list2));
+ ASSERT_EQ(file_status_list2.size(), 2);
+ expected_dirs = {test_root_ + "/file_dir1/", dir_path2};
+ CheckFileStatus(file_status_list2, /*expected_files=*/{}, expected_dirs);
+
+ // "file_dir1" itself holds two files and one sub-directory
+ std::vector<std::unique_ptr<FileStatus>> file_status_list3;
+ ASSERT_OK(fs_->ListFileStatus(test_root_ + "/file_dir1/",
&file_status_list3));
+ ASSERT_EQ(file_status_list3.size(), 3);
+ expected_dirs = {dir_path3};
+ std::set<std::string> expected_files = {file_path, file_path2};
+ CheckFileStatus(file_status_list3, expected_files, expected_dirs);
+}
+
+// Checks ListFileStatus() against the "orc/append_09.db/append_09" directory below
+// GetTestDir(), which is expected to contain one README file and seven sub-directories.
+// Covers a directory input with and without a trailing '/', a file input and a missing path.
+TEST_P(FileSystemTest, TestListFileStatus2) {
+ const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+ const std::set<std::string> expected_files =
{PathUtil::JoinPath(test_path, "README")};
+ const std::set<std::string> expected_dirs = {
+ PathUtil::JoinPath(test_path, "f1=10"),
PathUtil::JoinPath(test_path, "commit_messages"),
+ PathUtil::JoinPath(test_path, "f1=20"),
PathUtil::JoinPath(test_path, "data_splits"),
+ PathUtil::JoinPath(test_path, "manifest"),
PathUtil::JoinPath(test_path, "schema"),
+ PathUtil::JoinPath(test_path, "snapshot")};
+ {
+ // input is a dir, with a trailing '/'
+ std::vector<std::unique_ptr<FileStatus>> status_list;
+ ASSERT_OK(fs_->ListFileStatus(test_path + "/", &status_list));
+ CheckFileStatus(status_list, expected_files, expected_dirs);
+ }
+ {
+ // input is a dir, without a trailing '/'
+ std::vector<std::unique_ptr<FileStatus>> status_list;
+ ASSERT_OK(fs_->ListFileStatus(test_path, &status_list));
+ CheckFileStatus(status_list, expected_files, expected_dirs);
+ }
+ {
+ // input is a file
+ // listing a file succeeds and reports just that file
+ std::vector<std::unique_ptr<FileStatus>> status_list;
+ ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path, "README"),
&status_list));
+ CheckFileStatus(status_list, expected_files, /*expected_dirs=*/{});
+ }
+ {
+ // input is not exist
+ // a missing path is not an error: the call succeeds with nothing listed
+ std::vector<std::unique_ptr<FileStatus>> status_list;
+ ASSERT_OK(fs_->ListFileStatus(PathUtil::JoinPath(test_path,
"NOT_EXIST"), &status_list));
+ CheckFileStatus(status_list, /*expected_files=*/{},
/*expected_dirs=*/{});
+ }
+}
+
+// Builds a small tree under test_root_ step by step and checks ListDir() after each step,
+// then covers two special inputs: a missing directory (OK, empty result) and a regular file
+// (rejected as not being a directory).
+TEST_P(FileSystemTest, TestListDir1) {
+ std::string file_path = test_root_ + "/file_dir1/file.data";
+ ASSERT_OK(fs_->WriteFile(file_path, "content", /*overwrite=*/false));
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(file_path));
+ ASSERT_TRUE(is_exist);
+
+ // listing the root reports only "file_dir1", not the file inside it
+ auto dir_path1 = test_root_ + "/file_dir1/";
+ std::vector<std::unique_ptr<BasicFileStatus>> file_status_list;
+ ASSERT_OK(fs_->ListDir(test_root_, &file_status_list));
+ ASSERT_EQ(file_status_list.size(), 1);
+ std::set<std::string> expected_dirs = {dir_path1};
+ CheckBasicFileStatus(file_status_list, std::set<std::string>(),
expected_dirs);
+
+ auto dir_path2 = test_root_ + "/file_dir2/";
+ ASSERT_OK(fs_->Mkdirs(dir_path2));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path2));
+ ASSERT_TRUE(is_exist);
+
+ auto dir_path3 = test_root_ + "/file_dir1/file_dir3/";
+ ASSERT_OK(fs_->Mkdirs(dir_path3));
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(dir_path3));
+ ASSERT_TRUE(is_exist);
+
+ // the root now has two sub-directories; "file_dir3" below "file_dir1" is not listed
+ std::vector<std::unique_ptr<BasicFileStatus>> file_status_list2;
+ ASSERT_OK(fs_->ListDir(test_root_, &file_status_list2));
+ ASSERT_EQ(file_status_list2.size(), 2);
+ expected_dirs = {dir_path1, dir_path2};
+ CheckBasicFileStatus(file_status_list2, std::set<std::string>(),
expected_dirs);
+
+ // "file_dir1" itself holds one file and one sub-directory
+ std::vector<std::unique_ptr<BasicFileStatus>> file_status_list3;
+ ASSERT_OK(fs_->ListDir(dir_path1, &file_status_list3));
+ ASSERT_EQ(file_status_list3.size(), 2);
+ std::set<std::string> expected_files = {file_path};
+ expected_dirs = {dir_path3};
+ CheckBasicFileStatus(file_status_list3, expected_files, expected_dirs);
+
+ // list non exist dir will return ok
+ std::vector<std::unique_ptr<BasicFileStatus>> file_status_list4;
+ ASSERT_OK(fs_->ListDir(test_root_ + "/non-exist/", &file_status_list4));
+ ASSERT_TRUE(file_status_list4.empty());
+
+ // list invalid path, a data file path
+ std::vector<std::unique_ptr<BasicFileStatus>> file_status_list5;
+ ASSERT_NOK_WITH_MSG(fs_->ListDir(file_path, &file_status_list5), "is not a
directory");
+}
+
+// Checks ListDir() against the "orc/append_09.db/append_09" directory below GetTestDir(),
+// which is expected to contain one README file and seven sub-directories. Covers a
+// directory input with and without a trailing '/', a file input (rejected) and a missing
+// path (OK, empty result).
+TEST_P(FileSystemTest, TestListDir2) {
+ const std::string test_path = GetTestDir() + "orc/append_09.db/append_09";
+ const std::set<std::string> expected_files =
{PathUtil::JoinPath(test_path, "README")};
+ const std::set<std::string> expected_dirs = {
+ PathUtil::JoinPath(test_path, "f1=10"),
PathUtil::JoinPath(test_path, "commit_messages"),
+ PathUtil::JoinPath(test_path, "f1=20"),
PathUtil::JoinPath(test_path, "data_splits"),
+ PathUtil::JoinPath(test_path, "manifest"),
PathUtil::JoinPath(test_path, "schema"),
+ PathUtil::JoinPath(test_path, "snapshot")};
+ {
+ // input is a dir, with a trailing '/'
+ std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+ ASSERT_OK(fs_->ListDir(test_path + "/", &status_list));
+ CheckBasicFileStatus(status_list, expected_files, expected_dirs);
+ }
+ {
+ // input is a dir, without a trailing '/'
+ std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+ ASSERT_OK(fs_->ListDir(test_path, &status_list));
+ CheckBasicFileStatus(status_list, expected_files, expected_dirs);
+ }
+ {
+ // input is a file
+ // ListDir on a regular file is an error whose message names the offending path
+ std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+ ASSERT_NOK_WITH_MSG(fs_->ListDir(PathUtil::JoinPath(test_path,
"README"), &status_list),
+ "file " + PathUtil::JoinPath(test_path, "README") +
+ " already exists and is not a directory");
+ }
+ {
+ // input is not exist
+ // a missing path is not an error: the call succeeds with nothing listed
+ std::vector<std::unique_ptr<BasicFileStatus>> status_list;
+ ASSERT_OK(fs_->ListDir(PathUtil::JoinPath(test_path, "NOT_EXIST"),
&status_list));
+ CheckBasicFileStatus(status_list, /*expected_files=*/{},
+ /*expected_dirs=*/{});
+ }
+}
+
+// --- exception
+TEST_P(FileSystemTest, TestGetNotExistFileStatus) {
+ std::string path = PathUtil::JoinPath(test_root_, RandomName());
+ ASSERT_NOK(fs_->GetFileStatus(path));
+}
+
+// --- atomic store
+// AtomicStore() to a fresh path must succeed, create the file and persist exactly the
+// content that was passed in.
+TEST_P(FileSystemTest, TestAtomicStore) {
+    std::string path = PathUtil::JoinPath(test_root_, RandomName());
+    std::string content = "test_content";
+    ASSERT_OK(fs_->AtomicStore(path, content));
+    ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(path));
+    ASSERT_TRUE(is_exist);
+
+    // Existence alone would also pass for an empty or truncated file, so read the data back.
+    std::string stored;
+    ASSERT_OK(fs_->ReadFile(path, &stored));
+    ASSERT_EQ(stored, content);
+}
+
+// AtomicStore() onto an already existing file must fail and must not remove that file.
+TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) {
+    const std::string occupied_path = CreateRandomFileInDirectory(test_root_);
+    const std::string payload = "test_content";
+    ASSERT_NOK(fs_->AtomicStore(occupied_path, payload));
+    ASSERT_OK_AND_ASSIGN(bool still_there, fs_->Exists(occupied_path));
+    ASSERT_TRUE(still_there);
+}
+
+// Instantiates every FileSystemTest case once per listed parameter value. Only "local" is
+// enabled; "jindo" is commented out.
+// NOTE(review): the parameter presumably names the file system implementation the fixture
+// creates - confirm against the fixture's SetUp().
+INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local"
/*, "jindo"*/));
+
+} // namespace paimon::test
diff --git a/src/paimon/common/fs/resolving_file_system_test.cpp
b/src/paimon/common/fs/resolving_file_system_test.cpp
new file mode 100644
index 0000000..c459c00
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system_test.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/fs/resolving_file_system.h"
+
+#include <future>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/executor.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class GmockFileSystem : public FileSystem {  // gMock-based FileSystem that remembers the identifier and authority it was constructed with.
+ public:
+ GmockFileSystem(const std::string& identifier, const std::string&
authority)
+ : identifier_(identifier), authority_(authority) {}  // both arguments are copied into the members below
+
+ MOCK_METHOD(Result<std::unique_ptr<InputStream>>, Open, (const
std::string& path),
+ (const, override));  // Open through Exists are gMock stubs overriding FileSystem; no test in this file sets expectations on them
+ MOCK_METHOD(Result<std::unique_ptr<OutputStream>>, Create,
+ (const std::string& path, bool overwrite), (const, override));
+ MOCK_METHOD(Status, Mkdirs, (const std::string& path), (const, override));
+ MOCK_METHOD(Status, Rename, (const std::string& src, const std::string&
dst),
+ (const, override));
+ MOCK_METHOD(Status, Delete, (const std::string& path, bool recursive),
(const, override));
+ MOCK_METHOD(Result<std::unique_ptr<FileStatus>>, GetFileStatus, (const
std::string& path),
+ (const, override));
+ MOCK_METHOD(Status, ListDir,
+ (const std::string& directory,
+ std::vector<std::unique_ptr<BasicFileStatus>>*
file_status_list),
+ (const, override));
+ MOCK_METHOD(Status, ListFileStatus,
+ (const std::string& path,
+ std::vector<std::unique_ptr<FileStatus>>* file_status_list),
+ (const, override));
+ MOCK_METHOD(Result<bool>, Exists, (const std::string& path), (const,
override));
+
+ const std::string& GetIdentifier() const {  // identifier given to the constructor; tests use it to tell which factory built this instance
+ return identifier_;
+ }
+ const std::string& GetAuthority() const {  // authority given to the constructor
+ return authority_;
+ }
+
+ private:
+ std::string identifier_;
+ std::string authority_;
+};
+
+class GmockFileSystemFactory : public FileSystemFactory {  // Factory producing NiceMock<GmockFileSystem> instances tagged with this factory's identifier.
+ public:
+ explicit GmockFileSystemFactory(const std::string& identifier) :
identifier_(identifier) {}
+
+ const char* Identifier() const override {
+ return identifier_.c_str();  // the returned pointer is only valid while this factory object is alive
+ }
+
+ Result<std::unique_ptr<FileSystem>> Create(
+ const std::string& path_str,
+ const std::map<std::string, std::string>& options) const override {  // options is not read by this mock factory
+ PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_str));  // presumably returns the error to the caller when path_str cannot be parsed -- macro is defined elsewhere
+ return
std::make_unique<testing::NiceMock<GmockFileSystem>>(identifier_,
path.authority);  // the authority parsed from path_str is stored on the mock so tests can read it back
+ }
+
+ private:
+ std::string identifier_;
+};
+
+class ResolvingFileSystemTest : public testing::Test {  // Fixture that registers three mock file system factories before each test and unregisters them afterwards.
+ public:
+ void SetUp() override {
+ auto factory_creator = FactoryCreator::GetInstance();
+
+ // Register mock factories under the identifiers the tests use as scheme-mapping targets (raw pointers are handed over; ownership presumably passes to FactoryCreator -- confirm)
+ factory_creator->Register("mock_local", new
GmockFileSystemFactory("mock_local"));
+ factory_creator->Register("mock_hdfs", new
GmockFileSystemFactory("mock_hdfs"));
+ factory_creator->Register("mock_oss", new
GmockFileSystemFactory("mock_oss"));
+ }
+
+ void TearDown() override {  // removes exactly the three registrations added in SetUp
+ auto factory_creator = FactoryCreator::GetInstance();
+ factory_creator->TEST_Unregister("mock_local");
+ factory_creator->TEST_Unregister("mock_hdfs");
+ factory_creator->TEST_Unregister("mock_oss");
+ }
+};
+
+TEST_F(ResolvingFileSystemTest, GetRealFileSystemWithFileScheme) {  // Checks which mock factory GetRealFileSystem selects for mapped, empty and unmapped URI schemes.
+ std::map<std::string, std::string> scheme_mapping = {
+ {"file", "mock_local"}, {"hdfs", "mock_hdfs"}, {"oss", "mock_oss"}};
+ ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local",
/*options=*/{});  // second argument is the identifier used when a scheme has no mapping (see the s3 case below)
+
+ auto check_result = [&](const std::string& uri, const std::string&
expected_identifier) {  // NOTE: a failing ASSERT_* in here returns from the lambda only; later check_result calls still run
+ ASSERT_OK_AND_ASSIGN(auto real_fs,
resolving_fs.GetRealFileSystem(uri));
+ auto casted_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());  // the registered factories only ever create GmockFileSystem, so a null result means the wrong kind of file system came back
+ ASSERT_TRUE(casted_fs);
+ ASSERT_EQ(casted_fs->GetIdentifier(), expected_identifier);
+ };
+ check_result("file:///tmp/test.txt", "mock_local");
+ check_result("hdfs://node:9000/tmp/test.txt", "mock_hdfs");
+ check_result("oss://bucket/tmp/test.txt", "mock_oss");
+ // Empty scheme should map to "file" scheme
+ check_result("/tmp/test.txt", "mock_local");
+ check_result("tmp/test.txt", "mock_local");
+ // Unknown scheme should use default file system
+ check_result("s3://bucket/tmp/test.txt", "mock_local");
+}
+
+TEST_F(ResolvingFileSystemTest, EmptySchemeMapping) {  // With no scheme mapping at all, lookups fall back to the default identifier.
+ std::map<std::string, std::string> empty_scheme_mapping;
+ ResolvingFileSystem resolving_fs(empty_scheme_mapping, "mock_local",
/*options=*/{});
+
+ // Should use default file system for all schemes (only one arbitrary scheme is exercised here)
+ ASSERT_OK_AND_ASSIGN(auto real_fs,
resolving_fs.GetRealFileSystem("any_scheme://path/test"));
+ auto casted_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
+ ASSERT_TRUE(casted_fs);
+ ASSERT_EQ(casted_fs->GetIdentifier(), "mock_local");
+}
+
+TEST_F(ResolvingFileSystemTest, FileSystemCaching) {  // Two file:// URIs with different paths must resolve to the same file system object.
+ std::map<std::string, std::string> scheme_mapping = {{"file",
"mock_local"}};
+ ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local",
/*options=*/{});
+
+ // First call should create and cache the file system
+ ASSERT_OK_AND_ASSIGN(auto fs1,
resolving_fs.GetRealFileSystem("file:///tmp/test1.txt"));
+ // Second call with same scheme should use cached file system
+ ASSERT_OK_AND_ASSIGN(auto fs2,
resolving_fs.GetRealFileSystem("file:///tmp/test2.txt"));
+ ASSERT_EQ(fs1, fs2);  // compares the returned handles themselves, i.e. object identity rather than contents (presumably shared_ptr, as the thread tests store them)
+}
+
+TEST_F(ResolvingFileSystemTest, DifferentAuthoritiesCacheSeparately) {  // Same scheme but different authority must yield distinct file system objects.
+ std::map<std::string, std::string> scheme_mapping = {{"hdfs",
"mock_hdfs"}};
+ ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local",
/*options=*/{});
+
+ // Different authorities should create separate file system instances
+ ASSERT_OK_AND_ASSIGN(auto fs1,
resolving_fs.GetRealFileSystem("hdfs://node1:9000/test.txt"));
+ ASSERT_OK_AND_ASSIGN(auto fs2,
resolving_fs.GetRealFileSystem("hdfs://node2:9000/test.txt"));
+ ASSERT_NE(fs1, fs2);  // the two URIs differ only in authority (node1:9000 vs node2:9000)
+}
+
+TEST_F(ResolvingFileSystemTest, ThreadSafety) {  // Concurrent lookups of file:// URIs must all resolve to one and the same FileSystem instance.
+ std::map<std::string, std::string> scheme_mapping = {{"file",
"mock_local"}};
+ ResolvingFileSystem resolving_fs(scheme_mapping, "mock_local",
/*options=*/{});
+ const int32_t num_threads = 100;  // number of tasks submitted; actual parallelism depends on what CreateDefaultExecutor() provides
+ const int32_t operations_per_thread = 10;  // lookups performed by each task
+
+ auto executor = CreateDefaultExecutor();
+ std::vector<std::future<std::vector<std::shared_ptr<FileSystem>>>> futures;
+ for (int32_t i = 0; i < num_threads; ++i) {
+ futures.push_back(
+ Via(executor.get(), [&resolving_fs, i]() ->
std::vector<std::shared_ptr<FileSystem>> {  // resolving_fs is captured by reference, so every task hits the same resolver
+ std::vector<std::shared_ptr<FileSystem>> fs_list;
+ for (int32_t j = 0; j < operations_per_thread; ++j) {
+ std::string path = "file:///tmp/thread_" +
std::to_string(i) + "_" +
+ std::to_string(j) + ".txt";  // a distinct path per lookup, always with the file scheme
+ EXPECT_OK_AND_ASSIGN(auto fs,
resolving_fs.GetRealFileSystem(path));  // non-fatal EXPECT_ form, presumably because this lambda returns a value
+ EXPECT_TRUE(fs);
+ fs_list.push_back(fs);
+ }
+ return fs_list;
+ }));
+ }
+
+ // Verify that every task got the same file system instance
+ auto results = CollectAll(futures);  // presumably waits for all futures and returns their values -- helper is defined outside this file
+ auto fs0 = results[0][0];  // first lookup of the first task serves as the reference instance
+ for (const auto& fs_list : results) {
+ for (const auto& fs : fs_list) {
+ ASSERT_EQ(fs, fs0);
+ }
+ }
+}
+
+TEST_F(ResolvingFileSystemTest, ThreadSafety2) {  // Concurrent lookups with a different authority per task must each get a file system built for that authority.
+ std::map<std::string, std::string> scheme_mapping = {{"oss", "mock_oss"}};
+ ResolvingFileSystem resolving_fs(scheme_mapping, "mock_oss",
/*options=*/{});
+ const int32_t num_threads = 100;  // number of tasks submitted; actual parallelism depends on the executor
+ const int32_t operations_per_thread = 10;  // lookups performed by each task
+
+ auto executor = CreateDefaultExecutor();
+ std::vector<std::future<std::vector<std::shared_ptr<FileSystem>>>> futures;
+ for (int32_t i = 0; i < num_threads; ++i) {
+ futures.push_back(
+ Via(executor.get(), [&resolving_fs, i]() ->
std::vector<std::shared_ptr<FileSystem>> {  // resolving_fs is captured by reference and shared by all tasks
+ std::vector<std::shared_ptr<FileSystem>> fs_list;
+ for (int32_t j = 0; j < operations_per_thread; ++j) {
+ std::string path =
+ "oss://bucket_" + std::to_string(i) + "/" +
std::to_string(j) + ".txt";  // authority is bucket_<i>, so it is unique per task
+ EXPECT_OK_AND_ASSIGN(auto fs,
resolving_fs.GetRealFileSystem(path));
+ EXPECT_TRUE(fs);
+ fs_list.push_back(fs);
+ }
+ return fs_list;
+ }));
+ }
+
+ auto results = CollectAll(futures);  // the check below assumes results[i] belongs to task i, i.e. submission order is preserved -- confirm against CollectAll
+ for (size_t i = 0; i < results.size(); ++i) {
+ for (size_t j = 0; j < results[i].size(); ++j) {
+ auto casted_fs =
dynamic_cast<GmockFileSystem*>(results[i][j].get());
+ ASSERT_TRUE(casted_fs);
+ ASSERT_EQ(casted_fs->GetAuthority(), "bucket_" +
std::to_string(i));  // the mock stores the authority it was created for, so this detects instances mixed up between buckets
+ }
+ }
+}
+
+} // namespace paimon::test