This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 92159c3  feat(fs): introduce local file system (#41)
92159c3 is described below

commit 92159c304929cada5d1d0d967a7f89e24b89636a
Author: Zhang Jiawei <[email protected]>
AuthorDate: Fri Jun 5 13:15:38 2026 +0800

    feat(fs): introduce local file system (#41)
    
    * feat(fs): introduce local file system
    
    * fix(fs): normalize local filesystem paths
---
 src/paimon/common/fs/external_path_provider.h      |  10 +-
 .../common/fs/external_path_provider_test.cpp      |  49 +++
 src/paimon/common/fs/file_system_test.cpp          |  83 ++++-
 src/paimon/common/table/special_fields.h           |   3 +-
 src/paimon/common/table/special_fields_test.cpp    |   1 +
 src/paimon/common/utils/path_util.cpp              |  16 +
 src/paimon/common/utils/path_util.h                |   1 +
 src/paimon/common/utils/path_util_test.cpp         |  22 ++
 src/paimon/fs/local/local_file.cpp                 | 372 +++++++++++++++++++++
 src/paimon/fs/local/local_file.h                   |  79 +++++
 src/paimon/fs/local/local_file_status.h            |  76 +++++
 src/paimon/fs/local/local_file_system.cpp          | 319 ++++++++++++++++++
 src/paimon/fs/local/local_file_system.h            | 107 ++++++
 src/paimon/fs/local/local_file_system_factory.cpp  |  29 ++
 src/paimon/fs/local/local_file_system_factory.h    |  47 +++
 src/paimon/fs/local/local_file_test.cpp            | 226 +++++++++++++
 16 files changed, 1430 insertions(+), 10 deletions(-)

diff --git a/src/paimon/common/fs/external_path_provider.h 
b/src/paimon/common/fs/external_path_provider.h
index 568d3ac..c70aa2f 100644
--- a/src/paimon/common/fs/external_path_provider.h
+++ b/src/paimon/common/fs/external_path_provider.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <memory>
 #include <random>
@@ -48,12 +49,9 @@ class ExternalPathProvider {
     ///
     /// @return the next external data path
     std::string GetNextExternalDataPath(const std::string& file_name) {
-        position_++;
-        if (position_ == external_table_paths_.size()) {
-            position_ = 0;
-        }
+        size_t position = (++position_) % external_table_paths_.size();
         return PathUtil::JoinPath(
-            PathUtil::JoinPath(external_table_paths_[position_], 
relative_bucket_path_), file_name);
+            PathUtil::JoinPath(external_table_paths_[position], 
relative_bucket_path_), file_name);
     }
 
  private:
@@ -69,6 +67,6 @@ class ExternalPathProvider {
  private:
     std::vector<std::string> external_table_paths_;
     std::string relative_bucket_path_;
-    size_t position_;
+    std::atomic<size_t> position_;
 };
 }  // namespace paimon
diff --git a/src/paimon/common/fs/external_path_provider_test.cpp 
b/src/paimon/common/fs/external_path_provider_test.cpp
index 6893bfb..5ef37f2 100644
--- a/src/paimon/common/fs/external_path_provider_test.cpp
+++ b/src/paimon/common/fs/external_path_provider_test.cpp
@@ -19,7 +19,11 @@
 
 #include "paimon/common/fs/external_path_provider.h"
 
+#include <cstdint>
+#include <future>
+#include <mutex>
 #include <set>
+#include <vector>
 
 #include "gtest/gtest.h"
 #include "paimon/testing/utils/testharness.h"
@@ -65,4 +69,49 @@ TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) 
{
                                      
"/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
                                  }));
 }
+
+TEST(ExternalPathProviderTest, TestGetNextExternalDataPathConcurrently) {
+    // Call one shared provider from several threads at once: every returned path must be one
+    // of the three well-formed candidates, and the total number of results must match.
+    std::vector<std::string> external_table_paths = {
+        "/tmp/external_path_a/",
+        "/tmp/external_path_b/",
+        "/tmp/external_path_c/",
+    };
+    std::string relative_bucket_path = "p0=1/p1=0/bucket-0";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
+                         ExternalPathProvider::Create(external_table_paths, relative_bucket_path));
+
+    constexpr int32_t kThreadCount = 8;
+    constexpr int32_t kPathCountPerThread = 1000;
+    std::mutex collected_mutex;
+    std::vector<std::string> collected_paths;
+
+    // Each worker gathers its paths locally and merges them under the lock exactly once.
+    auto worker = [&]() {
+        std::vector<std::string> thread_paths;
+        thread_paths.reserve(kPathCountPerThread);
+        for (int32_t n = 0; n < kPathCountPerThread; ++n) {
+            thread_paths.push_back(provider->GetNextExternalDataPath("file.orc"));
+        }
+
+        std::lock_guard<std::mutex> guard(collected_mutex);
+        collected_paths.insert(collected_paths.end(), thread_paths.begin(), thread_paths.end());
+    };
+
+    std::vector<std::future<void>> workers;
+    workers.reserve(kThreadCount);
+    for (int32_t t = 0; t < kThreadCount; ++t) {
+        workers.emplace_back(std::async(std::launch::async, worker));
+    }
+    for (auto& pending : workers) {
+        pending.get();
+    }
+
+    const std::set<std::string> expected_data_paths = {
+        "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc",
+        "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc",
+        "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
+    };
+    ASSERT_EQ(collected_paths.size(), kThreadCount * kPathCountPerThread);
+    for (const auto& data_path : collected_paths) {
+        ASSERT_TRUE(expected_data_paths.count(data_path)) << data_path;
+    }
+}
 }  // namespace paimon::test
