This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 92159c3 feat(fs): introduce local file system (#41)
92159c3 is described below
commit 92159c304929cada5d1d0d967a7f89e24b89636a
Author: Zhang Jiawei <[email protected]>
AuthorDate: Fri Jun 5 13:15:38 2026 +0800
feat(fs): introduce local file system (#41)
* feat(fs): introduce local file system
* fix(fs): normalize local filesystem paths
---
src/paimon/common/fs/external_path_provider.h | 10 +-
.../common/fs/external_path_provider_test.cpp | 49 +++
src/paimon/common/fs/file_system_test.cpp | 83 ++++-
src/paimon/common/table/special_fields.h | 3 +-
src/paimon/common/table/special_fields_test.cpp | 1 +
src/paimon/common/utils/path_util.cpp | 16 +
src/paimon/common/utils/path_util.h | 1 +
src/paimon/common/utils/path_util_test.cpp | 22 ++
src/paimon/fs/local/local_file.cpp | 372 +++++++++++++++++++++
src/paimon/fs/local/local_file.h | 79 +++++
src/paimon/fs/local/local_file_status.h | 76 +++++
src/paimon/fs/local/local_file_system.cpp | 319 ++++++++++++++++++
src/paimon/fs/local/local_file_system.h | 107 ++++++
src/paimon/fs/local/local_file_system_factory.cpp | 29 ++
src/paimon/fs/local/local_file_system_factory.h | 47 +++
src/paimon/fs/local/local_file_test.cpp | 226 +++++++++++++
16 files changed, 1430 insertions(+), 10 deletions(-)
diff --git a/src/paimon/common/fs/external_path_provider.h
b/src/paimon/common/fs/external_path_provider.h
index 568d3ac..c70aa2f 100644
--- a/src/paimon/common/fs/external_path_provider.h
+++ b/src/paimon/common/fs/external_path_provider.h
@@ -19,6 +19,7 @@
#pragma once
+#include <atomic>
#include <cstddef>
#include <memory>
#include <random>
@@ -48,12 +49,9 @@ class ExternalPathProvider {
///
/// @return the next external data path
std::string GetNextExternalDataPath(const std::string& file_name) {
- position_++;
- if (position_ == external_table_paths_.size()) {
- position_ = 0;
- }
+ // Pre-increment the atomic counter exactly once and use the local copy, so each
+ // concurrent caller gets its own counter value; the modulo wraps it round-robin.
+ // NOTE(review): `%` divides by zero if external_table_paths_ is empty --
+ // presumably Create() rejects an empty list; confirm.
+ size_t position = (++position_) % external_table_paths_.size();
return PathUtil::JoinPath(
- PathUtil::JoinPath(external_table_paths_[position_],
relative_bucket_path_), file_name);
+ PathUtil::JoinPath(external_table_paths_[position],
relative_bucket_path_), file_name);
}
private:
@@ -69,6 +67,6 @@ class ExternalPathProvider {
private:
std::vector<std::string> external_table_paths_;
std::string relative_bucket_path_;
- size_t position_;
+ std::atomic<size_t> position_;
};
} // namespace paimon
diff --git a/src/paimon/common/fs/external_path_provider_test.cpp
b/src/paimon/common/fs/external_path_provider_test.cpp
index 6893bfb..5ef37f2 100644
--- a/src/paimon/common/fs/external_path_provider_test.cpp
+++ b/src/paimon/common/fs/external_path_provider_test.cpp
@@ -19,7 +19,11 @@
#include "paimon/common/fs/external_path_provider.h"
+#include <cstdint>
+#include <future>
+#include <mutex>
#include <set>
+#include <vector>
#include "gtest/gtest.h"
#include "paimon/testing/utils/testharness.h"
@@ -65,4 +69,49 @@ TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2)
{
"/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
}));
}
+
+// Calls GetNextExternalDataPath() from kThreadCount threads at once and checks that
+// no call is lost and every returned path is one of the three configured paths.
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPathConcurrently) {
+ std::vector<std::string> external_table_paths;
+ external_table_paths.emplace_back("/tmp/external_path_a/");
+ external_table_paths.emplace_back("/tmp/external_path_b/");
+ external_table_paths.emplace_back("/tmp/external_path_c/");
+ std::string relative_bucket_path = "p0=1/p1=0/bucket-0";
+
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+ ExternalPathProvider::Create(external_table_paths,
relative_bucket_path));
+
+ const std::set<std::string> expected_data_paths = {
+ "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc",
+ "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc",
+ "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
+ };
+ std::mutex mutex;
+ std::vector<std::string> result_data_paths;
+ constexpr int32_t kThreadCount = 8;
+ constexpr int32_t kPathCountPerThread = 1000;
+
+ std::vector<std::future<void>> futures;
+ futures.reserve(kThreadCount);
+ for (int32_t i = 0; i < kThreadCount; ++i) {
+ futures.emplace_back(std::async(std::launch::async, [&]() {
+ // Collect locally so the provider calls run without holding `mutex`.
+ std::vector<std::string> local_paths;
+ local_paths.reserve(kPathCountPerThread);
+ for (int32_t j = 0; j < kPathCountPerThread; ++j) {
+
local_paths.push_back(provider->GetNextExternalDataPath("file.orc"));
+ }
+
+ std::lock_guard<std::mutex> lock(mutex);
+ result_data_paths.insert(result_data_paths.end(),
local_paths.begin(),
+ local_paths.end());
+ }));
+ }
+ for (auto& future : futures) {
+ future.get();
+ }
+
+ // Only membership is asserted; an even round-robin split is not checked.
+ ASSERT_EQ(result_data_paths.size(), kThreadCount * kPathCountPerThread);
+ for (const auto& data_path : result_data_paths) {
+ ASSERT_TRUE(expected_data_paths.count(data_path)) << data_path;
+ }
+}
} // namespace paimon::test
diff --git a/src/paimon/common/fs/file_system_test.cpp
b/src/paimon/common/fs/file_system_test.cpp
index 815b983..b7188c4 100644
--- a/src/paimon/common/fs/file_system_test.cpp
+++ b/src/paimon/common/fs/file_system_test.cpp
@@ -162,7 +162,12 @@ class FileSystemTest : public ::testing::Test, public
::testing::WithParamInterf
std::string GetTestDir() const {
std::string file_system = GetParam();
if (file_system == "local") {
- return paimon::test::GetDataDir();
+ std::string data_dir = paimon::test::GetDataDir();
+ if (data_dir.empty() || data_dir[0] != '/') {
+ EXPECT_OK_AND_ASSIGN(std::string current_path,
PathUtil::GetWorkingDirectory());
+ data_dir = PathUtil::JoinPath(current_path, data_dir);
+ }
+ return data_dir;
} else if (file_system == "jindo") {
return "oss://paimon-unittest/test_data/";
}
@@ -212,6 +217,28 @@ TEST_P(FileSystemTest, TestCreate) {
ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already
exists");
}
+// Local-only: Create() with a bare relative name must succeed, report an absolute
+// URI whose last component is that name, and round-trip the written content.
+// NOTE(review): the file is created in the process working directory and is left
+// behind if an assertion fails before the final Delete().
+TEST_P(FileSystemTest, TestCreateRelativeFileInCurrentDirectory) {
+ if (GetParam() != "local") {
+ GTEST_SKIP() << "this test is only tested for the local file system";
+ }
+
+ std::string path = "relative_file_" + RandomName();
+ ASSERT_OK_AND_ASSIGN(auto out, fs_->Create(path, /*overwrite=*/true));
+ std::string content = "content";
+ ASSERT_OK_AND_ASSIGN(int32_t write_len, out->Write(content.data(),
content.size()));
+ ASSERT_EQ(write_len, content.size());
+ ASSERT_OK_AND_ASSIGN(std::string uri, out->GetUri());
+ ASSERT_FALSE(uri.empty());
+ ASSERT_EQ(uri[0], '/');
+ ASSERT_EQ(PathUtil::GetName(uri), path);
+ ASSERT_OK(out->Close());
+
+ std::string read_content;
+ ASSERT_OK(fs_->ReadFile(path, &read_content));
+ ASSERT_EQ(read_content, content);
+ ASSERT_OK(fs_->Delete(path));
+}
+
// --- write&read
TEST_P(FileSystemTest, TestSimpleWriteAndRead) {
std::string content = "abcdefghijk";
@@ -651,6 +678,27 @@ TEST_P(FileSystemTest, TestRename) {
"src file is not a dir");
}
+// Local-only: "file:"-prefixed paths must address the same files as the plain
+// paths through WriteFile, Rename, Exists and ReadFile.
+TEST_P(FileSystemTest, TestRenameWithFileSchemeUsesNormalizedPath) {
+ if (GetParam() != "local") {
+ GTEST_SKIP() << "this test is only tested for the local file system";
+ }
+
+ const std::string src = "file:" + test_root_ + "/scheme_src.txt";
+ const std::string dst = "file:" + test_root_ + "/scheme_dst.txt";
+
+ ASSERT_OK(fs_->WriteFile(src, "content", /*overwrite=*/false));
+ ASSERT_OK(fs_->Rename(src, dst));
+
+ ASSERT_OK_AND_ASSIGN(bool src_exists, fs_->Exists(src));
+ ASSERT_FALSE(src_exists);
+ ASSERT_OK_AND_ASSIGN(bool dst_exists, fs_->Exists(dst));
+ ASSERT_TRUE(dst_exists);
+
+ std::string content;
+ ASSERT_OK(fs_->ReadFile(dst, &content));
+ ASSERT_EQ(content, "content");
+}
+
TEST_P(FileSystemTest, TestRename2) {
{
// test rename dir
@@ -784,6 +832,19 @@ TEST_P(FileSystemTest, TestExists) {
ASSERT_TRUE(is_exist);
}
+// Local-only: the root "/" and the working-directory aliases "" and "." must all
+// be reported as existing.
+TEST_P(FileSystemTest, TestExistsInLocalFileSystem) {
+ if (GetParam() != "local") {
+ GTEST_SKIP() << "this test is only tested for the local file system";
+ }
+
+ ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists("/"));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(""));
+ ASSERT_TRUE(is_exist);
+ ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists("."));
+ ASSERT_TRUE(is_exist);
+}
+
// --- delete
TEST_P(FileSystemTest, TestExistingFileDeletion) {
auto check = [&](bool recursive) {
@@ -988,8 +1049,16 @@ TEST_P(FileSystemTest, TestMkdir) {
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1"));
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1"));
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/"));
+}
+
+// Local-only: Mkdirs() on paths that already name existing directories ("/", and
+// the working-directory aliases "" and ".") must succeed.
+TEST_P(FileSystemTest, TestMkdirInLocalFileSystem) {
+ if (GetParam() != "local") {
+ GTEST_SKIP() << "this test is only tested for the local file system";
+ }
+
ASSERT_OK(fs_->Mkdirs("/"));
- ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string.");
+ // "" is no longer rejected; it names the working directory.
+ ASSERT_OK(fs_->Mkdirs(""));
+ ASSERT_OK(fs_->Mkdirs("."));
}
TEST_P(FileSystemTest, TestMkdir2) {
@@ -1402,6 +1471,14 @@ TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) {
ASSERT_TRUE(is_exist);
}
-INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local"
/*, "jindo"*/));
+// File-system backends the parameterized FileSystemTest suite runs against.
+// Only "local" is enabled; the "jindo" entry is kept commented out.
+std::vector<std::string> GetTestValuesForFileSystemTest() {
+ std::vector<std::string> values;
+ values.emplace_back("local");
+ // values.emplace_back("jindo");
+ return values;
+}
+
+INSTANTIATE_TEST_SUITE_P(FsType, FileSystemTest,
+
::testing::ValuesIn(GetTestValuesForFileSystemTest()));
} // namespace paimon::test
diff --git a/src/paimon/common/table/special_fields.h
b/src/paimon/common/table/special_fields.h
index 384150e..b431964 100644
--- a/src/paimon/common/table/special_fields.h
+++ b/src/paimon/common/table/special_fields.h
@@ -67,7 +67,8 @@ struct SpecialFields {
static bool IsSpecialFieldName(const std::string& field_name) {
if (field_name == SequenceNumber().Name() || field_name ==
ValueKind().Name() ||
- field_name == RowId().Name() || field_name == IndexScore().Name())
{
+ field_name == RowKind().Name() || field_name == RowId().Name() ||
+ field_name == IndexScore().Name()) {
return true;
}
return false;
diff --git a/src/paimon/common/table/special_fields_test.cpp
b/src/paimon/common/table/special_fields_test.cpp
index 1808fad..0c17918 100644
--- a/src/paimon/common/table/special_fields_test.cpp
+++ b/src/paimon/common/table/special_fields_test.cpp
@@ -63,6 +63,7 @@ TEST(SpecialFieldsTest, TestIsSpecialFieldName) {
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND"));
ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND"));
+ ASSERT_TRUE(SpecialFields::IsSpecialFieldName("rowkind"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE"));
}
diff --git a/src/paimon/common/utils/path_util.cpp
b/src/paimon/common/utils/path_util.cpp
index a977ab8..143049e 100644
--- a/src/paimon/common/utils/path_util.cpp
+++ b/src/paimon/common/utils/path_util.cpp
@@ -19,8 +19,13 @@
#include "paimon/common/utils/path_util.h"
+#include <unistd.h>
+
+#include <cerrno>
#include <cstddef>
#include <cstdint>
+#include <cstdlib>
+#include <cstring>
#include <utility>
#include "fmt/format.h"
@@ -145,6 +150,17 @@ void PathUtil::TrimLastDelim(std::string* dir_path)
noexcept {
}
}
+// Returns the process's current working directory as reported by getcwd(), or an
+// IOError carrying strerror(errno) when getcwd() fails.
+Result<std::string> PathUtil::GetWorkingDirectory() noexcept {
+ // NOTE(review): relies on getcwd(nullptr, 0) allocating a right-sized buffer
+ // (released with free() below); glibc and BSD/macOS do this, POSIX leaves it
+ // unspecified.
+ char* path = getcwd(nullptr, 0);
+ if (path != nullptr) {
+ std::string ret(path);
+ free(path);
+ return ret;
+ }
+ return Status::IOError(
+ fmt::format("get working directory failed, ec: {}",
std::strerror(errno)));
+}
+
Result<std::string> PathUtil::CreateTempPath(const std::string& path) noexcept
{
std::string uuid;
if (!UUID::Generate(&uuid)) {
diff --git a/src/paimon/common/utils/path_util.h
b/src/paimon/common/utils/path_util.h
index b893e76..9aaff11 100644
--- a/src/paimon/common/utils/path_util.h
+++ b/src/paimon/common/utils/path_util.h
@@ -47,6 +47,7 @@ class PAIMON_EXPORT PathUtil {
static std::string GetParentDirPath(const std::string& path) noexcept;
static std::string GetName(const std::string& path) noexcept;
static void TrimLastDelim(std::string* dir_path) noexcept;
+ static Result<std::string> GetWorkingDirectory() noexcept;
static Result<std::string> CreateTempPath(const std::string& path)
noexcept;
static Result<Path> ToPath(const std::string& path) noexcept;
static Result<std::string> NormalizePath(const std::string& path) noexcept;
diff --git a/src/paimon/common/utils/path_util_test.cpp
b/src/paimon/common/utils/path_util_test.cpp
index 8e7ed4c..9ab278e 100644
--- a/src/paimon/common/utils/path_util_test.cpp
+++ b/src/paimon/common/utils/path_util_test.cpp
@@ -94,6 +94,12 @@ TEST(PathUtilsTest, TestTrimLastDelim) {
}
}
+// GetWorkingDirectory() must succeed and return a non-empty absolute path.
+TEST(PathUtilsTest, TestGetWorkingDirectory) {
+ ASSERT_OK_AND_ASSIGN(std::string current_path,
PathUtil::GetWorkingDirectory());
+ ASSERT_FALSE(current_path.empty());
+ ASSERT_EQ(current_path[0], '/');
+}
+
TEST(PathUtilsTest, TestToPath) {
{
std::string test_path = "";
@@ -131,6 +137,22 @@ TEST(PathUtilsTest, TestToPath) {
ASSERT_EQ(path.path, "/tmp/index");
ASSERT_EQ(path.ToString(), "/tmp/index");
}
+ {
+ std::string test_path = ".";
+ ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+ ASSERT_EQ(path.scheme, "");
+ ASSERT_EQ(path.authority, "");
+ ASSERT_EQ(path.path, ".");
+ ASSERT_EQ(path.ToString(), ".");
+ }
+ {
+ std::string test_path = "relative/path";
+ ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+ ASSERT_EQ(path.scheme, "");
+ ASSERT_EQ(path.authority, "");
+ ASSERT_EQ(path.path, "relative/path");
+ ASSERT_EQ(path.ToString(), "relative/path");
+ }
}
TEST(PathUtilsTest, TestGetName) {
diff --git a/src/paimon/fs/local/local_file.cpp
b/src/paimon/fs/local/local_file.cpp
new file mode 100644
index 0000000..0291c59
--- /dev/null
+++ b/src/paimon/fs/local/local_file.cpp
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_status.h"
+
+namespace paimon {
+
+// TODO(yonghao.fyh): move io_hook.h to test/test_util and add a HookLocalFileSystem only for test
+
+// Consults the injected IOHook (if any) for path_ and propagates a non-OK result of
+// hook_->Try(path_) through PAIMON_RETURN_NOT_OK. The do/while(false) wrapper makes
+// `CHECK_HOOK();` a single statement, so it stays correct inside an unbraced if/else
+// (the previous bare `if { }` expansion would bind a following `else` to itself).
+#define CHECK_HOOK()                                 \
+    do {                                             \
+        if (hook_) {                                 \
+            PAIMON_RETURN_NOT_OK(hook_->Try(path_)); \
+        }                                            \
+    } while (false)
+
+// Resolves path_string to an absolute local path and wraps it in a LocalFile:
+// - "" becomes the current working directory;
+// - a "file" scheme (any letter case) is stripped, any other scheme is Invalid;
+// - a relative path is joined onto the current working directory;
+// - an absolute path is used as parsed by PathUtil::ToPath().
+// NOTE(review): only path.scheme and path.path are consulted, so any authority
+// component parsed by ToPath() is silently dropped.
+Result<std::unique_ptr<LocalFile>> LocalFile::Create(const std::string&
path_string) {
+ if (path_string.empty()) {
+ PAIMON_ASSIGN_OR_RAISE(std::string current_path,
PathUtil::GetWorkingDirectory());
+ return std::unique_ptr<LocalFile>(new LocalFile(current_path));
+ }
+
+ // Strip an optional "file" scheme, e.g. "file:/tmp" is rewritten to "/tmp";
+ // the local file system does not accept any other scheme.
+ PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_string));
+ if (!path.scheme.empty() && StringUtils::ToLowerCase(path.scheme) !=
"file") {
+ return Status::Invalid(fmt::format("invalid scheme {} for local file
system", path.scheme));
+ }
+ if (path.path.empty() || path.path[0] != '/') {
+ PAIMON_ASSIGN_OR_RAISE(std::string current_path,
PathUtil::GetWorkingDirectory());
+ return std::unique_ptr<LocalFile>(
+ new LocalFile(PathUtil::JoinPath(current_path, path.path)));
+ }
+ return std::unique_ptr<LocalFile>(new LocalFile(path.path));
+}
+
+// Stores `path` verbatim (no scheme stripping or working-directory resolution; use
+// Create() for that) and caches IOHook::GetInstance() for CHECK_HOOK().
+LocalFile::LocalFile(const std::string& path) : path_(path),
hook_(IOHook::GetInstance()) {}
+
+// Closes the stream if it is still open. The fclose() result is ignored here;
+// call Close() first to observe close errors.
+LocalFile::~LocalFile() {
+ if (file_) {
+ std::fclose(file_);
+ }
+}
+
+// Returns whether path_ exists (access(F_OK)). A path that cannot exist because it
+// is missing (ENOENT) or because one of its parent components is not a directory
+// (ENOTDIR, e.g. "/some/file.txt/child") yields false; any other access() failure
+// (e.g. EACCES on a parent directory) is reported as an IOError.
+Result<bool> LocalFile::Exists() const {
+    CHECK_HOOK();
+    if (access(path_.c_str(), F_OK) == 0) {
+        return true;
+    }
+    // Capture errno before anything else can overwrite it.
+    int32_t cur_errno = errno;
+    if (cur_errno == ENOENT || cur_errno == ENOTDIR) {
+        return false;
+    }
+    return Status::IOError(
+        fmt::format("path '{}' check exists fail, ec: {}", path_, std::strerror(cur_errno)));
+}
+
+// Returns whether path_ is a regular file (S_ISREG on the result of stat(), which
+// follows symlinks). Unlike Exists(), a missing path is reported as an IOError,
+// not as false.
+Result<bool> LocalFile::IsFile() const {
+ CHECK_HOOK();
+ bool is_file = false;
+ struct stat buf;
+ if (stat(path_.c_str(), &buf) < 0) {
+ return Status::IOError(
+ fmt::format("path '{}' check isFile fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ if (S_ISREG(buf.st_mode)) {
+ is_file = true;
+ }
+ return is_file;
+}
+
+// Returns whether path_ is a directory, via GetFileStatus(). A missing path yields
+// the IOError from GetFileStatus() rather than false.
+Result<bool> LocalFile::IsDir() const {
+ CHECK_HOOK();
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> file_status,
GetFileStatus());
+ return file_status->IsDir();
+}
+
+// Lists the entry names of directory path_ (excluding "." and "..") into
+// *file_list, in readdir() order. On any failure (opendir, readdir or closedir)
+// *file_list is left empty and an IOError carrying strerror(errno) is returned.
+Status LocalFile::List(std::vector<std::string>* file_list) const {
+    CHECK_HOOK();
+    file_list->clear();
+    DIR* dp = opendir(path_.c_str());
+    if (dp == nullptr) {
+        int32_t cur_errno = errno;
+        return Status::IOError(
+            fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
+    }
+
+    // readdir() returns nullptr both at end-of-directory and on error; the two cases
+    // can only be told apart through errno, so clear it before every call.
+    int32_t read_errno = 0;
+    while (true) {
+        errno = 0;
+        struct dirent* ep = readdir(dp);
+        if (ep == nullptr) {
+            read_errno = errno;
+            break;
+        }
+        if (strcmp(ep->d_name, ".") == 0 || strcmp(ep->d_name, "..") == 0) {
+            continue;
+        }
+        file_list->push_back(ep->d_name);
+    }
+    // Always close the stream; a read error takes precedence over a close error.
+    int32_t close_errno = (closedir(dp) < 0) ? errno : 0;
+    int32_t cur_errno = (read_errno != 0) ? read_errno : close_errno;
+    if (cur_errno != 0) {
+        file_list->clear();
+        return Status::IOError(
+            fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
+    }
+    return Status::OK();
+}
+
+// Like List(), but returns each entry as a LocalFile built by Create() from path_
+// joined with the entry name. *file_list is cleared first.
+Status LocalFile::ListFiles(std::vector<std::unique_ptr<LocalFile>>*
file_list) const {
+ CHECK_HOOK();
+ file_list->clear();
+ std::vector<std::string> file_names;
+ PAIMON_RETURN_NOT_OK(List(&file_names));
+ for (const auto& file_name : file_names) {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> file,
+ LocalFile::Create(PathUtil::JoinPath(path_,
file_name)));
+ file_list->push_back(std::move(file));
+ }
+ return Status::OK();
+}
+
+// Removes path_ with ::remove(), which handles files and empty directories only
+// (a non-empty directory fails with an IOError). A missing path is not an error.
+Status LocalFile::Delete() const {
+ CHECK_HOOK();
+ PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists());
+ if (is_exist) {
+ if (::remove(path_.c_str()) != 0) {
+ // ENOENT: removed concurrently after the Exists() check; treat as success.
+ if (errno != ENOENT) {
+ return Status::IOError(
+ fmt::format("delete path '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ }
+ }
+ return Status::OK();
+}
+
+// Creates the single directory path_ with mode 0755 (subject to the umask); parents
+// are not created. Returns false on any mkdir() failure, including when path_
+// already exists; the errno is not reported.
+Result<bool> LocalFile::Mkdir() const {
+ CHECK_HOOK();
+ return mkdir(path_.c_str(), 0755) == 0;
+}
+
+// stat()s path_ and returns its size, directory flag and modification time in
+// milliseconds. The time comes from st_mtime (whole seconds), so the millisecond
+// part is always 0. A missing path is an IOError.
+Result<std::unique_ptr<LocalFileStatus>> LocalFile::GetFileStatus() const {
+ CHECK_HOOK();
+ struct stat buf;
+ if (stat(path_.c_str(), &buf) < 0) {
+ int32_t cur_errno = errno;
+ return Status::IOError(
+ fmt::format("get file '{}' status failed, ec: {}", path_,
std::strerror(cur_errno)));
+ }
+ return std::make_unique<LocalFileStatus>(path_, buf.st_size, buf.st_mtime
* 1000,
+ S_ISDIR(buf.st_mode));
+}
+
+// Size of path_ in bytes (st_size), via GetFileStatus().
+Result<uint64_t> LocalFile::Length() const {
+ CHECK_HOOK();
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> file_status,
GetFileStatus());
+ return file_status->GetLen();
+}
+
+// Modification time of path_ in epoch milliseconds with whole-second precision,
+// via GetFileStatus().
+Result<int64_t> LocalFile::LastModifiedTimeMs() const {
+ CHECK_HOOK();
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> file_status,
GetFileStatus());
+ return file_status->GetModificationTime();
+}
+
+// Returns the directory containing path_ as a new LocalFile (stored verbatim, not
+// resolved through Create()). The parent of "/name" is "/"; the root itself and
+// paths without any '/' have no parent and yield an empty LocalFile
+// (IsEmpty() == true), which MkdirsInternal uses to stop its upward recursion.
+std::unique_ptr<LocalFile> LocalFile::GetParentFile() const {
+    size_t pos = path_.rfind('/');
+    if (pos == std::string::npos || path_ == "/") {
+        return std::unique_ptr<LocalFile>(new LocalFile(""));
+    }
+    if (pos == 0) {
+        // A top-level entry's parent is the root. Returning "" here would make
+        // LocalFileSystem::Create call Mkdirs(""), which LocalFile::Create resolves to
+        // the process working directory (and fails if that cannot be determined).
+        return std::unique_ptr<LocalFile>(new LocalFile("/"));
+    }
+    return std::unique_ptr<LocalFile>(new LocalFile(path_.substr(0, pos)));
+}
+
+// The path this LocalFile refers to; absolute when built through Create().
+const std::string& LocalFile::GetPath() const {
+ return path_;
+}
+
+// Positional read: reads up to `length` bytes starting at file offset `offset` into
+// `buffer` with pread(2), which does not move the stdio stream position. Loops
+// until `length` bytes are read or EOF is reached and returns the number of bytes
+// read (0 at or past EOF). Signal interruptions (EINTR) are retried. Lengths above
+// INT32_MAX are rejected; an unopened file fails with EBADF.
+Result<int32_t> LocalFile::Read(char* buffer, uint32_t length, uint64_t offset) {
+    if (file_) {
+        CHECK_HOOK();
+        int32_t fd = fileno(file_);
+        auto more = static_cast<int32_t>(length);
+        if (more < 0) {
+            return Status::IOError(fmt::format(
+                "pread file '{}' fail, length overflow int32_t, ec: EC_BADARGS", path_));
+        }
+
+        uint64_t off = 0;
+        while (more > 0) {
+            ssize_t ret = ::pread(fd, buffer + off, static_cast<size_t>(more),
+                                  static_cast<off_t>(offset + off));
+            if (ret == -1) {
+                if (errno == EINTR) {
+                    // Interrupted by a signal before any data was transferred: retry.
+                    continue;
+                }
+                int32_t cur_errno = errno;
+                return Status::IOError(fmt::format("pread file '{}' fail at off {}, ec: {}", path_,
+                                                   off, std::strerror(cur_errno)));
+            }
+            if (ret == 0) {
+                break;  // end of file
+            }
+            more -= static_cast<int32_t>(ret);
+            off += static_cast<uint64_t>(ret);
+        }
+        return static_cast<int32_t>(off);
+    }
+    return Status::IOError(fmt::format(
+        "read file '{}' fail, can not read file which is opened fail, ec: EBADF", path_));
+}
+
+// Sequential read at the current stream position: reads up to `length` bytes into
+// `buffer`, looping over short fread() results until `length` bytes are read or EOF.
+// Returns the number of bytes read; lengths above INT32_MAX are rejected and an
+// unopened file fails with EBADF.
+Result<int32_t> LocalFile::Read(char* buffer, uint32_t length) {
+ if (file_) {
+ CHECK_HOOK();
+ auto more = static_cast<int32_t>(length);
+ if (more < 0) {
+ return Status::IOError(
+ fmt::format("fileName '{}', length '{}', ec: EC_BADARGS",
path_, length));
+ }
+
+ int32_t ret = 0;
+ uint64_t off = 0;
+ while (more > 0) {
+ ret = fread(buffer + off, 1, more, file_);
+ if (ferror(file_) != 0) {
+ return Status::IOError(fmt::format("read file '{}' fail at off
{}, ec: {}", path_,
+ off, std::strerror(errno)));
+ }
+ more -= ret;
+ off += ret;
+ if (feof(file_)) {
+ break;
+ }
+ }
+ return off;
+ }
+
+ return Status::IOError(fmt::format(
+ "read file '{}' fail, can not read file which is opened fail, ec:
EBADF", path_));
+}
+
+// Writes all `length` bytes of `buffer` at the current stream position, looping
+// over short fwrite() results. Data may stay in the stdio buffer until Flush() or
+// Close(). Returns the number of bytes written; lengths above INT32_MAX are
+// rejected and an unopened file fails with EBADF.
+Result<int32_t> LocalFile::Write(const char* buffer, uint32_t length) {
+ if (file_) {
+ CHECK_HOOK();
+ auto more = static_cast<int32_t>(length);
+ if (more < 0) {
+ return Status::IOError(fmt::format(
+ "write file '{}' fail, length overflow int32_t, ec:
EC_BADARGS", path_));
+ }
+
+ int32_t ret = 0;
+ uint64_t off = 0;
+ while (more > 0) {
+ ret = fwrite(buffer + off, 1, more, file_);
+ if (ferror(file_) != 0) {
+ return Status::IOError(fmt::format("write file '{}' fail at
off {}, ec: {}", path_,
+ off, std::strerror(errno)));
+ }
+ more -= ret;
+ off += ret;
+ }
+ return off;
+ }
+
+ return Status::IOError(
+ fmt::format("write file '{}' fail, can not write file which not
opened, ec: EBADF", path_));
+}
+
+// Pushes the stdio buffer to the kernel (fflush) and then to storage (fsync). The
+// hook is consulted before each step; fsync is skipped when fflush fails.
+Status LocalFile::Flush() {
+ if (file_) {
+ CHECK_HOOK();
+ int32_t ret = fflush(file_);
+ if (0 == ret) {
+ CHECK_HOOK();
+ int32_t fd = fileno(file_);
+ ret |= fsync(fd);
+ }
+ if (0 != ret) {
+ return Status::IOError(
+ fmt::format("flush '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ return Status::OK();
+ }
+ return Status::IOError(
+ fmt::format("flush '{}' fail, can not flush file which not opened, ec:
EBADF", path_));
+}
+
+// Opens path_ for reading ("r"; the path must exist and not be a directory) or for
+// writing ("w", which creates the file or truncates an existing one).
+// NOTE(review): an already-open FILE* is overwritten without fclose() if this is
+// called twice on the same LocalFile.
+Status LocalFile::OpenFile(bool is_read_file) {
+ if (is_read_file) {
+ PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists());
+ if (!is_exist) {
+ return Status::IOError(
+ fmt::format("direct openFile '{}' fail, file not exist, ec:
ENOENT", path_));
+ }
+ PAIMON_ASSIGN_OR_RAISE(bool is_dir, IsDir());
+ if (is_dir) {
+ return Status::IOError(fmt::format(
+ "direct openFile '{}' fail, cannot open a directory, ec:
EISDIR", path_));
+ }
+ CHECK_HOOK();
+ file_ = fopen(path_.c_str(), "r");
+ } else {
+ CHECK_HOOK();
+ file_ = fopen(path_.c_str(), "w");
+ }
+ if (file_ == nullptr) {
+ return Status::IOError(fmt::format("open '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ return Status::OK();
+}
+
+// Closes the stream if it is open; closing an unopened file is a no-op returning OK.
+Status LocalFile::Close() {
+ if (file_) {
+ CHECK_HOOK();
+ if (fclose(file_) != 0) {
+ // Reset even on failure so the destructor never fclose()s this FILE* again.
+ file_ = nullptr;
+ return Status::IOError(
+ fmt::format("close '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ file_ = nullptr;
+ }
+ return Status::OK();
+}
+
+// Repositions the stdio stream; `seek_origin` is passed to fseek() as its whence
+// argument (SEEK_SET / SEEK_CUR / SEEK_END). An unopened file fails with EBADF.
+// NOTE(review): fseek() takes a `long`, so offsets above LONG_MAX are truncated on
+// platforms with a 32-bit long; fseeko() would avoid that.
+Status LocalFile::Seek(int64_t offset, int32_t seek_origin) {
+ if (file_) {
+ CHECK_HOOK();
+ int32_t ret = 0;
+ ret = fseek(file_, offset, seek_origin);
+ if (ret != 0) {
+ return Status::IOError(
+ fmt::format("seek '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ return Status::OK();
+ }
+ return Status::IOError(
+ fmt::format("seek '{}' fail, can not read file which not opened, ec:
EBADF", path_));
+}
+
+// Current stdio stream position, via ftell(). An unopened file fails with EBADF.
+// NOTE(review): ftell() returns `long`; ftello() is the 64-bit-safe variant.
+Result<int64_t> LocalFile::Tell() const {
+ if (file_) {
+ CHECK_HOOK();
+ int64_t ret = ftell(file_);
+ if (ret < 0) {
+ return Status::IOError(
+ fmt::format("tell '{}' fail, ec: {}", path_,
std::strerror(errno)));
+ }
+ return ret;
+ }
+ return Status::IOError(
+ fmt::format("tell '{}' fail, can not read file which not opened, ec:
EBADF", path_));
+}
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file.h b/src/paimon/fs/local/local_file.h
new file mode 100644
index 0000000..655c414
--- /dev/null
+++ b/src/paimon/fs/local/local_file.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <dirent.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/macros.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class IOHook;
+class LocalFileStatus;
+
+/// Handle to a single local path. Path-level operations (Exists, List, Mkdir, ...)
+/// act on the path directly; Read/Write/Seek/... go through a stdio FILE* opened
+/// by OpenFile(). Instances come from Create(), which resolves the path to an
+/// absolute one; copy and assignment are disabled.
+class LocalFile {
+ public:
+ static Result<std::unique_ptr<LocalFile>> Create(const std::string&
path_string);
+ PAIMON_DISALLOW_COPY_AND_ASSIGN(LocalFile);
+ ~LocalFile();
+
+ // Path-level queries and operations; they do not need OpenFile().
+ Result<bool> Exists() const;
+ Result<bool> IsFile() const;
+ Result<bool> IsDir() const;
+ Status List(std::vector<std::string>* file_list) const;
+ Status ListFiles(std::vector<std::unique_ptr<LocalFile>>* file_list) const;
+ Status Delete() const;
+ const std::string& GetPath() const;
+ std::unique_ptr<LocalFile> GetParentFile() const;
+ Result<bool> Mkdir() const;
+ Result<std::unique_ptr<LocalFileStatus>> GetFileStatus() const;
+ Result<uint64_t> Length() const;
+ Result<int64_t> LastModifiedTimeMs() const;
+ // Stream operations: Read/Write/Flush/Seek/Tell fail with an IOError (EBADF)
+ // unless OpenFile() succeeded; Close() on an unopened file is a no-op.
+ Status OpenFile(bool is_read_file);
+ Result<int32_t> Read(char* buffer, uint32_t length);
+ Result<int32_t> Read(char* buffer, uint32_t length, uint64_t offset);
+ Result<int32_t> Write(const char* buffer, uint32_t length);
+ Status Flush();
+ Status Close();
+ Status Seek(int64_t offset, int32_t seek_origin);
+ Result<int64_t> Tell() const;
+
+ // True for the "no parent" sentinel that GetParentFile() returns.
+ bool IsEmpty() const {
+ return path_.empty();
+ }
+
+ private:
+ explicit LocalFile(const std::string& path);
+
+ const std::string path_;
+ // Open stdio stream, or nullptr while no stream is open.
+ FILE* file_ = nullptr;
+ // Test hook from IOHook::GetInstance(); when null, hook checks are skipped.
+ IOHook* hook_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_status.h
b/src/paimon/fs/local/local_file_status.h
new file mode 100644
index 0000000..eb1af1f
--- /dev/null
+++ b/src/paimon/fs/local/local_file_status.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "paimon/fs/file_system.h"
+
+namespace paimon {
+
+/// Immutable BasicFileStatus for a local path: the path and whether it is a
+/// directory, both fixed at construction.
+class LocalBasicFileStatus : public BasicFileStatus {
+ public:
+ LocalBasicFileStatus(const std::string& path, bool is_dir) : path_(path),
is_dir_(is_dir) {}
+
+ std::string GetPath() const override {
+ return path_;
+ }
+
+ bool IsDir() const override {
+ return is_dir_;
+ }
+
+ private:
+ std::string path_;
+ bool is_dir_;
+};
+
+/// Immutable FileStatus snapshot for a local path. As filled in by
+/// LocalFile::GetFileStatus(), `length` is st_size and `last_modification_time`
+/// is in epoch milliseconds (whole-second precision).
+class LocalFileStatus : public FileStatus {
+ public:
+ LocalFileStatus(const std::string& path, uint64_t length, int64_t
last_modification_time,
+ bool is_dir)
+ : path_(path),
+ length_(length),
+ last_modification_time_(last_modification_time),
+ is_dir_(is_dir) {}
+
+ std::string GetPath() const override {
+ return path_;
+ }
+
+ uint64_t GetLen() const override {
+ return length_;
+ }
+
+ int64_t GetModificationTime() const override {
+ return last_modification_time_;
+ }
+
+ bool IsDir() const override {
+ return is_dir_;
+ }
+
+ private:
+ std::string path_;
+ uint64_t length_;
+ int64_t last_modification_time_;
+ bool is_dir_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system.cpp
b/src/paimon/fs/local/local_file_system.cpp
new file mode 100644
index 0000000..258a39f
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system.cpp
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file_system.h"
+
+#include <fcntl.h>
+
+#include <cassert>
+#include <cerrno>
+#include <cstdio>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/fs/local/local_file_status.h"
+
+namespace paimon {
+
+// Resolves `path` with LocalFile::Create() (scheme stripping, working-directory
+// resolution) and reports whether it exists.
+Result<bool> LocalFileSystem::Exists(const std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> file,
LocalFile::Create(path));
+ return file->Exists();
+}
+
+// Opens `path` for reading. Returns NotExist when the path is missing; otherwise
+// the stream comes from LocalInputStream::Create().
+Result<std::unique_ptr<InputStream>> LocalFileSystem::Open(const std::string&
path) const {
+ PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path));
+ if (!is_exist) {
+ return Status::NotExist(fmt::format("File '{}' not exists", path));
+ }
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> file,
LocalFile::Create(path));
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalInputStream> in,
+ LocalInputStream::Create(std::move(file)));
+ return in;
+}
+
+// Opens `path` for writing, creating missing parent directories first. Fails with
+// Invalid when the path exists and `overwrite` is false.
+// NOTE(review): the Exists() check and the later open are not atomic, so two
+// concurrent callers with overwrite=false can both pass the check.
+Result<std::unique_ptr<OutputStream>> LocalFileSystem::Create(const
std::string& path,
+ bool overwrite)
const {
+ PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists(path));
+ if (is_exist && !overwrite) {
+ return Status::Invalid(
+ fmt::format("do not allow overwrite, but the file {} already
exists", path));
+ }
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> file,
LocalFile::Create(path));
+ std::unique_ptr<LocalFile> parent = file->GetParentFile();
+ PAIMON_RETURN_NOT_OK(Mkdirs(parent->GetPath()));
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalOutputStream> out,
+ LocalOutputStream::Create(std::move(file)));
+ return out;
+}
+
+// Creates `path` and any missing ancestors; succeeds when it already exists as a
+// directory and fails when it exists as something else.
+Status LocalFileSystem::Mkdirs(const std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> file,
LocalFile::Create(path));
+ return MkdirsInternal(std::move(file));
+}
+
+// Recursive worker for Mkdirs(): makes sure every missing ancestor exists (walking
+// up via GetParentFile() until an existing or empty parent), then creates `file`.
+Status LocalFileSystem::MkdirsInternal(std::unique_ptr<LocalFile>&& file)
const {
+ // Check Exists() before IsDir(): IsDir() stat()s the path and returns an IOError
+ // for a missing path, whereas a missing path must fall through to the creation
+ // logic below.
+ PAIMON_ASSIGN_OR_RAISE(bool is_exist, file->Exists());
+ if (is_exist) {
+ PAIMON_ASSIGN_OR_RAISE(bool is_dir, file->IsDir());
+ if (is_dir) {
+ return Status::OK();
+ } else {
+ // exists but is not a directory (e.g. a regular file)
+ return Status::IOError(
+ fmt::format("file {} already exists and is not a directory",
file->GetPath()));
+ }
+ }
+
+ std::unique_ptr<LocalFile> parent = file->GetParentFile();
+ if (!parent->IsEmpty()) {
+ PAIMON_RETURN_NOT_OK(MkdirsInternal(std::move(parent)));
+ }
+ // Mkdir() can fail because a concurrent caller created the directory in the
+ // meantime; that case is detected via IsDir() and treated as success.
+ // NOTE(review): if mkdir failed for another reason (e.g. EACCES) and the path
+ // still does not exist, IsDir() reports a stat error instead of the mkdir errno.
+ PAIMON_ASSIGN_OR_RAISE(bool success, file->Mkdir());
+ if (!success) {
+ PAIMON_ASSIGN_OR_RAISE(bool is_dir, file->IsDir());
+ if (is_dir) {
+ return Status::OK();
+ } else {
+ return Status::IOError(fmt::format("create directory '{}' failed",
file->GetPath()));
+ }
+ }
+ return Status::OK();
+}
+
+// Returns the status of `path`. Returns NotExist when the entry cannot be found; the
+// message also mentions missing permissions as a possible cause.
+Result<std::unique_ptr<FileStatus>> LocalFileSystem::GetFileStatus(
+    const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> local_file, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool present, local_file->Exists());
+    if (!present) {
+        return Status::NotExist(
+            fmt::format("File {} does not exist or the user running "
+                        "Paimon has insufficient permissions to access it.",
+                        local_file->GetPath()));
+    }
+    return local_file->GetFileStatus();
+}
+
+// Appends a BasicFileStatus for every entry directly under `directory`.
+// - A missing `directory` yields nothing.
+// - A regular file at `directory` is an IOError.
+// - Entries whose status lookup reports NotExist are skipped.
+Status LocalFileSystem::ListDir(
+    const std::string& directory,
+    std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> dir, LocalFile::Create(directory));
+    PAIMON_ASSIGN_OR_RAISE(bool present, dir->Exists());
+    if (!present) {
+        return Status::OK();
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_file, dir->IsFile());
+    if (is_file) {
+        return Status::IOError(
+            fmt::format("file {} already exists and is not a directory", dir->GetPath()));
+    }
+
+    std::vector<std::string> names;
+    PAIMON_RETURN_NOT_OK(dir->List(&names));
+    file_status_list->reserve(file_status_list->size() + names.size());
+    for (const auto& name : names) {
+        Result<std::unique_ptr<FileStatus>> child =
+            GetFileStatus(PathUtil::JoinPath(directory, name));
+        if (!child.ok()) {
+            if (child.status().IsNotExist()) {
+                continue;
+            }
+            return child.status();
+        }
+        const auto& child_status = child.value();
+        file_status_list->emplace_back(std::make_unique<LocalBasicFileStatus>(
+            child_status->GetPath(), child_status->IsDir()));
+    }
+    return Status::OK();
+}
+
+// Appends FileStatus entries for `path`:
+// - If `path` is a regular file, appends its own status.
+// - If it is a directory, appends one status per direct child.
+// - A missing `path` yields nothing, and children whose lookup reports NotExist are skipped.
+Status LocalFileSystem::ListFileStatus(
+    const std::string& path, std::vector<std::unique_ptr<FileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> target, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool present, target->Exists());
+    if (!present) {
+        return Status::OK();
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_file, target->IsFile());
+    if (is_file) {
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> own_status, target->GetFileStatus());
+        file_status_list->emplace_back(std::move(own_status));
+        return Status::OK();
+    }
+
+    std::vector<std::string> names;
+    PAIMON_RETURN_NOT_OK(target->List(&names));
+    file_status_list->reserve(file_status_list->size() + names.size());
+    for (const auto& name : names) {
+        Result<std::unique_ptr<FileStatus>> child = GetFileStatus(PathUtil::JoinPath(path, name));
+        if (child.ok()) {
+            file_status_list->emplace_back(std::move(child).value());
+        } else if (!child.status().IsNotExist()) {
+            return child.status();
+        }
+    }
+    return Status::OK();
+}
+
+// Deletes `path`. A regular file is removed directly. Anything else goes to the
+// LocalFile overload, which handles directory contents according to `recursive`.
+Status LocalFileSystem::Delete(const std::string& path, bool recursive) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> target, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool is_regular_file, target->IsFile());
+    if (!is_regular_file) {
+        return Delete(std::move(target), recursive);
+    }
+    return target->Delete();
+}
+
+// Deletes `file`. A directory's children are deleted first, always recursively.
+// When `recursive` is false, a non-empty directory is rejected with IOError instead.
+Status LocalFileSystem::Delete(std::unique_ptr<LocalFile>&& file, bool recursive) const {
+    PAIMON_ASSIGN_OR_RAISE(bool is_dir, file->IsDir());
+    if (is_dir) {
+        std::vector<std::unique_ptr<LocalFile>> children;
+        PAIMON_RETURN_NOT_OK(file->ListFiles(&children));
+        if (!recursive && !children.empty()) {
+            return Status::IOError(
+                fmt::format("cannot delete {}, directory is not empty", file->GetPath()));
+        }
+        for (auto& child : children) {
+            PAIMON_RETURN_NOT_OK(Delete(std::move(child), /*recursive=*/true));
+        }
+    }
+    // Any directory reaching this point has had all of its children removed.
+    return file->Delete();
+}
+
+// Renames `src` to `dst`, creating missing parent directories of `dst`. Fails if:
+// - `src` is missing (NotExist);
+// - `dst` already exists (Invalid);
+// - a regular file would be renamed onto a path ending in '/' (Invalid);
+// - ::rename() fails (IOError carrying strerror(errno)).
+Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) const {
+    std::string err_msg = fmt::format("rename '{}' to '{}' failed, because: ", src, dst);
+
+    // POSIX rename() silently replaces an existing destination. Without serialization, two
+    // in-process callers could both see `dst` as missing and then overwrite each other.
+    // This lock only covers callers inside this process, not other processes.
+    static std::mutex rename_mutex;
+    std::lock_guard<std::mutex> rename_guard(rename_mutex);
+
+    PAIMON_ASSIGN_OR_RAISE(bool is_src_exist, Exists(src));
+    if (!is_src_exist) {
+        return Status::NotExist(err_msg, "src file not exist");
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_dst_exist, Exists(dst));
+    if (is_dst_exist) {
+        return Status::Invalid(err_msg, "dst file already exist");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> src_file, LocalFile::Create(src));
+    PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file->IsFile());
+    // A trailing '/' names a directory, so a regular file cannot be renamed onto it.
+    // Check empty() first: indexing dst[length() - 1] on an empty string is undefined.
+    if (is_file && !dst.empty() && dst.back() == '/') {
+        return Status::Invalid(err_msg, "src file is not a dir");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> dst_file, LocalFile::Create(dst));
+    // Only create ancestors when `dst` has a parent component. MkdirsInternal applies
+    // the same IsEmpty() guard.
+    std::unique_ptr<LocalFile> parent = dst_file->GetParentFile();
+    if (!parent->IsEmpty()) {
+        PAIMON_RETURN_NOT_OK(Mkdirs(parent->GetPath()));
+    }
+    if (::rename(src_file->GetPath().c_str(), dst_file->GetPath().c_str()) != 0) {
+        int32_t cur_errno = errno;
+        return Status::IOError(err_msg, std::strerror(cur_errno));
+    }
+    return Status::OK();
+}
+
+// input stream
+Result<std::unique_ptr<LocalInputStream>> LocalInputStream::Create(
+ std::unique_ptr<LocalFile> file) {
+ PAIMON_RETURN_NOT_OK(file->OpenFile(/*is_read_file=*/true));
+ return std::unique_ptr<LocalInputStream>(new
LocalInputStream(std::move(file)));
+}
+
+LocalInputStream::LocalInputStream(std::unique_ptr<LocalFile>&& file) :
file_(std::move(file)) {}
+
+// Repositions the stream. Each FS_SEEK_* origin maps to the matching C stdio SEEK_*
+// whence value; any other origin is rejected with Invalid.
+Status LocalInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    int32_t whence = SEEK_SET;
+    switch (origin) {
+        case FS_SEEK_SET:
+            whence = SEEK_SET;
+            break;
+        case FS_SEEK_CUR:
+            whence = SEEK_CUR;
+            break;
+        case FS_SEEK_END:
+            whence = SEEK_END;
+            break;
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    return file_->Seek(offset, whence);
+}
+
+// Current read position, as reported by the underlying file's Tell().
+Result<int64_t> LocalInputStream::GetPos() const { return file_->Tell(); }
+
+// Reads exactly `size` bytes at the current position. Any short read is an IOError.
+Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size) {
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, file_->Read(buffer, size));
+    if (bytes_read == static_cast<int32_t>(size)) {
+        return bytes_read;
+    }
+    return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
+                                       file_->GetPath(), bytes_read, size));
+}
+
+// Reads exactly `size` bytes starting at `offset`. Any short read is an IOError.
+Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, file_->Read(buffer, size, offset));
+    if (bytes_read == static_cast<int32_t>(size)) {
+        return bytes_read;
+    }
+    return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
+                                       file_->GetPath(), bytes_read, size));
+}
+
+// Performs the offset Read() synchronously on the calling thread, then passes its
+// status to `callback`.
+void LocalInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                                 std::function<void(Status)>&& callback) {
+    Result<int32_t> outcome = Read(buffer, size, offset);
+    if (!outcome.ok()) {
+        callback(outcome.status());
+        return;
+    }
+    // Read() already rejects short reads, so a successful result is always full-size.
+    assert(outcome.value() == static_cast<int32_t>(size));
+    callback(Status::OK());
+}
+
+// Size of the underlying file, as reported by LocalFile::Length().
+Result<uint64_t> LocalInputStream::Length() const { return file_->Length(); }
+
+// Closes the underlying file.
+Status LocalInputStream::Close() { return file_->Close(); }
+
+// output stream
+Result<std::unique_ptr<LocalOutputStream>> LocalOutputStream::Create(
+ std::unique_ptr<LocalFile> file) {
+ PAIMON_RETURN_NOT_OK(file->OpenFile(/*is_read_file=*/false));
+ return std::unique_ptr<LocalOutputStream>(new
LocalOutputStream(std::move(file)));
+}
+
+LocalOutputStream::LocalOutputStream(std::unique_ptr<LocalFile>&& file) :
file_(std::move(file)) {}
+
+// Each output stream operation forwards directly to the underlying LocalFile.
+Result<int64_t> LocalOutputStream::GetPos() const { return file_->Tell(); }
+
+Result<int32_t> LocalOutputStream::Write(const char* buffer, uint32_t size) {
+    return file_->Write(buffer, size);
+}
+
+Status LocalOutputStream::Flush() { return file_->Flush(); }
+
+Status LocalOutputStream::Close() { return file_->Close(); }
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system.h b/src/paimon/fs/local/local_file_system.h
new file mode 100644
index 0000000..3a4bc7a
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system.h
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// `FileSystem` implementation backed by the local file system.
+class LocalFileSystem : public FileSystem {
+ public:
+    LocalFileSystem() = default;
+    ~LocalFileSystem() override = default;
+
+    Result<std::unique_ptr<InputStream>> Open(const std::string& path) const override;
+    Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+                                                 bool overwrite) const override;
+
+    Status Mkdirs(const std::string& path) const override;
+    Status Rename(const std::string& src, const std::string& dst) const override;
+    Status Delete(const std::string& path, bool recursive = true) const override;
+    Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) const override;
+    Status ListDir(const std::string& directory,
+                   std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const override;
+    Status ListFileStatus(
+        const std::string& path,
+        std::vector<std::unique_ptr<FileStatus>>* file_status_list) const override;
+    Result<bool> Exists(const std::string& path) const override;
+
+ private:
+    // Deletes `file`. If it is a directory, its children are deleted first; when
+    // `recursive` is false, a non-empty directory is rejected instead.
+    Status Delete(std::unique_ptr<LocalFile>&& file, bool recursive = true) const;
+    // Creates the directory `file` together with any missing ancestors.
+    Status MkdirsInternal(std::unique_ptr<LocalFile>&& file) const;
+};
+
+// InputStream over a LocalFile. Instances are obtained through Create(), which opens
+// the file for reading before returning the stream.
+class LocalInputStream : public InputStream {
+ public:
+    static Result<std::unique_ptr<LocalInputStream>> Create(std::unique_ptr<LocalFile> file);
+
+    // Positioning.
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+    Result<int64_t> GetPos() const override;
+
+    // Reading; the synchronous overloads fail on short reads.
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    // Metadata and lifecycle.
+    Result<uint64_t> Length() const override;
+    Result<std::string> GetUri() const override { return file_->GetPath(); }
+    Status Close() override;
+
+ private:
+    explicit LocalInputStream(std::unique_ptr<LocalFile>&& file);
+
+    std::unique_ptr<LocalFile> file_;
+};
+
+// OutputStream over a LocalFile. Instances are obtained through Create(), which opens
+// the file for writing before returning the stream.
+class LocalOutputStream : public OutputStream {
+ public:
+    static Result<std::unique_ptr<LocalOutputStream>> Create(std::unique_ptr<LocalFile> file);
+
+    Result<int32_t> Write(const char* buffer, uint32_t size) override;
+    Status Flush() override;
+    Result<int64_t> GetPos() const override;
+    Result<std::string> GetUri() const override { return file_->GetPath(); }
+    Status Close() override;
+
+ private:
+    explicit LocalOutputStream(std::unique_ptr<LocalFile>&& file);
+
+    // The underlying file; every stream operation forwards to it.
+    std::unique_ptr<LocalFile> file_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system_factory.cpp b/src/paimon/fs/local/local_file_system_factory.cpp
new file mode 100644
index 0000000..affac39
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system_factory.cpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file_system_factory.h"
+
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+// Identifier returned by LocalFileSystemFactory::Identifier(); presumably the key this
+// factory is looked up by (lookup logic is not visible here).
+const char LocalFileSystemFactory::IDENTIFIER[] = "local";
+
+// NOTE(review): presumably registers the factory in Paimon's global factory registry at
+// static-initialization time; confirm against paimon/factories/factory.h.
+REGISTER_PAIMON_FACTORY(LocalFileSystemFactory);
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system_factory.h b/src/paimon/fs/local/local_file_system_factory.h
new file mode 100644
index 0000000..1a3f763
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system_factory.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class FileSystem;
+
+// FileSystemFactory that hands out LocalFileSystem instances under the "local" identifier.
+class LocalFileSystemFactory : public FileSystemFactory {
+ public:
+    static const char IDENTIFIER[];
+
+    const char* Identifier() const override { return IDENTIFIER; }
+
+    // LocalFileSystem takes no configuration, so neither the path nor the options are used.
+    Result<std::unique_ptr<FileSystem>> Create(
+        const std::string& /*path*/,
+        const std::map<std::string, std::string>& /*options*/) const override {
+        return std::make_unique<LocalFileSystem>();
+    }
+};
+
+} // namespace paimon
diff --git a/src/paimon/fs/local/local_file_test.cpp b/src/paimon/fs/local/local_file_test.cpp
new file mode 100644
index 0000000..4ad3b97
--- /dev/null
+++ b/src/paimon/fs/local/local_file_test.cpp
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Writing zero bytes still creates the file, and reading it back yields zero bytes.
+TEST(LocalFileTest, TestReadWriteEmptyContent) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    // Exists() returns Result<bool>. `.ok()` only says the check itself succeeded, so
+    // branch on the value and delete only what is actually present.
+    ASSERT_OK_AND_ASSIGN(bool dir_exists, dir->Exists());
+    if (dir_exists) {
+        ASSERT_OK(dir->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::string path = test_root + "/test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+    ASSERT_OK_AND_ASSIGN(bool stale_file_exists, file->Exists());
+    if (stale_file_exists) {
+        ASSERT_OK(file->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(file->OpenFile(/*is_read_file=*/false));
+
+    const char* str = "";
+    const int32_t str_size = 0;
+    ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size));
+    ASSERT_EQ(write_size, str_size);
+
+    ASSERT_OK(file->Flush());
+    ASSERT_TRUE(file->Exists().value());
+
+    ASSERT_OK(file->Close());
+
+    ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path));
+    ASSERT_OK(file2->OpenFile(/*is_read_file=*/true));
+    char buffer[10];
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, file2->Read(buffer, 10));
+    ASSERT_EQ(0, read_len);
+    ASSERT_OK(file2->Close());
+}
+
+// Round-trips a small payload through LocalFile: write, stat, sequential/seek/offset
+// reads, then delete.
+TEST(LocalFileTest, TestSimple) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    // Exists() returns Result<bool>: branch on the value, not merely on call success.
+    ASSERT_OK_AND_ASSIGN(bool dir_exists, dir->Exists());
+    if (dir_exists) {
+        ASSERT_OK(dir->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::string path = test_root + "/test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+    ASSERT_OK_AND_ASSIGN(bool stale_file_exists, file->Exists());
+    if (stale_file_exists) {
+        ASSERT_OK(file->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(file->OpenFile(/*is_read_file=*/false));
+
+    const char* str = "test_data";
+    const int32_t str_size = 9;
+    ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size));
+    ASSERT_EQ(write_size, str_size);
+
+    ASSERT_OK(file->Flush());
+    ASSERT_OK(file->Close());
+
+    ASSERT_OK_AND_ASSIGN(bool is_file, file->IsFile());
+    ASSERT_TRUE(is_file);
+
+    ASSERT_OK_AND_ASSIGN(bool is_dir, file->IsDir());
+    ASSERT_FALSE(is_dir);
+
+    // Listing a regular file must fail.
+    std::vector<std::string> file_list;
+    ASSERT_NOK(file->List(&file_list));
+
+    ASSERT_OK_AND_ASSIGN(size_t len, file->Length());
+    ASSERT_EQ(len, static_cast<size_t>(str_size));
+
+    ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path));
+    // Check the existence value itself, not just that the lookup succeeded.
+    ASSERT_OK_AND_ASSIGN(bool file2_exists, file2->Exists());
+    ASSERT_TRUE(file2_exists);
+
+    ASSERT_OK(file2->OpenFile(/*is_read_file=*/true));
+    char str_read[str_size + 1];
+    {
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4));
+        ASSERT_EQ(read_size, 4);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "test"), 0);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(int64_t pos, file2->Tell());
+        ASSERT_EQ(pos, 4);
+        ASSERT_OK(file2->Seek(5, SEEK_SET));
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4));
+        ASSERT_EQ(read_size, 4);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "data"), 0);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, str_size, 0));
+        ASSERT_EQ(read_size, str_size);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "test_data"), 0);
+    }
+
+    // dir already exists
+    ASSERT_OK_AND_ASSIGN(success, dir->Mkdir());
+    ASSERT_FALSE(success);
+
+    ASSERT_OK(file2->Close());
+    ASSERT_OK(file2->Delete());
+    ASSERT_FALSE(file2->Exists().value());
+}
+
+// Exercises Mkdir/List/GetParentFile/Delete on a relative directory created in the current
+// working directory (not inside a UniqueTestDirectory).
+// NOTE(review): a leftover "local_file_test_usage" from an aborted run makes the first
+// Mkdir() return false and fail this test; consider pre-cleaning or a unique directory.
+TEST(LocalFileTest, TestUsage) {
+ std::string test_root = "local_file_test_usage";
+ ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+ ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+ ASSERT_TRUE(success);
+ std::vector<std::string> file_list;
+ ASSERT_OK(dir->List(&file_list));
+ std::string path_deep_dir = test_root + "/tmp2";
+ ASSERT_OK_AND_ASSIGN(auto deep_dir, LocalFile::Create(path_deep_dir));
+ ASSERT_OK_AND_ASSIGN(success, deep_dir->Mkdir());
+ ASSERT_TRUE(success);
+ // The parent of "<root>/tmp2" must resolve to the same path as `dir`.
+ std::unique_ptr<LocalFile> parent_deep_dir = deep_dir->GetParentFile();
+ ASSERT_EQ(parent_deep_dir->GetPath(), dir->GetPath());
+ ASSERT_OK(deep_dir->Delete());
+ // parent_deep_dir and dir name the same path (asserted above), so the second Delete()
+ // below targets an already-removed directory. This presumably relies on
+ // LocalFile::Delete() tolerating a missing path; confirm.
+ ASSERT_OK(parent_deep_dir->Delete());
+ ASSERT_OK(dir->Delete());
+}
+
+TEST(LocalFileTest, TestOpenFile) {
+ auto test_root_dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(test_root_dir);
+ std::string test_root = test_root_dir->Str();
+ ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+ if (dir->Exists().ok()) {
+ ASSERT_OK(dir->Delete());
+ }
+ ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+ ASSERT_TRUE(success);
+ std::string path = test_root + "/test.txt";
+ ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+ if (file->Exists().ok()) {
+ ASSERT_OK(file->Delete());
+ }
+ ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+ ASSERT_FALSE(is_exist);
+
+ ASSERT_NOK_WITH_MSG(file->OpenFile(/*is_read_file=*/true), "file not
exist");
+ ASSERT_NOK_WITH_MSG(dir->OpenFile(/*is_read_file=*/true), "cannot open a
directory");
+
+ std::string path3 = "test.txt";
+ ASSERT_OK_AND_ASSIGN(auto file3, LocalFile::Create(path3));
+ ASSERT_OK(file3->OpenFile(/*is_read_file=*/false));
+ ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3->LastModifiedTimeMs());
+ ASSERT_GE(modify_time, -1);
+
+ ASSERT_OK_AND_ASSIGN(auto dir2, LocalFile::Create("/"));
+ ASSERT_OK_AND_ASSIGN(success, dir2->Mkdir());
+ ASSERT_FALSE(success);
+ ASSERT_OK_AND_ASSIGN(auto dir3, LocalFile::Create(test_root + "/"));
+ ASSERT_OK_AND_ASSIGN(success, dir3->Mkdir());
+ ASSERT_FALSE(success);
+}
+
+// Mkdir() creates a single level only:
+// - it fails (returns false) when the parent is missing or the directory already exists;
+// - a trailing '/' is accepted.
+TEST(LocalFileTest, TestMkdir) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    // Str() has no trailing '/', so every child path below needs an explicit separator.
+    // Otherwise the directories would be created beside the unique test root and leak.
+    std::string test_root = test_root_dir->Str();
+    {
+        // Intermediate directories are missing, so a single-level Mkdir() fails.
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "/tmp/local/f/1"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "/tmp1"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_TRUE(success);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "/tmp1/f2/"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_TRUE(success);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create("/"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(""));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+}
+
+} // namespace paimon::test