diff --git a/src/paimon/common/fs/file_system_test.cpp 
b/src/paimon/common/fs/file_system_test.cpp
index 815b983..b7188c4 100644
--- a/src/paimon/common/fs/file_system_test.cpp
+++ b/src/paimon/common/fs/file_system_test.cpp
@@ -162,7 +162,12 @@ class FileSystemTest : public ::testing::Test, public 
::testing::WithParamInterf
     std::string GetTestDir() const {
         std::string file_system = GetParam();
         if (file_system == "local") {
-            return paimon::test::GetDataDir();
+            std::string data_dir = paimon::test::GetDataDir();
+            if (data_dir.empty() || data_dir[0] != '/') {
+                EXPECT_OK_AND_ASSIGN(std::string current_path, 
PathUtil::GetWorkingDirectory());
+                data_dir = PathUtil::JoinPath(current_path, data_dir);
+            }
+            return data_dir;
         } else if (file_system == "jindo") {
             return "oss://paimon-unittest/test_data/";
         }
@@ -212,6 +217,28 @@ TEST_P(FileSystemTest, TestCreate) {
     ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already 
exists");
 }
 
+// A relative file name must be resolved against the working directory: the stream reports an
+// absolute URI, and the file stays reachable through the same relative name afterwards.
+TEST_P(FileSystemTest, TestCreateRelativeFileInCurrentDirectory) {
+    if (GetParam() != "local") {
+        GTEST_SKIP() << "this test is only tested for the local file system";
+    }
+
+    std::string path = "relative_file_" + RandomName();
+    ASSERT_OK_AND_ASSIGN(auto out, fs_->Create(path, /*overwrite=*/true));
+    std::string content = "content";
+    ASSERT_OK_AND_ASSIGN(int32_t write_len, out->Write(content.data(), content.size()));
+    // Compare as unsigned so the check does not mix a signed and an unsigned operand.
+    ASSERT_GE(write_len, 0);
+    ASSERT_EQ(static_cast<size_t>(write_len), content.size());
+    ASSERT_OK_AND_ASSIGN(std::string uri, out->GetUri());
+    ASSERT_FALSE(uri.empty());
+    ASSERT_EQ(uri[0], '/');
+    ASSERT_EQ(PathUtil::GetName(uri), path);
+    ASSERT_OK(out->Close());
+
+    std::string read_content;
+    ASSERT_OK(fs_->ReadFile(path, &read_content));
+    ASSERT_EQ(read_content, content);
+    ASSERT_OK(fs_->Delete(path));
+}
+
 // --- write&read
 TEST_P(FileSystemTest, TestSimpleWriteAndRead) {
     std::string content = "abcdefghijk";
@@ -651,6 +678,27 @@ TEST_P(FileSystemTest, TestRename) {
         "src file is not a dir");
 }
 
+// Paths carrying the "file:" scheme must work for write, rename, exists and read alike.
+TEST_P(FileSystemTest, TestRenameWithFileSchemeUsesNormalizedPath) {
+    if (GetParam() != "local") {
+        GTEST_SKIP() << "this test is only tested for the local file system";
+    }
+
+    const std::string scheme_root = "file:" + test_root_;
+    const std::string src = scheme_root + "/scheme_src.txt";
+    const std::string dst = scheme_root + "/scheme_dst.txt";
+
+    ASSERT_OK(fs_->WriteFile(src, "content", /*overwrite=*/false));
+    ASSERT_OK(fs_->Rename(src, dst));
+
+    // After the rename only the destination may exist.
+    ASSERT_OK_AND_ASSIGN(bool src_exists, fs_->Exists(src));
+    ASSERT_FALSE(src_exists);
+    ASSERT_OK_AND_ASSIGN(bool dst_exists, fs_->Exists(dst));
+    ASSERT_TRUE(dst_exists);
+
+    // The payload must have moved along with the file.
+    std::string renamed_content;
+    ASSERT_OK(fs_->ReadFile(dst, &renamed_content));
+    ASSERT_EQ(renamed_content, "content");
+}
+
 TEST_P(FileSystemTest, TestRename2) {
     {
         // test rename dir
@@ -784,6 +832,19 @@ TEST_P(FileSystemTest, TestExists) {
     ASSERT_TRUE(is_exist);
 }
 
+// The root, the empty path and "." must all be reported as existing by the local file system.
+TEST_P(FileSystemTest, TestExistsInLocalFileSystem) {
+    if (GetParam() != "local") {
+        GTEST_SKIP() << "this test is only tested for the local file system";
+    }
+
+    ASSERT_OK_AND_ASSIGN(bool root_exists, fs_->Exists("/"));
+    ASSERT_TRUE(root_exists);
+
+    ASSERT_OK_AND_ASSIGN(bool empty_exists, fs_->Exists(""));
+    ASSERT_TRUE(empty_exists);
+
+    ASSERT_OK_AND_ASSIGN(bool dot_exists, fs_->Exists("."));
+    ASSERT_TRUE(dot_exists);
+}
+
 // --- delete
 TEST_P(FileSystemTest, TestExistingFileDeletion) {
     auto check = [&](bool recursive) {
@@ -988,8 +1049,16 @@ TEST_P(FileSystemTest, TestMkdir) {
     ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1"));
     ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1"));
     ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/"));
+}
+
+TEST_P(FileSystemTest, TestMkdirInLocalFileSystem) {
+    if (GetParam() != "local") {
+        GTEST_SKIP() << "this test is only tested for the local file system";
+    }
+
     ASSERT_OK(fs_->Mkdirs("/"));
-    ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string.");
+    ASSERT_OK(fs_->Mkdirs(""));
+    ASSERT_OK(fs_->Mkdirs("."));
 }
 
 TEST_P(FileSystemTest, TestMkdir2) {
@@ -1402,6 +1471,14 @@ TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) {
     ASSERT_TRUE(is_exist);
 }
 
-INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local" 
/*, "jindo"*/));
+// Lists the file system kinds the parameterized suite is instantiated with.
+std::vector<std::string> GetTestValuesForFileSystemTest() {
+    // Only "local" is enabled; "jindo" stays commented out.
+    std::vector<std::string> fs_types{
+        std::string("local"),
+        // std::string("jindo"),
+    };
+    return fs_types;
+}
+
+INSTANTIATE_TEST_SUITE_P(FsType, FileSystemTest,
+                         
::testing::ValuesIn(GetTestValuesForFileSystemTest()));
 
 }  // namespace paimon::test
diff --git a/src/paimon/common/table/special_fields.h 
b/src/paimon/common/table/special_fields.h
index 384150e..b431964 100644
--- a/src/paimon/common/table/special_fields.h
+++ b/src/paimon/common/table/special_fields.h
@@ -67,7 +67,8 @@ struct SpecialFields {
 
     static bool IsSpecialFieldName(const std::string& field_name) {
         if (field_name == SequenceNumber().Name() || field_name == 
ValueKind().Name() ||
-            field_name == RowId().Name() || field_name == IndexScore().Name()) 
{
+            field_name == RowKind().Name() || field_name == RowId().Name() ||
+            field_name == IndexScore().Name()) {
             return true;
         }
         return false;
diff --git a/src/paimon/common/table/special_fields_test.cpp 
b/src/paimon/common/table/special_fields_test.cpp
index 1808fad..0c17918 100644
--- a/src/paimon/common/table/special_fields_test.cpp
+++ b/src/paimon/common/table/special_fields_test.cpp
@@ -63,6 +63,7 @@ TEST(SpecialFieldsTest, TestIsSpecialFieldName) {
     ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER"));
     ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND"));
     ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND"));
+    ASSERT_TRUE(SpecialFields::IsSpecialFieldName("rowkind"));
     ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID"));
     ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE"));
 }
diff --git a/src/paimon/common/utils/path_util.cpp 
b/src/paimon/common/utils/path_util.cpp
index a977ab8..143049e 100644
--- a/src/paimon/common/utils/path_util.cpp
+++ b/src/paimon/common/utils/path_util.cpp
@@ -19,8 +19,13 @@
 
 #include "paimon/common/utils/path_util.h"
 
+#include <unistd.h>
+
+#include <cerrno>
 #include <cstddef>
 #include <cstdint>
+#include <cstdlib>
+#include <cstring>
 #include <utility>
 
 #include "fmt/format.h"
@@ -145,6 +150,17 @@ void PathUtil::TrimLastDelim(std::string* dir_path) 
noexcept {
     }
 }
 
+/// Returns the process' current working directory.
+///
+/// @return the directory reported by getcwd(), or an IOError carrying the errno text
+Result<std::string> PathUtil::GetWorkingDirectory() noexcept {
+    char* cwd_buffer = getcwd(nullptr, 0);
+    if (cwd_buffer == nullptr) {
+        return Status::IOError(
+            fmt::format("get working directory failed, ec: {}", std::strerror(errno)));
+    }
+    // Copy the result out before releasing the buffer handed back by getcwd().
+    std::string working_dir(cwd_buffer);
+    free(cwd_buffer);
+    return working_dir;
+}
+
 Result<std::string> PathUtil::CreateTempPath(const std::string& path) noexcept 
{
     std::string uuid;
     if (!UUID::Generate(&uuid)) {
diff --git a/src/paimon/common/utils/path_util.h 
b/src/paimon/common/utils/path_util.h
index b893e76..9aaff11 100644
--- a/src/paimon/common/utils/path_util.h
+++ b/src/paimon/common/utils/path_util.h
@@ -47,6 +47,7 @@ class PAIMON_EXPORT PathUtil {
     static std::string GetParentDirPath(const std::string& path) noexcept;
     static std::string GetName(const std::string& path) noexcept;
     static void TrimLastDelim(std::string* dir_path) noexcept;
+    static Result<std::string> GetWorkingDirectory() noexcept;
     static Result<std::string> CreateTempPath(const std::string& path) 
noexcept;
     static Result<Path> ToPath(const std::string& path) noexcept;
     static Result<std::string> NormalizePath(const std::string& path) noexcept;
diff --git a/src/paimon/common/utils/path_util_test.cpp 
b/src/paimon/common/utils/path_util_test.cpp
index 8e7ed4c..9ab278e 100644
--- a/src/paimon/common/utils/path_util_test.cpp
+++ b/src/paimon/common/utils/path_util_test.cpp
@@ -94,6 +94,12 @@ TEST(PathUtilsTest, TestTrimLastDelim) {
     }
 }
 
+// The working directory must be reported as a non-empty absolute path.
+TEST(PathUtilsTest, TestGetWorkingDirectory) {
+    ASSERT_OK_AND_ASSIGN(std::string working_dir, PathUtil::GetWorkingDirectory());
+    ASSERT_FALSE(working_dir.empty());
+    const char first_char = working_dir[0];
+    ASSERT_EQ(first_char, '/');
+}
+
 TEST(PathUtilsTest, TestToPath) {
     {
         std::string test_path = "";
@@ -131,6 +137,22 @@ TEST(PathUtilsTest, TestToPath) {
         ASSERT_EQ(path.path, "/tmp/index");
         ASSERT_EQ(path.ToString(), "/tmp/index");
     }
+    {
+        std::string test_path = ".";
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "");
+        ASSERT_EQ(path.authority, "");
+        ASSERT_EQ(path.path, ".");
+        ASSERT_EQ(path.ToString(), ".");
+    }
+    {
+        std::string test_path = "relative/path";
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "");
+        ASSERT_EQ(path.authority, "");
+        ASSERT_EQ(path.path, "relative/path");
+        ASSERT_EQ(path.ToString(), "relative/path");
+    }
 }
 
 TEST(PathUtilsTest, TestGetName) {
diff --git a/src/paimon/fs/local/local_file.cpp 
b/src/paimon/fs/local/local_file.cpp
new file mode 100644
index 0000000..0291c59
--- /dev/null
+++ b/src/paimon/fs/local/local_file.cpp
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_status.h"
+
+namespace paimon {
+
+// TODO(yonghao.fyh): move io_hook.h to test/test_util and add a 
HookLocalFileSystem only for test
+#define CHECK_HOOK()                             \
+    if (hook_) {                                 \
+        PAIMON_RETURN_NOT_OK(hook_->Try(path_)); \
+    }
+
+/// Builds a LocalFile for `path_string`.
+///
+/// An empty string maps to the current working directory, a "file" scheme is stripped, any
+/// other scheme is rejected, and relative paths are resolved against the working directory.
+Result<std::unique_ptr<LocalFile>> LocalFile::Create(const std::string& path_string) {
+    // The constructor is private, so instances are built with `new` rather than make_unique.
+    const auto make_file = [](const std::string& resolved_path) {
+        return std::unique_ptr<LocalFile>(new LocalFile(resolved_path));
+    };
+
+    if (path_string.empty()) {
+        PAIMON_ASSIGN_OR_RAISE(std::string working_dir, PathUtil::GetWorkingDirectory());
+        return make_file(working_dir);
+    }
+
+    // local file system does not support path_string with scheme, e.g., "file:/tmp" will be
+    // rewritten to "/tmp"
+    PAIMON_ASSIGN_OR_RAISE(Path parsed, PathUtil::ToPath(path_string));
+    const bool has_foreign_scheme =
+        !parsed.scheme.empty() && StringUtils::ToLowerCase(parsed.scheme) != "file";
+    if (has_foreign_scheme) {
+        return Status::Invalid(fmt::format("invalid scheme {} for local file system", parsed.scheme));
+    }
+
+    // Absolute paths are used as-is; everything else is anchored at the working directory.
+    const bool is_absolute = !parsed.path.empty() && parsed.path[0] == '/';
+    if (is_absolute) {
+        return make_file(parsed.path);
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::string working_dir, PathUtil::GetWorkingDirectory());
+    return make_file(PathUtil::JoinPath(working_dir, parsed.path));
+}
+
+// Stores the path and picks up the process-wide IO hook; no file is opened here.
+LocalFile::LocalFile(const std::string& path)
+    : path_(path),
+      hook_(IOHook::GetInstance()) {
+}
+
+// Closes the stream if it is still open; the fclose result is ignored here.
+LocalFile::~LocalFile() {
+    if (file_ == nullptr) {
+        return;
+    }
+    std::fclose(file_);
+}
+
+/// Checks whether the path exists, using access(F_OK).
+///
+/// @return true if accessible, false if access() fails with ENOENT, IOError for any other errno
+Result<bool> LocalFile::Exists() const {
+    CHECK_HOOK();
+    const bool accessible = (access(path_.c_str(), F_OK) == 0);
+    if (accessible) {
+        return true;
+    }
+    // Snapshot errno right away so later calls cannot overwrite it.
+    const int32_t access_errno = errno;
+    if (access_errno == ENOENT) {
+        return false;
+    }
+    return Status::IOError(
+        fmt::format("path '{}' check exists fail, ec: {}", path_, std::strerror(access_errno)));
+}
+
+/// Tells whether the path refers to a regular file.
+///
+/// @return the S_ISREG verdict from stat(), or IOError when stat() fails (e.g. missing path)
+Result<bool> LocalFile::IsFile() const {
+    CHECK_HOOK();
+    struct stat stat_buf;
+    const int32_t stat_ret = stat(path_.c_str(), &stat_buf);
+    if (stat_ret < 0) {
+        return Status::IOError(
+            fmt::format("path '{}' check isFile fail, ec: {}", path_, std::strerror(errno)));
+    }
+    const bool is_regular = S_ISREG(stat_buf.st_mode) ? true : false;
+    return is_regular;
+}
+
+/// Tells whether the path refers to a directory.
+///
+/// Delegates to GetFileStatus(), so a path that cannot be stat'ed yields an error, not false.
+Result<bool> LocalFile::IsDir() const {
+    CHECK_HOOK();
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> status, GetFileStatus());
+    const bool is_dir = status->IsDir();
+    return is_dir;
+}
+
+/// Lists the entry names (not full paths) directly under this directory, skipping "." and "..".
+///
+/// @param file_list output; cleared first, and cleared again if the listing fails midway
+/// @return OK on success, IOError if the directory cannot be opened, read or closed
+Status LocalFile::List(std::vector<std::string>* file_list) const {
+    CHECK_HOOK();
+    file_list->clear();
+    DIR* dp = opendir(path_.c_str());
+    if (dp == nullptr) {
+        int32_t cur_errno = errno;
+        return Status::IOError(
+            fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
+    }
+
+    // readdir() returns nullptr both at end-of-directory and on failure. Resetting errno
+    // before every call is the only way to tell the two apart.
+    int32_t read_errno = 0;
+    while (true) {
+        errno = 0;
+        struct dirent* ep = readdir(dp);
+        if (ep == nullptr) {
+            read_errno = errno;
+            break;
+        }
+        if (strcmp(ep->d_name, ".") == 0 || strcmp(ep->d_name, "..") == 0) {
+            continue;
+        }
+        file_list->push_back(ep->d_name);
+    }
+
+    // Always close the handle, even after a read failure, and capture its errno immediately.
+    const int32_t close_ret = closedir(dp);
+    const int32_t close_errno = errno;
+    if (read_errno != 0 || close_ret < 0) {
+        // Never hand back a partial listing.
+        file_list->clear();
+        const int32_t cur_errno = (read_errno != 0) ? read_errno : close_errno;
+        return Status::IOError(
+            fmt::format("list path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
+    }
+    return Status::OK();
+}
+
+/// Same as List(), but returns a LocalFile object per entry instead of its bare name.
+///
+/// @param file_list output; cleared first, then filled with one LocalFile per child entry
+Status LocalFile::ListFiles(std::vector<std::unique_ptr<LocalFile>>* file_list) const {
+    CHECK_HOOK();
+    file_list->clear();
+
+    std::vector<std::string> child_names;
+    PAIMON_RETURN_NOT_OK(List(&child_names));
+
+    for (const std::string& child_name : child_names) {
+        // Children are addressed by joining their name onto this directory's path.
+        const std::string child_path = PathUtil::JoinPath(path_, child_name);
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> child, LocalFile::Create(child_path));
+        file_list->push_back(std::move(child));
+    }
+    return Status::OK();
+}
+
+/// Removes the path via ::remove(). A path that does not exist counts as success.
+///
+/// @return OK when the path is gone afterwards, IOError when removal fails for another reason
+Status LocalFile::Delete() const {
+    CHECK_HOOK();
+    PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists());
+    if (!is_exist) {
+        return Status::OK();
+    }
+    const bool removed = (::remove(path_.c_str()) == 0);
+    // ENOENT here means the path vanished between the check and the removal; that is fine.
+    if (removed || errno == ENOENT) {
+        return Status::OK();
+    }
+    return Status::IOError(
+        fmt::format("delete path '{}' fail, ec: {}", path_, std::strerror(errno)));
+}
+
+/// Creates this single directory with mode 0755 (parents are not created).
+///
+/// @return true if mkdir() succeeded, false otherwise; the failure reason is not reported
+Result<bool> LocalFile::Mkdir() const {
+    CHECK_HOOK();
+    const int32_t mkdir_ret = mkdir(path_.c_str(), 0755);
+    const bool created = (mkdir_ret == 0);
+    return created;
+}
+
+/// Runs stat() on the path and packages size, modification time and directory flag.
+///
+/// @return the status object, or IOError carrying the errno text when stat() fails
+Result<std::unique_ptr<LocalFileStatus>> LocalFile::GetFileStatus() const {
+    CHECK_HOOK();
+    struct stat stat_buf;
+    const int32_t stat_ret = stat(path_.c_str(), &stat_buf);
+    if (stat_ret < 0) {
+        const int32_t stat_errno = errno;
+        return Status::IOError(
+            fmt::format("get file '{}' status failed, ec: {}", path_, std::strerror(stat_errno)));
+    }
+    // st_mtime is in seconds; the status object is given milliseconds.
+    return std::make_unique<LocalFileStatus>(path_, stat_buf.st_size, stat_buf.st_mtime * 1000,
+                                             S_ISDIR(stat_buf.st_mode));
+}
+
+Result<uint64_t> LocalFile::Length() const {
+    CHECK_HOOK();
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> file_status, 
GetFileStatus());
+    return file_status->GetLen();
+}
+
+Result<int64_t> LocalFile::LastModifiedTimeMs() const {
+    CHECK_HOOK();
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFileStatus> file_status, 
GetFileStatus());
+    return file_status->GetModificationTime();
+}
+
+/// Returns a LocalFile for everything before the last '/' of the path.
+///
+/// The result has an empty path when the path contains no '/' or when its only '/' is the
+/// first character.
+std::unique_ptr<LocalFile> LocalFile::GetParentFile() const {
+    const size_t last_slash = path_.rfind('/');
+    const std::string parent_dir =
+        (last_slash == std::string::npos) ? std::string("") : path_.substr(0, last_slash);
+    return std::unique_ptr<LocalFile>(new LocalFile(parent_dir));
+}
+
+const std::string& LocalFile::GetPath() const {  // accessor: the path stored at construction
+    return path_;  // returned by reference; valid as long as this LocalFile lives
+}
+
+/// Positional read: fills `buffer` with up to `length` bytes starting at `offset`, via pread().
+///
+/// Loops until `length` bytes are read or end-of-file is reached.
+/// @return the number of bytes actually read, or IOError (file not open, length > INT32_MAX,
+///         or pread failure)
+Result<int32_t> LocalFile::Read(char* buffer, uint32_t length, uint64_t offset) {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "read file '{}' fail, can not read file which is opened fail, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    const int32_t fd = fileno(file_);
+    auto remaining = static_cast<int32_t>(length);
+    if (remaining < 0) {
+        return Status::IOError(fmt::format(
+            "pread file '{}' fail, length overflow int32_t, ec: EC_BADARGS", path_));
+    }
+
+    uint64_t done = 0;
+    while (remaining > 0) {
+        const int32_t chunk = ::pread(fd, buffer + done, remaining, offset + done);
+        if (chunk == -1) {
+            return Status::IOError(fmt::format("pread file '{}' fail at off {}, ec: {}", path_,
+                                               done, std::strerror(errno)));
+        }
+        if (chunk == 0) {
+            // End of file: report the short read.
+            break;
+        }
+        remaining -= chunk;
+        done += chunk;
+    }
+    return done;
+}
+
+/// Sequential read: fills `buffer` with up to `length` bytes from the stream position (fread).
+///
+/// Loops until `length` bytes are read or the stream reports end-of-file.
+/// @return the number of bytes actually read, or IOError (file not open, length > INT32_MAX,
+///         or stream error)
+Result<int32_t> LocalFile::Read(char* buffer, uint32_t length) {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "read file '{}' fail, can not read file which is opened fail, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    auto remaining = static_cast<int32_t>(length);
+    if (remaining < 0) {
+        return Status::IOError(
+            fmt::format("fileName '{}', length '{}', ec: EC_BADARGS", path_, length));
+    }
+
+    uint64_t done = 0;
+    while (remaining > 0) {
+        const int32_t chunk = fread(buffer + done, 1, remaining, file_);
+        if (ferror(file_) != 0) {
+            return Status::IOError(fmt::format("read file '{}' fail at off {}, ec: {}", path_,
+                                               done, std::strerror(errno)));
+        }
+        remaining -= chunk;
+        done += chunk;
+        // Account for the final partial chunk before stopping at end-of-file.
+        if (feof(file_)) {
+            break;
+        }
+    }
+    return done;
+}
+
+/// Writes `length` bytes from `buffer` to the stream, looping over partial fwrite() results.
+///
+/// @return the number of bytes written, or IOError (file not open, length > INT32_MAX, or
+///         stream error)
+Result<int32_t> LocalFile::Write(const char* buffer, uint32_t length) {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "write file '{}' fail, can not write file which not opened, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    auto remaining = static_cast<int32_t>(length);
+    if (remaining < 0) {
+        return Status::IOError(fmt::format(
+            "write file '{}' fail, length overflow int32_t, ec: EC_BADARGS", path_));
+    }
+
+    uint64_t written = 0;
+    while (remaining > 0) {
+        const int32_t chunk = fwrite(buffer + written, 1, remaining, file_);
+        if (ferror(file_) != 0) {
+            return Status::IOError(fmt::format("write file '{}' fail at off {}, ec: {}", path_,
+                                               written, std::strerror(errno)));
+        }
+        remaining -= chunk;
+        written += chunk;
+    }
+    return written;
+}
+
+/// Flushes the stdio buffer (fflush) and then syncs the descriptor (fsync).
+///
+/// fsync is only attempted when fflush succeeded; the hook is consulted before each step.
+/// @return OK, or IOError when the file is not open or either step fails
+Status LocalFile::Flush() {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "flush '{}' fail, can not flush file which not opened, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    int32_t flush_ret = fflush(file_);
+    if (flush_ret == 0) {
+        CHECK_HOOK();
+        flush_ret = fsync(fileno(file_));
+    }
+    if (flush_ret != 0) {
+        return Status::IOError(
+            fmt::format("flush '{}' fail, ec: {}", path_, std::strerror(errno)));
+    }
+    return Status::OK();
+}
+
+/// Opens the underlying stream, for reading ("r") or for writing ("w").
+///
+/// For reads the path must exist and must not be a directory. If a stream is already open on
+/// this object it is closed first, so opening twice does not leak the earlier FILE handle.
+///
+/// @param is_read_file true to open for reading, false to open for writing
+/// @return OK, or IOError when a precondition fails or fopen() fails
+Status LocalFile::OpenFile(bool is_read_file) {
+    if (is_read_file) {
+        PAIMON_ASSIGN_OR_RAISE(bool is_exist, Exists());
+        if (!is_exist) {
+            return Status::IOError(
+                fmt::format("direct openFile '{}' fail, file not exist, ec: ENOENT", path_));
+        }
+        PAIMON_ASSIGN_OR_RAISE(bool is_dir, IsDir());
+        if (is_dir) {
+            return Status::IOError(fmt::format(
+                "direct openFile '{}' fail, cannot open a directory, ec: EISDIR", path_));
+        }
+    }
+    CHECK_HOOK();
+    // Release any handle left over from an earlier OpenFile() before replacing it.
+    if (file_ != nullptr) {
+        std::fclose(file_);
+        file_ = nullptr;
+    }
+    file_ = fopen(path_.c_str(), is_read_file ? "r" : "w");
+    if (file_ == nullptr) {
+        return Status::IOError(fmt::format("open '{}' fail, ec: {}", path_, std::strerror(errno)));
+    }
+    return Status::OK();
+}
+
+/// Closes the stream if one is open; closing an unopened file is a no-op that returns OK.
+///
+/// The handle is dropped even when fclose() reports an error.
+Status LocalFile::Close() {
+    if (!file_) {
+        return Status::OK();
+    }
+    CHECK_HOOK();
+    const int32_t close_ret = fclose(file_);
+    // The handle is unusable after fclose regardless of the outcome.
+    file_ = nullptr;
+    if (close_ret != 0) {
+        return Status::IOError(
+            fmt::format("close '{}' fail, ec: {}", path_, std::strerror(errno)));
+    }
+    return Status::OK();
+}
+
+/// Repositions the stream with fseek().
+///
+/// @param offset byte offset, interpreted relative to `seek_origin`
+/// @param seek_origin passed straight to fseek() as its `whence` argument
+/// @return OK, or IOError when the file is not open or fseek() fails
+Status LocalFile::Seek(int64_t offset, int32_t seek_origin) {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "seek '{}' fail, can not read file which not opened, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    const int32_t seek_ret = fseek(file_, offset, seek_origin);
+    if (seek_ret != 0) {
+        return Status::IOError(
+            fmt::format("seek '{}' fail, ec: {}", path_, std::strerror(errno)));
+    }
+    return Status::OK();
+}
+
+/// Reports the current stream position via ftell().
+///
+/// @return the position, or IOError when the file is not open or ftell() fails
+Result<int64_t> LocalFile::Tell() const {
+    if (!file_) {
+        return Status::IOError(fmt::format(
+            "tell '{}' fail, can not read file which not opened, ec: EBADF", path_));
+    }
+    CHECK_HOOK();
+    const int64_t position = ftell(file_);
+    if (position < 0) {
+        return Status::IOError(
+            fmt::format("tell '{}' fail, ec: {}", path_, std::strerror(errno)));
+    }
+    return position;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file.h b/src/paimon/fs/local/local_file.h
new file mode 100644
index 0000000..655c414
--- /dev/null
+++ b/src/paimon/fs/local/local_file.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <dirent.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/macros.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class IOHook;
+class LocalFileStatus;
+
+/// A path on the local file system plus, once OpenFile() succeeds, a stdio stream on it.
+///
+/// Instances are created through Create(), which resolves the given string to an absolute
+/// path. The object is non-copyable and closes its stream on destruction.
+class LocalFile {
+ public:
+    /// Resolves `path_string` (empty, relative, absolute or "file:"-prefixed) to a LocalFile.
+    static Result<std::unique_ptr<LocalFile>> Create(const std::string& path_string);
+    PAIMON_DISALLOW_COPY_AND_ASSIGN(LocalFile);
+    ~LocalFile();
+
+    // --- path inspection ---
+    const std::string& GetPath() const;
+    bool IsEmpty() const {
+        return path_.empty();
+    }
+    std::unique_ptr<LocalFile> GetParentFile() const;
+
+    // --- metadata queries (each one hits the file system) ---
+    Result<bool> Exists() const;
+    Result<bool> IsFile() const;
+    Result<bool> IsDir() const;
+    Result<std::unique_ptr<LocalFileStatus>> GetFileStatus() const;
+    Result<uint64_t> Length() const;
+    Result<int64_t> LastModifiedTimeMs() const;
+
+    // --- directory operations ---
+    Status List(std::vector<std::string>* file_list) const;
+    Status ListFiles(std::vector<std::unique_ptr<LocalFile>>* file_list) const;
+    Result<bool> Mkdir() const;
+    Status Delete() const;
+
+    // --- stream operations; all but OpenFile/Close fail unless the file is open ---
+    Status OpenFile(bool is_read_file);
+    Result<int32_t> Read(char* buffer, uint32_t length);
+    Result<int32_t> Read(char* buffer, uint32_t length, uint64_t offset);
+    Result<int32_t> Write(const char* buffer, uint32_t length);
+    Status Flush();
+    Status Seek(int64_t offset, int32_t seek_origin);
+    Result<int64_t> Tell() const;
+    Status Close();
+
+ private:
+    explicit LocalFile(const std::string& path);
+
+    const std::string path_;
+    /// Open stream, or nullptr while the file is not open.
+    FILE* file_ = nullptr;
+    /// Obtained from IOHook::GetInstance(); consulted before operations when non-null.
+    IOHook* hook_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_status.h 
b/src/paimon/fs/local/local_file_status.h
new file mode 100644
index 0000000..eb1af1f
--- /dev/null
+++ b/src/paimon/fs/local/local_file_status.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "paimon/fs/file_system.h"
+
+namespace paimon {
+
+// Minimal status record for a local entry: its path and whether it is a directory.
+class LocalBasicFileStatus : public BasicFileStatus {
+ public:
+    LocalBasicFileStatus(const std::string& path, bool is_dir)
+        : path_{path}, is_dir_{is_dir} {}
+
+    // Path this status was built for (returned by value).
+    std::string GetPath() const override { return path_; }
+
+    // Directory flag given at construction.
+    bool IsDir() const override { return is_dir_; }
+
+ private:
+    std::string path_;
+    bool is_dir_;
+};
+
+// Full status record for a local entry: path, length, modification time and
+// directory flag, all captured at construction.
+class LocalFileStatus : public FileStatus {
+ public:
+    LocalFileStatus(const std::string& path, uint64_t length, int64_t last_modification_time,
+                    bool is_dir)
+        : path_{path},
+          length_{length},
+          last_modification_time_{last_modification_time},
+          is_dir_{is_dir} {}
+
+    // Path this status was built for (returned by value).
+    std::string GetPath() const override { return path_; }
+
+    // Length given at construction.
+    uint64_t GetLen() const override { return length_; }
+
+    // Modification time given at construction.
+    int64_t GetModificationTime() const override { return last_modification_time_; }
+
+    // Directory flag given at construction.
+    bool IsDir() const override { return is_dir_; }
+
+ private:
+    std::string path_;
+    uint64_t length_;
+    int64_t last_modification_time_;
+    bool is_dir_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system.cpp 
b/src/paimon/fs/local/local_file_system.cpp
new file mode 100644
index 0000000..258a39f
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system.cpp
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file_system.h"
+
+#include <fcntl.h>
+
+#include <cassert>
+#include <cerrno>
+#include <cstdio>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/fs/local/local_file_status.h"
+
+namespace paimon {
+
+// Reports whether `path` exists, delegating to LocalFile::Exists().
+Result<bool> LocalFileSystem::Exists(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> target, LocalFile::Create(path));
+    return target->Exists();
+}
+
+// Opens `path` for reading and returns the resulting input stream.
+// Returns NotExist when the path is missing; errors from creating the
+// LocalFile or the stream are propagated.
+Result<std::unique_ptr<InputStream>> LocalFileSystem::Open(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(bool found, Exists(path));
+    if (!found) {
+        return Status::NotExist(fmt::format("File '{}' not exists", path));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> local_file, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalInputStream> stream,
+                           LocalInputStream::Create(std::move(local_file)));
+    return stream;
+}
+
+// Creates an output stream for `path`, creating missing parent directories first.
+// Returns Invalid when the path already exists and `overwrite` is false.
+Result<std::unique_ptr<OutputStream>> LocalFileSystem::Create(const std::string& path,
+                                                              bool overwrite) const {
+    PAIMON_ASSIGN_OR_RAISE(bool found, Exists(path));
+    if (!overwrite && found) {
+        return Status::Invalid(
+            fmt::format("do not allow overwrite, but the file {} already exists", path));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> local_file, LocalFile::Create(path));
+    // The parent directory must exist before the file can be opened for writing.
+    std::unique_ptr<LocalFile> parent_dir = local_file->GetParentFile();
+    PAIMON_RETURN_NOT_OK(Mkdirs(parent_dir->GetPath()));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalOutputStream> stream,
+                           LocalOutputStream::Create(std::move(local_file)));
+    return stream;
+}
+
+// Creates directory `path` together with any missing ancestors.
+Status LocalFileSystem::Mkdirs(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> target_dir, LocalFile::Create(path));
+    return MkdirsInternal(std::move(target_dir));
+}
+
+// Recursive worker behind Mkdirs(): ensures `file` exists as a directory,
+// creating its ancestors first. Returns IOError when the path exists but is
+// not a directory, or when the directory cannot be created.
+Status LocalFileSystem::MkdirsInternal(std::unique_ptr<LocalFile>&& file) const {
+    // Important: the Exists() check must come before the IsDir() check to be
+    // safe when multiple parallel instances try to create the directory.
+    PAIMON_ASSIGN_OR_RAISE(bool already_there, file->Exists());
+    if (already_there) {
+        PAIMON_ASSIGN_OR_RAISE(bool existing_is_dir, file->IsDir());
+        if (!existing_is_dir) {
+            // exists and is not a directory -> is a regular file
+            return Status::IOError(
+                fmt::format("file {} already exists and is not a directory", file->GetPath()));
+        }
+        return Status::OK();
+    }
+
+    // Create the ancestors first; an empty parent path ends the recursion.
+    std::unique_ptr<LocalFile> parent_dir = file->GetParentFile();
+    if (!parent_dir->IsEmpty()) {
+        PAIMON_RETURN_NOT_OK(MkdirsInternal(std::move(parent_dir)));
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(bool created, file->Mkdir());
+    if (created) {
+        return Status::OK();
+    }
+    // Mkdir() reported no creation: still fine if the directory exists by now.
+    PAIMON_ASSIGN_OR_RAISE(bool is_dir_now, file->IsDir());
+    if (is_dir_now) {
+        return Status::OK();
+    }
+    return Status::IOError(fmt::format("create directory '{}' failed", file->GetPath()));
+}
+
+// Returns the status of `path`, or NotExist when LocalFile::Exists() reports
+// that the path is absent.
+Result<std::unique_ptr<FileStatus>> LocalFileSystem::GetFileStatus(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> local_file, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool found, local_file->Exists());
+    if (!found) {
+        return Status::NotExist(
+            fmt::format("File {} does not exist or the user running "
+                        "Paimon has insufficient permissions to access it.",
+                        local_file->GetPath()));
+    }
+    return local_file->GetFileStatus();
+}
+
+// Appends a basic status (path and directory flag) for every entry of
+// `directory` to `file_status_list`.
+// A missing directory yields OK with nothing appended; a regular file yields
+// IOError. Entries whose status lookup reports NotExist are skipped.
+Status LocalFileSystem::ListDir(
+    const std::string& directory,
+    std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> dir, LocalFile::Create(directory));
+    PAIMON_ASSIGN_OR_RAISE(bool found, dir->Exists());
+    if (!found) {
+        return Status::OK();
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_regular_file, dir->IsFile());
+    if (is_regular_file) {
+        return Status::IOError(
+            fmt::format("file {} already exists and is not a directory", dir->GetPath()));
+    }
+
+    std::vector<std::string> names;
+    PAIMON_RETURN_NOT_OK(dir->List(&names));
+    file_status_list->reserve(file_status_list->size() + names.size());
+    for (const auto& name : names) {
+        Result<std::unique_ptr<FileStatus>> child =
+            GetFileStatus(PathUtil::JoinPath(directory, name));
+        if (child.ok()) {
+            file_status_list->emplace_back(std::make_unique<LocalBasicFileStatus>(
+                child.value()->GetPath(), child.value()->IsDir()));
+        } else if (!child.status().IsNotExist()) {
+            // NotExist is skipped; any other failure aborts the listing.
+            return child.status();
+        }
+    }
+    return Status::OK();
+}
+
+// Appends full status objects for `path` to `file_status_list`: the path's own
+// status when it is a regular file, otherwise one status per listed entry.
+// A missing path yields OK with nothing appended; entries whose status lookup
+// reports NotExist are skipped.
+Status LocalFileSystem::ListFileStatus(
+    const std::string& path, std::vector<std::unique_ptr<FileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> root, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool found, root->Exists());
+    if (!found) {
+        return Status::OK();
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_regular_file, root->IsFile());
+    if (is_regular_file) {
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> own_status, root->GetFileStatus());
+        file_status_list->emplace_back(std::move(own_status));
+        return Status::OK();
+    }
+
+    std::vector<std::string> names;
+    PAIMON_RETURN_NOT_OK(root->List(&names));
+    file_status_list->reserve(file_status_list->size() + names.size());
+    for (const auto& name : names) {
+        Result<std::unique_ptr<FileStatus>> child = GetFileStatus(PathUtil::JoinPath(path, name));
+        if (child.ok()) {
+            file_status_list->emplace_back(std::move(child).value());
+        } else if (!child.status().IsNotExist()) {
+            // NotExist is skipped; any other failure aborts the listing.
+            return child.status();
+        }
+    }
+    return Status::OK();
+}
+
+// Deletes `path`. Regular files are removed directly; anything else is handed
+// to the LocalFile overload, which applies the `recursive` rule.
+Status LocalFileSystem::Delete(const std::string& path, bool recursive) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> target, LocalFile::Create(path));
+    PAIMON_ASSIGN_OR_RAISE(bool is_regular_file, target->IsFile());
+    if (!is_regular_file) {
+        return Delete(std::move(target), recursive);
+    }
+    return target->Delete();
+}
+
+// Deletes `file`. A directory has its children deleted first; when `recursive`
+// is false a non-empty directory is rejected with IOError instead.
+Status LocalFileSystem::Delete(std::unique_ptr<LocalFile>&& file, bool recursive) const {
+    PAIMON_ASSIGN_OR_RAISE(bool is_directory, file->IsDir());
+    if (is_directory) {
+        std::vector<std::unique_ptr<LocalFile>> children;
+        PAIMON_RETURN_NOT_OK(file->ListFiles(&children));
+        if (!recursive && !children.empty()) {
+            return Status::IOError(
+                fmt::format("cannot delete {}, directory is not empty", file->GetPath()));
+        }
+        // Children are removed with the default `recursive` argument of this overload.
+        for (auto& entry : children) {
+            PAIMON_RETURN_NOT_OK(Delete(std::move(entry)));
+        }
+    }
+    // Now directory is empty
+    return file->Delete();
+}
+
+// Renames `src` to `dst`.
+//
+// Returns NotExist when `src` is missing, and Invalid when `dst` already exists
+// or when `src` is a regular file but `dst` ends with '/'. Missing parent
+// directories of `dst` are created before ::rename() is called; a failing
+// ::rename() is reported as IOError carrying the strerror() text.
+// The existence checks and the rename are separate steps, so the sequence is
+// not atomic with respect to concurrent callers.
+Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) const {
+    const std::string err_msg = fmt::format("rename '{}' to '{}' failed, because: ", src, dst);
+    PAIMON_ASSIGN_OR_RAISE(bool is_src_exist, Exists(src));
+    if (!is_src_exist) {
+        return Status::NotExist(err_msg, "src file not exist");
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool is_dst_exist, Exists(dst));
+    if (is_dst_exist) {
+        return Status::Invalid(err_msg, "dst file already exist");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> src_file, LocalFile::Create(src));
+    PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file->IsFile());
+
+    // Check emptiness first: indexing the last character of an empty `dst`
+    // would be undefined behaviour.
+    if (is_file && !dst.empty() && dst.back() == '/') {
+        return Status::Invalid(err_msg, "src file is not a dir");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<LocalFile> dst_file, LocalFile::Create(dst));
+    std::unique_ptr<LocalFile> parent = dst_file->GetParentFile();
+    PAIMON_RETURN_NOT_OK(Mkdirs(parent->GetPath()));
+    if (::rename(src_file->GetPath().c_str(), dst_file->GetPath().c_str()) != 0) {
+        // Capture errno immediately, before any other call can overwrite it.
+        int32_t cur_errno = errno;
+        return Status::IOError(err_msg, std::strerror(cur_errno));
+    }
+    return Status::OK();
+}
+
+// input stream
+// Opens `file` in read mode and wraps it in a LocalInputStream.
+Result<std::unique_ptr<LocalInputStream>> LocalInputStream::Create(
+    std::unique_ptr<LocalFile> file) {
+    PAIMON_RETURN_NOT_OK(file->OpenFile(/*is_read_file=*/true));
+    // The constructor is private, so std::make_unique cannot be used here.
+    std::unique_ptr<LocalInputStream> stream(new LocalInputStream(std::move(file)));
+    return stream;
+}
+
+// Takes ownership of `file`; Create() opens it before constructing the stream.
+LocalInputStream::LocalInputStream(std::unique_ptr<LocalFile>&& file) : 
file_(std::move(file)) {}
+
+// Moves the read position by translating `origin` into the matching stdio
+// SEEK_* constant. Any other `origin` value is rejected with Invalid.
+Status LocalInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    int32_t whence = SEEK_SET;
+    switch (origin) {
+        case FS_SEEK_SET:
+            whence = SEEK_SET;
+            break;
+        case FS_SEEK_CUR:
+            whence = SEEK_CUR;
+            break;
+        case FS_SEEK_END:
+            whence = SEEK_END;
+            break;
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    return file_->Seek(offset, whence);
+}
+
+// Returns the current position as reported by LocalFile::Tell().
+Result<int64_t> LocalInputStream::GetPos() const {
+    return file_->Tell();
+}
+
+// Reads exactly `size` bytes into `buffer`.
+// A read that returns any other byte count is reported as IOError.
+Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size) {
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, file_->Read(buffer, size));
+    if (bytes_read == static_cast<int32_t>(size)) {
+        return bytes_read;
+    }
+    return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
+                                       file_->GetPath(), bytes_read, size));
+}
+
+// Reads exactly `size` bytes into `buffer`, passing `offset` through to
+// LocalFile::Read(). A read that returns any other byte count is reported as
+// IOError.
+Result<int32_t> LocalInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, file_->Read(buffer, size, offset));
+    if (bytes_read == static_cast<int32_t>(size)) {
+        return bytes_read;
+    }
+    return Status::IOError(fmt::format("file '{}' read size {} != expected {}",
+                                       file_->GetPath(), bytes_read, size));
+}
+
+// Performs the positional read on the calling thread, then invokes `callback`
+// once with the resulting status (OK on success).
+void LocalInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                                 std::function<void(Status)>&& callback) {
+    Result<int32_t> outcome = Read(buffer, size, offset);
+    Status status = Status::OK();
+    if (outcome.ok()) {
+        // Read() only succeeds when the full `size` bytes were read.
+        assert(outcome.value() == static_cast<int32_t>(size));
+    } else {
+        status = outcome.status();
+    }
+    callback(status);
+}
+
+// Returns the length reported by the underlying LocalFile.
+Result<uint64_t> LocalInputStream::Length() const {
+    return file_->Length();
+}
+
+// Closes the underlying LocalFile.
+Status LocalInputStream::Close() {
+    return file_->Close();
+}
+
+// output stream
+// Opens `file` in write mode and wraps it in a LocalOutputStream.
+Result<std::unique_ptr<LocalOutputStream>> LocalOutputStream::Create(
+    std::unique_ptr<LocalFile> file) {
+    PAIMON_RETURN_NOT_OK(file->OpenFile(/*is_read_file=*/false));
+    // The constructor is private, so std::make_unique cannot be used here.
+    std::unique_ptr<LocalOutputStream> stream(new LocalOutputStream(std::move(file)));
+    return stream;
+}
+
+// Takes ownership of `file`; Create() opens it before constructing the stream.
+LocalOutputStream::LocalOutputStream(std::unique_ptr<LocalFile>&& file) : 
file_(std::move(file)) {}
+
+// The four methods below forward directly to the owned LocalFile.
+
+// Returns the current position as reported by LocalFile::Tell().
+Result<int64_t> LocalOutputStream::GetPos() const {
+    return file_->Tell();
+}
+// Writes `size` bytes from `buffer`; returns whatever LocalFile::Write() returns.
+Result<int32_t> LocalOutputStream::Write(const char* buffer, uint32_t size) {
+    return file_->Write(buffer, size);
+}
+// Flushes the underlying LocalFile.
+Status LocalOutputStream::Flush() {
+    return file_->Flush();
+}
+// Closes the underlying LocalFile.
+Status LocalOutputStream::Close() {
+    return file_->Close();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system.h 
b/src/paimon/fs/local/local_file_system.h
new file mode 100644
index 0000000..3a4bc7a
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system.h
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// `FileSystem` for local file.
+// FileSystem implementation over LocalFile. The class declares no per-instance
+// data members, and every operation is const.
+class LocalFileSystem : public FileSystem {
+ public:
+    LocalFileSystem() = default;
+    ~LocalFileSystem() override = default;
+
+    // Stream factories for reading from and writing to `path`.
+    Result<std::unique_ptr<InputStream>> Open(const std::string& path) const 
override;
+    Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+                                                 bool overwrite) const 
override;
+
+    // Directory, rename, delete and status operations.
+    Status Mkdirs(const std::string& path) const override;
+    Status Rename(const std::string& src, const std::string& dst) const 
override;
+    Status Delete(const std::string& path, bool recursive = true) const 
override;
+    Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) 
const override;
+    // Both listing calls append to the caller-provided `file_status_list`.
+    Status ListDir(const std::string& directory,
+                   std::vector<std::unique_ptr<BasicFileStatus>>* 
file_status_list) const override;
+    Status ListFileStatus(
+        const std::string& path,
+        std::vector<std::unique_ptr<FileStatus>>* file_status_list) const 
override;
+    Result<bool> Exists(const std::string& path) const override;
+
+ private:
+    // the lock to ensure atomic renaming
+    // NOTE(review): a `const std::mutex` cannot be locked (lock() is non-const),
+    // and local_file_system.cpp neither defines nor uses this member - confirm
+    // whether it should be made non-const and wired into Rename(), or removed.
+    static const std::mutex RENAME_LOCK;
+
+    // Deletes an already constructed LocalFile, applying the `recursive` rule.
+    Status Delete(std::unique_ptr<LocalFile>&& file, bool recursive = true) 
const;
+    // NOTE(review): no definition appears in local_file_system.cpp - confirm
+    // whether this declaration is still needed.
+    std::string GetParentPath(const std::string& path) const;
+    // Recursive worker behind Mkdirs().
+    Status MkdirsInternal(std::unique_ptr<LocalFile>&& file) const;
+};
+
+// InputStream backed by a LocalFile that the stream owns.
+// Instances are built through Create(); the constructor is private.
+class LocalInputStream : public InputStream {
+ public:
+    // Takes ownership of `file` and returns a stream wrapping it.
+    static Result<std::unique_ptr<LocalInputStream>> 
Create(std::unique_ptr<LocalFile> file);
+
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+    Result<int64_t> GetPos() const override;
+    // Sequential read, followed by the overload that takes an explicit `offset`.
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
override;
+    // Reports the outcome of the read through `callback`.
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    Status Close() override;
+    // The URI of a local stream is the path of the underlying file.
+    Result<std::string> GetUri() const override {
+        return file_->GetPath();
+    }
+    Result<uint64_t> Length() const override;
+
+ private:
+    explicit LocalInputStream(std::unique_ptr<LocalFile>&& file);
+
+    // File handle owned by this stream.
+    std::unique_ptr<LocalFile> file_;
+};
+
+// OutputStream backed by a LocalFile that the stream owns.
+// Instances are built through Create(); the constructor is private.
+class LocalOutputStream : public OutputStream {
+ public:
+    // Takes ownership of `file` and returns a stream wrapping it.
+    static Result<std::unique_ptr<LocalOutputStream>> 
Create(std::unique_ptr<LocalFile> file);
+
+    Result<int64_t> GetPos() const override;
+    Result<int32_t> Write(const char* buffer, uint32_t size) override;
+    Status Flush() override;
+    Status Close() override;
+    // The URI of a local stream is the path of the underlying file.
+    Result<std::string> GetUri() const override {
+        return file_->GetPath();
+    }
+
+ private:
+    explicit LocalOutputStream(std::unique_ptr<LocalFile>&& file);
+
+    // File handle owned by this stream.
+    std::unique_ptr<LocalFile> file_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system_factory.cpp 
b/src/paimon/fs/local/local_file_system_factory.cpp
new file mode 100644
index 0000000..affac39
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system_factory.cpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file_system_factory.h"
+
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+// Value returned by LocalFileSystemFactory::Identifier().
+const char LocalFileSystemFactory::IDENTIFIER[] = "local";
+
+// Registers the factory; the macro presumably comes from
+// paimon/factories/factory.h - TODO confirm.
+REGISTER_PAIMON_FACTORY(LocalFileSystemFactory);
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_system_factory.h 
b/src/paimon/fs/local/local_file_system_factory.h
new file mode 100644
index 0000000..1a3f763
--- /dev/null
+++ b/src/paimon/fs/local/local_file_system_factory.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class FileSystem;
+
+// FileSystemFactory that produces LocalFileSystem instances.
+class LocalFileSystemFactory : public FileSystemFactory {
+ public:
+    static const char IDENTIFIER[];
+
+    // Returns IDENTIFIER.
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+
+    // Returns a new LocalFileSystem; `path` and `options` are not used.
+    Result<std::unique_ptr<FileSystem>> Create(
+        const std::string& path, const std::map<std::string, std::string>& 
options) const override {
+        return std::make_unique<LocalFileSystem>();
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/fs/local/local_file_test.cpp 
b/src/paimon/fs/local/local_file_test.cpp
new file mode 100644
index 0000000..4ad3b97
--- /dev/null
+++ b/src/paimon/fs/local/local_file_test.cpp
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/local/local_file.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Writes zero bytes to a fresh file, then reopens it and checks that a read
+// returns 0 bytes.
+TEST(LocalFileTest, TestReadWriteEmptyContent) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    // NOTE(review): `.ok()` only says the Exists() call succeeded, not that the
+    // path exists, so Delete() runs either way - confirm whether `.value()` was
+    // intended here and in the file check below.
+    if (dir->Exists().ok()) {
+        ASSERT_TRUE(dir->Delete().ok());
+    }
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::string path = test_root + "/test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+    if (file->Exists().ok()) {
+        ASSERT_TRUE(file->Delete().ok());
+    }
+    ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(file->OpenFile(/*is_read_file=*/false));
+
+    // A zero-length write must succeed and report 0 bytes written.
+    const char* str = "";
+    const int32_t str_size = 0;
+    ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size));
+    ASSERT_EQ(write_size, str_size);
+
+    ASSERT_OK(file->Flush());
+    ASSERT_TRUE(file->Exists().value());
+
+    ASSERT_OK(file->Close());
+
+    // Reading 10 bytes from the empty file is expected to return 0.
+    ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path));
+    ASSERT_OK(file2->OpenFile(/*is_read_file=*/true));
+    char buffer[10];
+    ASSERT_OK_AND_ASSIGN(int32_t read_len, file2->Read(buffer, 10));
+    ASSERT_EQ(0, read_len);
+}
+
+// Round trip: write "test_data", check the metadata, read it back through
+// sequential, seek-based and positional reads, then delete the file.
+TEST(LocalFileTest, TestSimple) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    // NOTE(review): `.ok()` only says the Exists() call succeeded, not that the
+    // path exists - confirm whether `.value()` was intended here and below.
+    if (dir->Exists().ok()) {
+        ASSERT_OK(dir->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::string path = test_root + "/test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+    if (file->Exists().ok()) {
+        ASSERT_OK(file->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+    ASSERT_FALSE(is_exist);
+
+    ASSERT_OK(file->OpenFile(/*is_read_file=*/false));
+
+    const char* str = "test_data";
+    const int32_t str_size = 9;
+    ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size));
+    ASSERT_EQ(write_size, str_size);
+
+    ASSERT_OK(file->Flush());
+    ASSERT_OK(file->Close());
+
+    ASSERT_OK_AND_ASSIGN(bool is_file, file->IsFile());
+    ASSERT_TRUE(is_file);
+
+    ASSERT_OK_AND_ASSIGN(bool is_dir, file->IsDir());
+    ASSERT_FALSE(is_dir);
+
+    // Listing a regular file is expected to fail.
+    std::vector<std::string> file_list;
+    ASSERT_NOK(file->List(&file_list));
+
+    ASSERT_OK_AND_ASSIGN(size_t len, file->Length());
+    ASSERT_EQ(len, str_size);
+
+    ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path));
+    ASSERT_OK(file2->Exists());
+
+    ASSERT_OK(file2->OpenFile(true));
+    char str_read[str_size + 1];
+    {
+        // Sequential read of the first 4 bytes.
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4));
+        ASSERT_EQ(read_size, 4);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "test"), 0);
+    }
+    {
+        // The position advanced by the previous read; seek past the '_' and read "data".
+        ASSERT_OK_AND_ASSIGN(int64_t pos, file2->Tell());
+        ASSERT_EQ(pos, 4);
+        ASSERT_OK(file2->Seek(5, SEEK_SET));
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4));
+        ASSERT_EQ(read_size, 4);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "data"), 0);
+    }
+    {
+        // Positional read of the whole content from offset 0.
+        ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 
str_size, 0));
+        ASSERT_EQ(read_size, str_size);
+        str_read[read_size] = '\0';
+        ASSERT_EQ(strcmp(str_read, "test_data"), 0);
+    }
+
+    // dir already exists
+    ASSERT_OK_AND_ASSIGN(success, dir->Mkdir());
+    ASSERT_FALSE(success);
+
+    ASSERT_OK(file2->Delete());
+    ASSERT_FALSE(file2->Exists().value());
+}
+
+// Creates a directory and a sub-directory using relative paths, checks that
+// GetParentFile() of the sub-directory matches the parent, then deletes both.
+TEST(LocalFileTest, TestUsage) {
+    // Relative path: the directory is created under the current working directory.
+    std::string test_root = "local_file_test_usage";
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::vector<std::string> file_list;
+    ASSERT_OK(dir->List(&file_list));
+    std::string path_deep_dir = test_root + "/tmp2";
+    ASSERT_OK_AND_ASSIGN(auto deep_dir, LocalFile::Create(path_deep_dir));
+    ASSERT_OK_AND_ASSIGN(success, deep_dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::unique_ptr<LocalFile> parent_deep_dir = deep_dir->GetParentFile();
+    ASSERT_EQ(parent_deep_dir->GetPath(), dir->GetPath());
+    ASSERT_OK(deep_dir->Delete());
+    // `parent_deep_dir` and `dir` refer to the same path, so the second Delete()
+    // below targets an already removed directory and is still expected to be OK.
+    ASSERT_OK(parent_deep_dir->Delete());
+    ASSERT_OK(dir->Delete());
+}
+
+// Checks the OpenFile() error cases (missing file, directory) and the Mkdir()
+// results for "/" and for an existing directory given with a trailing slash.
+TEST(LocalFileTest, TestOpenFile) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root));
+    // NOTE(review): `.ok()` only says the Exists() call succeeded, not that the
+    // path exists - confirm whether `.value()` was intended here and below.
+    if (dir->Exists().ok()) {
+        ASSERT_OK(dir->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+    ASSERT_TRUE(success);
+    std::string path = test_root + "/test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path));
+    if (file->Exists().ok()) {
+        ASSERT_OK(file->Delete());
+    }
+    ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists());
+    ASSERT_FALSE(is_exist);
+
+    // Opening a missing file or a directory for reading must fail.
+    ASSERT_NOK_WITH_MSG(file->OpenFile(/*is_read_file=*/true), "file not 
exist");
+    ASSERT_NOK_WITH_MSG(dir->OpenFile(/*is_read_file=*/true), "cannot open a 
directory");
+
+    // NOTE(review): this relative path is opened for writing in the current
+    // working directory and the test never deletes it - confirm whether it
+    // should live under `test_root` or be cleaned up.
+    std::string path3 = "test.txt";
+    ASSERT_OK_AND_ASSIGN(auto file3, LocalFile::Create(path3));
+    ASSERT_OK(file3->OpenFile(/*is_read_file=*/false));
+    ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3->LastModifiedTimeMs());
+    ASSERT_GE(modify_time, -1);
+
+    // Mkdir() on "/" and on the existing test root must report false.
+    ASSERT_OK_AND_ASSIGN(auto dir2, LocalFile::Create("/"));
+    ASSERT_OK_AND_ASSIGN(success, dir2->Mkdir());
+    ASSERT_FALSE(success);
+    ASSERT_OK_AND_ASSIGN(auto dir3, LocalFile::Create(test_root + "/"));
+    ASSERT_OK_AND_ASSIGN(success, dir3->Mkdir());
+    ASSERT_FALSE(success);
+}
+
+// Exercises the boolean result of Mkdir() for several path shapes.
+// NOTE(review): the paths are built as `test_root + "tmp..."` with no '/'
+// between them, so where the directories land depends on whether
+// UniqueTestDirectory::Str() ends with a separator - confirm.
+TEST(LocalFileTest, TestMkdir) {
+    auto test_root_dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(test_root_dir);
+    std::string test_root = test_root_dir->Str();
+    {
+        // Nested path: expected to report false (no directory created).
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + 
"tmp/local/f/1"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+    {
+        // Single new directory: expected to be created.
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "tmp1"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_TRUE(success);
+    }
+    {
+        // Child of the directory created above, written with a trailing slash.
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + 
"tmp1/f2/"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_TRUE(success);
+    }
+    {
+        // The root directory: expected to report false.
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create("/"));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+    {
+        // The empty path is accepted by Create(), but Mkdir() reports false.
+        ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(""));
+        ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir());
+        ASSERT_FALSE(success);
+    }
+}
+
+}  // namespace paimon::test

Reply via email to