This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b53f9db  feat(fs): introduce file system abstractions (#26)
b53f9db is described below

commit b53f9db2dad4b2d5dc06110904b41cf1a17ea02d
Author: Zhang Jiawei <[email protected]>
AuthorDate: Mon Jun 1 11:28:48 2026 +0800

    feat(fs): introduce file system abstractions (#26)
    
    * feat(fs): migrate file system abstractions
    
    * feat(fs): add utility implementations
    
    * feat(fs): remove utils from migration
---
 LICENSE                                         |  11 +
 NOTICE                                          |   5 +-
 include/paimon/factories/factory.h              |  40 ++++
 include/paimon/factories/factory_creator.h      |  58 +++++
 include/paimon/factories/singleton.h            |  59 +++++
 include/paimon/fs/file_system.h                 | 272 ++++++++++++++++++++++++
 include/paimon/fs/file_system_factory.h         |  46 ++++
 src/paimon/common/factories/factory_creator.cpp |  74 +++++++
 src/paimon/common/factories/io_hook.cpp         |  90 ++++++++
 src/paimon/common/factories/io_hook.h           |  73 +++++++
 src/paimon/common/factories/singleton.cpp       |  45 ++++
 src/paimon/common/fs/file_system.cpp            |  98 +++++++++
 src/paimon/common/fs/file_system_factory.cpp    |  44 ++++
 src/paimon/common/fs/resolving_file_system.cpp  | 127 +++++++++++
 src/paimon/common/fs/resolving_file_system.h    |  72 +++++++
 15 files changed, 1113 insertions(+), 1 deletion(-)

diff --git a/LICENSE b/LICENSE
index 51438f3..5ddb768 100644
--- a/LICENSE
+++ b/LICENSE
@@ -202,6 +202,17 @@
 
 
--------------------------------------------------------------------------------
 
+This product includes code from Alibaba Havenask.
+
+* include/paimon/factories/singleton.h
+* src/paimon/common/factories/singleton.cpp
+
+Copyright: 2014-present Alibaba Inc.
+Home page: https://havenask.net/
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
 This product includes code from RocksDB.
 
 * endian utility in src/paimon/common/utils/math.h
diff --git a/NOTICE b/NOTICE
index dbfb7d3..4005b5c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -4,6 +4,9 @@ Copyright 2024-2026 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+This product includes software from Havenask project (Apache 2.0)
+Copyright 2014-present Alibaba Inc.
+
 Apache Arrow
 Copyright 2016-2024 The Apache Software Foundation
 
@@ -18,4 +21,4 @@ This product includes software from xxHash project (BSD 
2-clause)
 Copyright (C) 2012-2023 Yann Collet
 
 This product includes software from cppjieba project (MIT)
-Copyright 2013
\ No newline at end of file
+Copyright 2013
diff --git a/include/paimon/factories/factory.h 
b/include/paimon/factories/factory.h
new file mode 100644
index 0000000..2150fbf
--- /dev/null
+++ b/include/paimon/factories/factory.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/factories/factory_creator.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Factory used to register, e.g., `FileFormatFactory`, `FileSystemFactory`.
+/// Call `REGISTER_PAIMON_FACTORY` to register a factory to `FactoryCreator`.
+class PAIMON_EXPORT Factory {
+ public:
+    virtual ~Factory() = default;
+    /// Name under which this factory is registered; `REGISTER_PAIMON_FACTORY` passes the
+    /// returned string to `FactoryCreator::Register()` as the lookup key.
+    virtual const char* Identifier() const = 0;
+};
+
+}  // namespace paimon
+
+/// Registers `PAIMON_FACTORY` with the process-wide `FactoryCreator`.
+///
+/// Expands to a static function marked `__attribute__((constructor))` that
+/// default-constructs the factory with `new` and hands it, keyed by its `Identifier()`,
+/// to `FactoryCreator::Register()`. The factory is not deleted here; the pointer is kept
+/// by the `FactoryCreator`.
+///
+/// @note Every line of the macro body must end in a `\` continuation on the same line;
+///       otherwise the `#define` terminates early.
+#define REGISTER_PAIMON_FACTORY(PAIMON_FACTORY)                                    \
+    static __attribute__((constructor)) void Register##PAIMON_FACTORY##Factory() { \
+        auto factory_creator = paimon::FactoryCreator::GetInstance();              \
+        auto* factory = new PAIMON_FACTORY;                                        \
+        factory_creator->Register(factory->Identifier(), factory);                 \
+    }
diff --git a/include/paimon/factories/factory_creator.h 
b/include/paimon/factories/factory_creator.h
new file mode 100644
index 0000000..5eeacf4
--- /dev/null
+++ b/include/paimon/factories/factory_creator.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "paimon/factories/singleton.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class Factory;
+
+/// Store a map from identifier to factory.
+/// After registration, you can get the registered factory by calling the 
`Create()` method.
+class PAIMON_EXPORT FactoryCreator : public Singleton<FactoryCreator> {
+ public:
+    FactoryCreator() = default;
+    /// Deletes every factory that is still registered.
+    ~FactoryCreator();
+
+    /// Register a factory with the given identifier.
+    ///
+    /// The creator stores the raw pointer and deletes it in its destructor, so the caller
+    /// must not delete `factory` afterwards. If `type` is already registered, a message is
+    /// written to stderr and the process is aborted.
+    void Register(const std::string& type, Factory* factory);
+
+    /// Look up the factory registered under the given identifier.
+    ///
+    /// Despite the name, no new object is constructed: the pointer passed to `Register()`
+    /// is returned and remains owned by this creator.
+    /// @return The registered factory, or `nullptr` if `type` is not registered.
+    Factory* Create(const std::string& type) const;
+
+    /// Get all registered types, in the key order of the underlying `std::map`.
+    std::vector<std::string> GetRegisteredType() const;
+
+ private:
+    /// Remove and delete the factory registered under `type`; does nothing if absent.
+    /// @note For test only.
+    void TEST_Unregister(const std::string& type);
+
+    /// Identifier -> factory. The stored pointers are deleted by the destructor.
+    std::map<std::string, Factory*> factories_;
+
+    /// Guards `factories_`; `mutable` so the const accessors can lock it.
+    mutable std::mutex mutex_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/factories/singleton.h 
b/include/paimon/factories/singleton.h
new file mode 100644
index 0000000..6e12d45
--- /dev/null
+++ b/include/paimon/factories/singleton.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Alibaba Havenask
+// 
https://github.com/alibaba/havenask/blob/main/aios/storage/indexlib/util/Singleton.h
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/macros.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Default instantiation policy for `Singleton`: the instance is heap-allocated the
+/// first time `Create()` is called.
+class PAIMON_EXPORT LazyInstantiation {
+ protected:
+    /// Allocate a default-constructed `T` with `new` and publish it through `ptr`.
+    ///
+    /// @param[out] ptr Receives the address of the newly created object.
+    template <typename T>
+    static void Create(T*& ptr) {
+        T* tmp = new T;
+        // MEMORY_BARRIER() comes from paimon/macros.h; presumably it keeps the store to
+        // `ptr` below from becoming visible before the object is constructed -- confirm
+        // against the macro definition.
+        MEMORY_BARRIER();
+        ptr = tmp;
+        // Function-local static owner, initialized on the first call for a given T; it
+        // deletes the object when the static itself is destroyed.
+        static std::shared_ptr<T> destroyer(ptr);
+    }
+};
+
+/// A singleton implementation with customizable instantiation policy.
+///
+/// `InstPolicy` must provide a static `Create(T*&)`; see `LazyInstantiation`.
+/// `GetInstance()` is only declared here. Its definition lives in singleton.cpp, which
+/// explicitly instantiates the template for `FactoryCreator` and `IOHook`.
+template <typename T, typename InstPolicy = LazyInstantiation>
+class PAIMON_EXPORT Singleton : private InstPolicy {
+ protected:
+    // Copying the base is only available to derived classes and copies no state.
+    Singleton(const Singleton&) {}
+    Singleton() = default;
+
+ public:
+    ~Singleton() = default;
+
+ public:
+    /// Provide access to the single instance through double-checked locking.
+    ///
+    /// Lazy create a singleton instance when `GetInstance()` is called.
+    ///
+    /// @return The single instance of object.
+    static T* GetInstance();
+};
+
+}  // namespace paimon
diff --git a/include/paimon/fs/file_system.h b/include/paimon/fs/file_system.h
new file mode 100644
index 0000000..f2388dc
--- /dev/null
+++ b/include/paimon/fs/file_system.h
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Enumeration for stream seek origin positions.
+enum PAIMON_EXPORT SeekOrigin {
+    /// Seek from the beginning of the stream.
+    FS_SEEK_SET,
+    /// Seek from the current position in the stream.
+    FS_SEEK_CUR,
+    /// Seek from the end of the stream.
+    FS_SEEK_END
+};
+
+/// Abstract base class for all stream operations.
+class PAIMON_EXPORT Stream {
+ public:
+    virtual ~Stream() = default;
+    /// Close the stream.
+    virtual Status Close() = 0;
+};
+
+/// Abstract class for input stream operations.
+class PAIMON_EXPORT InputStream : public Stream {
+ public:
+    InputStream() = default;
+    ~InputStream() override = default;
+
+    /// Seek to a specified position in the input stream.
+    ///
+    /// @param offset The byte offset relative to the origin position.
+    /// @param origin The reference point for seeking (`::FS_SEEK_SET`, 
`::FS_SEEK_CUR`,
+    /// `::FS_SEEK_END`).
+    /// @return Status indicating success (OK) or failure with appropriate 
error information.
+    virtual Status Seek(int64_t offset, SeekOrigin origin) = 0;
+
+    /// Get the current position in the input stream.
+    ///
+    /// @return Current position in the input stream.
+    /// @return IOError returned if an I/O error occurred in the underlying stream
+    /// implementation while accessing the stream's position.
+    virtual Result<int64_t> GetPos() const = 0;
+
+    /// Read data from the current position in the stream.
+    /// @param[out] buffer Pointer to the buffer where read data will be 
stored.
+    /// @param size Maximum number of bytes to read.
+    /// @return Result containing the actual number of bytes read on success, 
or an error status on
+    ///         failure.
+    /// @note The stream position advances by the number of bytes actually 
read.
+    virtual Result<int32_t> Read(char* buffer, uint32_t size) = 0;
+
+    /// Read data from given position in the stream.
+    ///
+    /// Read with offset performs like `pread()` function, which will not 
change the position in the
+    /// input stream.
+    ///
+    /// @param[out] buffer The buffer to store the read content.
+    /// @param size The number of bytes to read.
+    /// @param offset The position in the stream to read from.
+    virtual Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
= 0;
+
+    /// Asynchronously read data from the input stream.
+    ///
+    /// This function initiates an asynchronous read operation. The specified 
number of bytes
+    /// will be read from the stream starting at the given offset and stored 
in the provided buffer.
+    /// Once the read operation is complete, the provided callback function 
will be invoked with
+    /// the status of the read operation.
+    ///
+    /// @param[out] buffer The buffer to store the read content.
+    /// @param size The number of bytes to read.
+    /// @param offset The position in the stream to read from.
+    /// @param callback The callback function to be invoked upon completion of 
the read operation.
+    ///                 The callback will receive a Status object indicating 
the success or failure
+    ///                 of the read operation.
+    virtual void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                           std::function<void(Status)>&& callback) = 0;
+
+    /// Get an identifier that uniquely identifies the underlying content.
+    /// @return A URI if the underlying content can be uniquely identified.
+    /// @return Empty string if the underlying content cannot be uniquely identified.
+    virtual Result<std::string> GetUri() const = 0;
+
+    /// Get the total length of the file in bytes.
+    virtual Result<uint64_t> Length() const = 0;
+};
+
+/// Abstract class for output stream operations.
+class PAIMON_EXPORT OutputStream : public Stream {
+ public:
+    OutputStream() = default;
+
+    /// Write data to the output stream.
+    /// @param buffer Pointer to the data buffer to write.
+    /// @param size Number of bytes to write from the buffer.
+    /// @return Result containing the actual number of bytes written on 
success, or an error status
+    ///         on failure.
+    /// @note The stream position advances by the number of bytes actually 
written.
+    virtual Result<int32_t> Write(const char* buffer, uint32_t size) = 0;
+
+    /// Flush pending data to the disk.
+    virtual Status Flush() = 0;
+    /// Get the write position.
+    virtual Result<int64_t> GetPos() const = 0;
+    /// Get the uri of the output stream.
+    virtual Result<std::string> GetUri() const = 0;
+};
+
+/// Basic file status information interface.
+///
+/// This class provides fundamental file system metadata for files and 
directories. It serves as a
+/// lightweight interface for basic file operations that only require path 
information and directory
+/// status.
+class PAIMON_EXPORT BasicFileStatus {
+ public:
+    BasicFileStatus() = default;
+    virtual ~BasicFileStatus() = default;
+
+    /// Check if this entry represents a directory.
+    virtual bool IsDir() const = 0;
+
+    /// Get the path of this file or directory.
+    virtual std::string GetPath() const = 0;
+};
+
+/// Extended file status information interface.
+///
+/// This class offers the same `IsDir()`/`GetPath()` queries as `BasicFileStatus` and adds
+/// comprehensive file system metadata including file size and modification time. Note
+/// that it is declared as an independent class and does not derive from
+/// `BasicFileStatus`. It's used for operations that require detailed file information.
+class PAIMON_EXPORT FileStatus {
+ public:
+    FileStatus() = default;
+    virtual ~FileStatus() = default;
+
+    /// Get the size of the file in bytes.
+    /// @note For directories, this method is undefined behavior.
+    virtual uint64_t GetLen() const = 0;
+
+    /// Check if this entry represents a directory.
+    virtual bool IsDir() const = 0;
+
+    /// Get the path of this file or directory.
+    virtual std::string GetPath() const = 0;
+
+    /// Get the last modification time of the file.
+    ///
+    /// @return A long value representing the time the file was last modified, 
measured in
+    /// milliseconds since the epoch (UTC January 1, 1970).
+    virtual int64_t GetModificationTime() const = 0;
+};
+
+/// Abstract file system interface.
+class PAIMON_EXPORT FileSystem {
+ public:
+    virtual ~FileSystem();
+
+    /// Check if the given path represents an object store.
+    /// @note Object stores typically have different semantics than 
traditional file systems.
+    static Result<bool> IsObjectStore(const std::string& path_str);
+
+    /// Open an existing file for reading.
+    /// @param path The file path to open.
+    /// @return Result containing a unique pointer to `InputStream` on 
success, or error status on
+    ///         failure (e.g., file not found, permission denied).
+    virtual Result<std::unique_ptr<InputStream>> Open(const std::string& path) 
const = 0;
+
+    /// Create a new file for writing.
+    /// @param path The file path to create.
+    /// @param overwrite If true, overwrite existing file; if false, fail if 
file exists.
+    /// @return Result containing a unique pointer to `OutputStream` on 
success, or error status on
+    ///         failure (e.g., I/O error, permission denied).
+    virtual Result<std::unique_ptr<OutputStream>> Create(const std::string& 
path,
+                                                         bool overwrite) const 
= 0;
+
+    /// Create directories recursively.
+    /// @param path The directory path to create (including all parent 
directories).
+    /// @return Status indicating success (OK) or failure with error 
information.
+    virtual Status Mkdirs(const std::string& path) const = 0;
+
+    /// Rename or move a file or directory.
+    /// @param src The source path (file or directory to rename/move).
+    /// @param dst The destination path.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    virtual Status Rename(const std::string& src, const std::string& dst) 
const = 0;
+
+    /// Delete a file or directory.
+    /// @param path The path to delete.
+    /// @param recursive If true, delete directories and their contents 
recursively;
+    ///                  if false, only delete empty directories.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    virtual Status Delete(const std::string& path, bool recursive = true) 
const = 0;
+    /// Get detailed status information for a file or directory.
+    /// @param path The file or directory path to query.
+    /// @return Result containing a unique pointer to `FileStatus` on success, 
or error status on
+    ///         failure (e.g., path not found, permission denied).
+    virtual Result<std::unique_ptr<FileStatus>> GetFileStatus(const 
std::string& path) const = 0;
+
+    /// List files of a directory (basic information only).
+    /// @param directory The directory path to list.
+    /// @param[out] file_status_list Output vector to store `BasicFileStatus` 
objects.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    virtual Status ListDir(
+        const std::string& directory,
+        std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const 
= 0;
+
+    /// List file status with detailed information.
+    /// @param path The file or directory path to list.
+    /// @param[out] file_status_list Output vector to store `FileStatus` 
objects.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    virtual Status ListFileStatus(
+        const std::string& path,
+        std::vector<std::unique_ptr<FileStatus>>* file_status_list) const = 0;
+
+    /// Check if a file or directory exists.
+    /// @param path The file or directory path to check.
+    /// @return Result containing true if path exists, false if not found; or 
error status if
+    ///         I/O error occurs during check.
+    virtual Result<bool> Exists(const std::string& path) const = 0;
+
+    /// Read entire file content into a string at once.
+    /// @param path The file path to read.
+    /// @param[out] content Output string to store the file content.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    /// @note Virtual only for mock testing purposes.
+    virtual Status ReadFile(const std::string& path, std::string* content);
+
+    /// Write the entire content to a file at once.
+    /// @param path The file path to write to.
+    /// @param content The string content to write.
+    /// @param overwrite If true, overwrite existing file; if false, fail if 
file exists.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    /// @note Virtual only for mock testing purposes.
+    virtual Status WriteFile(const std::string& path, const std::string& 
content, bool overwrite);
+
+    /// Write content to a file atomically. Atomic operation: writes to 
temporary hidden file first,
+    /// then renames to target.
+    /// @param path The target file path.
+    /// @param content The string content to write.
+    /// @return Status indicating success (OK) or failure with error 
information.
+    /// @note Virtual only for mock testing purposes.
+    virtual Status AtomicStore(const std::string& path, const std::string& 
content);
+};
+
+}  // namespace paimon
diff --git a/include/paimon/fs/file_system_factory.h 
b/include/paimon/fs/file_system_factory.h
new file mode 100644
index 0000000..d18d8a4
--- /dev/null
+++ b/include/paimon/fs/file_system_factory.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// A factory for creating `FileSystem` instances.
+class PAIMON_EXPORT FileSystemFactory : public Factory {
+ public:
+    /// Create a `FileSystem` of current factory with specific path.
+    virtual Result<std::unique_ptr<FileSystem>> Create(
+        const std::string& path, const std::map<std::string, std::string>& 
options) const = 0;
+
+    /// Get `FileSystem` corresponding to identifier and specific path.
+    /// @pre Factory is already registered.
+    static Result<std::unique_ptr<FileSystem>> Get(
+        const std::string& identifier, const std::string& path,
+        const std::map<std::string, std::string>& fs_options);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/factories/factory_creator.cpp 
b/src/paimon/common/factories/factory_creator.cpp
new file mode 100644
index 0000000..5fa65e7
--- /dev/null
+++ b/src/paimon/common/factories/factory_creator.cpp
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/factories/factory_creator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <utility>
+
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+/// Delete every factory that is still registered.
+///
+/// The map is iterated by reference so that the stored pointer itself is reset after
+/// the delete. Iterating by value would only null a temporary copy of each entry and
+/// leave dangling pointers in `factories_`.
+FactoryCreator::~FactoryCreator() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    for (auto& entry : factories_) {
+        delete entry.second;
+        entry.second = nullptr;
+    }
+}
+
+/// Look up the factory registered under `type`.
+///
+/// @return The registered pointer, or `nullptr` when `type` has no entry.
+Factory* FactoryCreator::Create(const std::string& type) const {
+    std::lock_guard<std::mutex> guard(mutex_);
+    const auto found = factories_.find(type);
+    return found != factories_.end() ? found->second : nullptr;
+}
+
+/// Store `factory` under `type`.
+///
+/// A second registration for the same `type` is treated as fatal: the conflict is
+/// reported on stderr and the process is aborted.
+void FactoryCreator::Register(const std::string& type, Factory* factory) {
+    std::lock_guard<std::mutex> guard(mutex_);
+    if (factories_.count(type) != 0) {
+        std::cerr << "register conflict: type " << type << " already exist" << std::endl;
+        std::abort();
+    }
+    // The key is known to be absent here, so emplace always inserts.
+    factories_.emplace(type, factory);
+}
+
+/// Remove the entry for `type` and delete its factory; does nothing when `type` is not
+/// registered.
+void FactoryCreator::TEST_Unregister(const std::string& type) {
+    std::lock_guard<std::mutex> guard(mutex_);
+    const auto found = factories_.find(type);
+    if (found == factories_.end()) {
+        return;
+    }
+    delete found->second;
+    factories_.erase(found);
+}
+
+/// Collect the identifiers of all registered factories.
+///
+/// @return The keys of `factories_`, in the iteration order of the map.
+std::vector<std::string> FactoryCreator::GetRegisteredType() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::vector<std::string> types;
+    // The final size is known up front, so allocate once instead of growing repeatedly.
+    types.reserve(factories_.size());
+    for (const auto& kv : factories_) {
+        types.push_back(kv.first);
+    }
+    return types;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/factories/io_hook.cpp 
b/src/paimon/common/factories/io_hook.cpp
new file mode 100644
index 0000000..a0576b4
--- /dev/null
+++ b/src/paimon/common/factories/io_hook.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/factories/io_hook.h"
+
+#include <atomic>
+#include <stdexcept>
+
+#include "fmt/format.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Private state and logic behind `IOHook`.
+///
+/// All three fields are atomics so that `Try()` and `Reset()` may be called from
+/// different threads without a data race on any single field. `Reset()` still updates
+/// the fields one after another, so a concurrent `Try()` can observe a mix of old and
+/// new settings.
+class IOHook::Impl {
+ public:
+    /// Count one IO operation and decide whether it should fail.
+    ///
+    /// @param path Path of the IO operation; only used in the error/exception message.
+    /// @return OK while fewer than `pos_` operations have been counted, or when the mode
+    ///         is SILENT; an IOError in RETURN_ERROR mode.
+    /// @throws std::runtime_error in THROW_EXCEPTION mode.
+    Status Try(const std::string& path) {
+        // fetch_add yields the count *before* this call, so the first `pos_` calls pass.
+        if (io_count_.fetch_add(1) < pos_.load()) {
+            return Status::OK();
+        }
+        switch (mode_.load()) {
+            case IOHook::Mode::RETURN_ERROR:
+                return Status::IOError(fmt::format(
+                    "io hook triggered io error at position {}, path {}", pos_.load(), path));
+            case IOHook::Mode::THROW_EXCEPTION:
+                throw std::runtime_error(fmt::format(
+                    "io hook throw io exception at position {}, path {}", pos_.load(), path));
+            case IOHook::Mode::SILENT:
+            default:
+                return Status::OK();
+        }
+    }
+
+    /// Set the trigger position and mode, and restart the IO counter at 0.
+    inline void Reset(int64_t pos, IOHook::Mode mode) {
+        pos_ = pos;
+        io_count_ = 0;
+        mode_ = mode;
+    }
+
+    /// Number of `Try()` calls counted since the last `Reset()`/`Clear()`.
+    int64_t IOCount() const {
+        return io_count_.load();
+    }
+
+    /// Return to the initial state: position -1, SILENT mode, counter 0.
+    void Clear() {
+        Reset(-1, IOHook::Mode::SILENT);
+    }
+
+ private:
+    std::atomic<int64_t> io_count_ = {0};
+    std::atomic<int64_t> pos_ = {-1};
+    // Atomic like its siblings: written by Reset() and read by Try().
+    std::atomic<IOHook::Mode> mode_ = {IOHook::Mode::SILENT};
+};
+
+// Special members are defined here, after IOHook::Impl, because `Impl` is only
+// forward-declared in the header.
+IOHook::IOHook() : impl_(std::make_unique<Impl>()) {}
+IOHook::~IOHook() = default;
+
+// The public API below simply forwards to the implementation object.
+
+Status IOHook::Try(const std::string& path) {
+    return impl_->Try(path);
+}
+
+int64_t IOHook::IOCount() const {
+    return impl_->IOCount();
+}
+
+void IOHook::Clear() {
+    impl_->Clear();
+}
+
+void IOHook::Reset(int64_t pos, IOHook::Mode mode) {
+    impl_->Reset(pos, mode);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/factories/io_hook.h 
b/src/paimon/common/factories/io_hook.h
new file mode 100644
index 0000000..e0a2f68
--- /dev/null
+++ b/src/paimon/common/factories/io_hook.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/singleton.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// IOHook is for IO event capture before real IO operation happens,
+/// and to simulate an IO error.
+///
+/// Each call to `Try()` counts one IO operation. Once the count reaches the position
+/// given to `Reset()`, that call and every later one reacts according to the configured
+/// `Mode`.
+class PAIMON_EXPORT IOHook : public Singleton<IOHook> {
+ public:
+    IOHook();
+    ~IOHook();
+    IOHook(const IOHook&) = delete;
+    IOHook& operator=(const IOHook&) = delete;
+
+    /// Modes of operation for error handling.
+    enum class Mode {
+        RETURN_ERROR,     // Mode to return an error code on IO operation.
+        THROW_EXCEPTION,  // Mode to throw an exception on IO operation.
+        SILENT,           // Mode to suppress errors silently without action.
+    };
+
+    /// Reset the IO exception position and behavior mode to handle the exception.
+    /// IOCount will be reset to 0.
+    ///
+    /// @param pos The position where the IO exception occurs, i.e. the number of `Try()`
+    ///            calls that succeed before the hook starts to trigger.
+    /// @param mode The mode of behavior for handling the exception.
+    void Reset(int64_t pos, IOHook::Mode mode);
+    /// Try to trigger the IO exception based on the current settings.
+    ///
+    /// @param path Path of the IO operation; included in the error or exception message.
+    /// @return Status indicating the result of the operation (success or failure).
+    /// @throws std::runtime_error when triggered in `Mode::THROW_EXCEPTION`.
+    Status Try(const std::string& path);
+
+    /// Get the count of IO operations that have already occurred.
+    ///
+    /// @return The number of IO operations executed.
+    int64_t IOCount() const;
+
+    /// Clear the state of the IOHook, including resetting IO count and
+    /// any stored exception state. Equivalent to `Reset(-1, Mode::SILENT)`.
+    void Clear();
+
+ private:
+    class Impl;
+
+    // Implementation object; `Impl` is defined in io_hook.cpp.
+    std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/factories/singleton.cpp 
b/src/paimon/common/factories/singleton.cpp
new file mode 100644
index 0000000..a973225
--- /dev/null
+++ b/src/paimon/common/factories/singleton.cpp
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Alibaba Havenask
+// https://github.com/alibaba/havenask/blob/main/aios/storage/indexlib/util/Singleton.h
+
+#include "paimon/factories/singleton.h"
+
+#include <mutex>
+
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/factories/factory_creator.h"
+
+namespace paimon {
+
+/// Returns the shared instance of T, creating it through InstPolicy::Create
+/// on the first call.
+///
+/// Creation is funnelled through std::call_once. A hand-written
+/// "check pointer, lock, check again" sequence on a plain (non-atomic) pointer
+/// is a data race, because the unlocked read may run concurrently with the
+/// write performed by the creating thread. std::call_once gives every caller
+/// a properly synchronized view of `ptr` instead.
+///
+/// If InstPolicy::Create throws, the once-flag stays unset and a later call
+/// attempts the creation again.
+///
+/// @return The pointer filled in by InstPolicy::Create.
+template <typename T, typename InstPolicy>
+T* Singleton<T, InstPolicy>::GetInstance() {
+    static T* ptr = nullptr;
+    static std::once_flag once;
+    // `ptr` has static storage duration, so the lambda can use it without a capture.
+    std::call_once(once, []() { InstPolicy::Create(ptr); });
+    return ptr;
+}
+
+// GetInstance() is defined in this translation unit, so the Singleton
+// specializations listed below are instantiated explicitly here.
+template class Singleton<FactoryCreator>;
+template class Singleton<IOHook>;
+
+}  // namespace paimon
diff --git a/src/paimon/common/fs/file_system.cpp b/src/paimon/common/fs/file_system.cpp
new file mode 100644
index 0000000..23895fc
--- /dev/null
+++ b/src/paimon/common/fs/file_system.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/file_system.h"
+
+#include <set>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/macros.h"
+
+namespace paimon {
+// Out-of-line definition of the defaulted destructor.
+FileSystem::~FileSystem() = default;
+
+/// Decides from the scheme of `path_str` whether it points at an object store.
+///
+/// The scheme is passed through StringUtils::ToLowerCase before the lookup.
+/// An empty scheme, "file", "hdfs" and "dfs" yield false; any other scheme
+/// yields true.
+///
+/// @param path_str Path or URI to inspect.
+/// @return The verdict, or the error reported by PathUtil::ToPath.
+Result<bool> FileSystem::IsObjectStore(const std::string& path_str) {
+    static const std::set<std::string> kNonObjectStoreSchemes{"", "file", "hdfs", "dfs"};
+    PAIMON_ASSIGN_OR_RAISE(Path parsed, PathUtil::ToPath(path_str));
+    const auto scheme = StringUtils::ToLowerCase(parsed.scheme);
+    return kNonObjectStoreSchemes.find(scheme) == kNonObjectStoreSchemes.end();
+}
+
+/// Reads the whole file at `path` into `*content`.
+///
+/// The stream obtained from Open() is read with a single Read() call covering
+/// the length reported by Length(). The stream is closed when the function
+/// leaves; the Close() status is intentionally ignored.
+///
+/// @param path Path of the file to read.
+/// @param content Output buffer, must not be null. It is resized to the file length.
+/// @return Status::OK on success. Status::Invalid when `content` or the opened
+///         stream is null. Status::IOError when the file is longer than
+///         INT32_MAX bytes or fewer bytes than expected were read. Otherwise the
+///         error reported by Open(), Length() or Read().
+Status FileSystem::ReadFile(const std::string& path, std::string* content) {
+    if (PAIMON_UNLIKELY(content == nullptr)) {
+        return Status::Invalid("output content must not be a null pointer, path: ", path);
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> in, Open(path));
+    if (PAIMON_UNLIKELY(in == nullptr)) {
+        return Status::Invalid("input stream is not suppose to be a null pointer, path: ", path);
+    }
+    {
+        ScopeGuard guard([&in]() -> void {
+            Status s = in->Close();
+            (void)s;
+        });
+        PAIMON_ASSIGN_OR_RAISE(uint64_t length, in->Length());
+        // Read() reports the number of bytes as int32_t, so a longer file cannot
+        // be verified below: the cast would wrap and could even compare equal to
+        // a short read. Reject such files instead of returning a buffer that is
+        // only partially filled.
+        if (PAIMON_UNLIKELY(length > static_cast<uint64_t>(INT32_MAX))) {
+            return Status::IOError(
+                fmt::format("path {}, file length {} exceeds max supported read len {}", path,
+                            length, INT32_MAX));
+        }
+        content->resize(length);
+        PAIMON_ASSIGN_OR_RAISE(int32_t read_length, in->Read(content->data(), length));
+        if (read_length != static_cast<int32_t>(length)) {
+            return Status::IOError(fmt::format("path {}, expect read len {}, actual read len {}",
+                                               path, length, read_length));
+        }
+    }
+    return Status::OK();
+}
+
+/// Writes `content` to the file at `path` with a single Write() call.
+///
+/// @param path Path of the file to create.
+/// @param content Bytes to write; at most INT32_MAX bytes are supported.
+/// @param overwrite Forwarded to Create().
+/// @return Status::OK once the data has been written, flushed and the stream
+///         closed. Status::Invalid when `content` is too long or the created
+///         stream is null. Status::IOError on a short write. Otherwise the error
+///         reported by Create(), Write(), Flush() or Close().
+Status FileSystem::WriteFile(const std::string& path, const std::string& content, bool overwrite) {
+    // The length is handed to Write() and compared with its result as int32_t.
+    // Check the size before Create() so that an oversized request neither
+    // narrows silently nor leaves a freshly created file behind.
+    if (PAIMON_UNLIKELY(content.size() > static_cast<size_t>(INT32_MAX))) {
+        return Status::Invalid(
+            fmt::format("path {}, content length {} exceeds max supported write len {}", path,
+                        content.size(), INT32_MAX));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<OutputStream> out, Create(path, overwrite));
+    if (PAIMON_UNLIKELY(out == nullptr)) {
+        return Status::Invalid("output stream is not suppose to be a null pointer, path: ", path);
+    }
+    // Best-effort close for the error paths below. The success path releases
+    // the guard and closes explicitly so a failing Close() reaches the caller.
+    ScopeGuard guard([&out]() -> void {
+        Status s = out->Close();
+        (void)s;
+    });
+    const auto length = static_cast<int32_t>(content.size());
+    PAIMON_ASSIGN_OR_RAISE(int32_t write_length, out->Write(content.data(), length));
+    if (write_length != length) {
+        return Status::IOError(fmt::format("path {}, expect write len {}, actual write len {}",
+                                           path, length, write_length));
+    }
+    PAIMON_RETURN_NOT_OK(out->Flush());
+    guard.Release();
+    return out->Close();
+}
+
+/// Stores `content` at `path` by writing it to a temporary path first and then
+/// renaming that temporary file to `path`.
+///
+/// If writing or renaming fails, the temporary path is deleted on the way out;
+/// the status of that Delete() is ignored.
+///
+/// @param path Final destination of the content.
+/// @param content Bytes to store.
+/// @return Status::OK on success, otherwise the first error reported by
+///         PathUtil::CreateTempPath, WriteFile() or Rename().
+Status FileSystem::AtomicStore(const std::string& path, const std::string& content) {
+    // do not support overwrite for now
+    PAIMON_ASSIGN_OR_RAISE(std::string staging_path, PathUtil::CreateTempPath(path));
+    ScopeGuard cleanup([this, &staging_path]() {
+        Status ignored = Delete(staging_path);
+        (void)ignored;
+    });
+    PAIMON_RETURN_NOT_OK(WriteFile(staging_path, content, /*overwrite=*/false));
+    PAIMON_RETURN_NOT_OK(Rename(staging_path, path));
+    // The temporary file has become the final file; nothing left to clean up.
+    cleanup.Release();
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/fs/file_system_factory.cpp b/src/paimon/common/fs/file_system_factory.cpp
new file mode 100644
index 0000000..5c8da3e
--- /dev/null
+++ b/src/paimon/common/fs/file_system_factory.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/file_system_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Creates a file system through the factory registered under `identifier`.
+///
+/// @param identifier Name handed to FactoryCreator::Create to look up the factory.
+/// @param path Forwarded to the factory's Create().
+/// @param fs_options Forwarded to the factory's Create().
+/// @return The result of the factory's Create(), or Status::Invalid when no
+///         factory is returned for `identifier` or the returned factory is not
+///         a FileSystemFactory.
+Result<std::unique_ptr<FileSystem>> FileSystemFactory::Get(
+    const std::string& identifier, const std::string& path,
+    const std::map<std::string, std::string>& fs_options) {
+    auto* base_factory = FactoryCreator::GetInstance()->Create(identifier);
+    if (base_factory == nullptr) {
+        return Status::Invalid(
+            fmt::format("Create factory failed with identifier '{}'.", identifier));
+    }
+    if (auto* fs_factory = dynamic_cast<FileSystemFactory*>(base_factory)) {
+        return fs_factory->Create(path, fs_options);
+    }
+    return Status::Invalid(
+        fmt::format("Failed to cast file system factory with identifier '{}'.", identifier));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/fs/resolving_file_system.cpp b/src/paimon/common/fs/resolving_file_system.cpp
new file mode 100644
index 0000000..69fc282
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/fs/resolving_file_system.h"
+
+#include <mutex>
+#include <shared_mutex>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/fs/file_system_factory.h"
+
+namespace paimon {
+
+/// Copies the scheme mapping, the default identifier and the options.
+///
+/// When the mapping contains an entry for "file", the same identifier is also
+/// stored under the empty scheme.
+///
+/// @param scheme_to_fs_identifier URI scheme -> file system identifier.
+/// @param default_fs_identifier Identifier used for schemes without a mapping.
+/// @param options Options forwarded when a file system is created.
+ResolvingFileSystem::ResolvingFileSystem(
+    const std::map<std::string, std::string>& scheme_to_fs_identifier,
+    const std::string& default_fs_identifier, const std::map<std::string, std::string>& options)
+    : scheme_to_fs_identifier_(scheme_to_fs_identifier),
+      default_fs_identifier_(default_fs_identifier),
+      options_(options) {
+    // local file's scheme may be 'file' or empty
+    if (const auto file_entry = scheme_to_fs_identifier_.find("file");
+        file_entry != scheme_to_fs_identifier_.end()) {
+        scheme_to_fs_identifier_[""] = file_entry->second;
+    }
+}
+
+/// Resolves the concrete FileSystem that serves `uri`.
+///
+/// Instances are cached per {scheme, authority} of the parsed URI. On a cache
+/// miss, the identifier mapped to the scheme in scheme_to_fs_identifier_ (or
+/// default_fs_identifier_ when the scheme has no mapping) is handed to
+/// FileSystemFactory::Get and the resulting file system is stored in the cache.
+///
+/// NOTE: FileSystemFactory::Get is called while the exclusive lock is held, so
+/// other callers of this method wait until the creation has finished.
+///
+/// @param uri Path or URI whose scheme and authority select the file system.
+/// @return The cached or newly created file system, or the error reported by
+///         PathUtil::ToPath or FileSystemFactory::Get.
+Result<std::shared_ptr<FileSystem>> ResolvingFileSystem::GetRealFileSystem(
+    const std::string& uri) const {
+    PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(uri));
+
+    // Try to get file system from cache with shared lock (read lock)
+    {
+        std::shared_lock<std::shared_mutex> read_lock(fs_cache_mutex_);
+        auto fs_iter = fs_cache_.find({path.scheme, path.authority});
+        if (fs_iter != fs_cache_.end()) {
+            return fs_iter->second;
+        }
+    }
+
+    // Cache miss, create file system and set it to cache with exclusive lock (write lock)
+    std::unique_lock<std::shared_mutex> write_lock(fs_cache_mutex_);
+
+    // Double-check pattern: check again after acquiring write lock
+    auto fs_iter = fs_cache_.find({path.scheme, path.authority});
+    if (fs_iter != fs_cache_.end()) {
+        return fs_iter->second;
+    }
+
+    // Create file system
+    std::string identifier = default_fs_identifier_;
+    auto identifier_iter = scheme_to_fs_identifier_.find(path.scheme);
+    if (identifier_iter != scheme_to_fs_identifier_.end()) {
+        identifier = identifier_iter->second;
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
+                           FileSystemFactory::Get(identifier, uri, options_));
+    fs_cache_.emplace(std::make_pair(path.scheme, path.authority), fs);
+    return fs;
+}
+
+/// Forwards Open() to the file system resolved for `path`.
+///
+/// @return The delegate's result, or the error from resolving the file system.
+Result<std::unique_ptr<InputStream>> ResolvingFileSystem::Open(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->Open(path);
+}
+
+/// Forwards Create() to the file system resolved for `path`.
+///
+/// @return The delegate's result, or the error from resolving the file system.
+Result<std::unique_ptr<OutputStream>> ResolvingFileSystem::Create(const std::string& path,
+                                                                  bool overwrite) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->Create(path, overwrite);
+}
+
+/// Forwards Mkdirs() to the file system resolved for `path`.
+///
+/// @return The delegate's status, or the error from resolving the file system.
+Status ResolvingFileSystem::Mkdirs(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->Mkdirs(path);
+}
+
+/// Forwards Rename() to the file system resolved for `src`.
+///
+/// Only `src` takes part in the resolution; `dst` is passed through unchanged.
+///
+/// @return The delegate's status, or the error from resolving the file system.
+Status ResolvingFileSystem::Rename(const std::string& src, const std::string& dst) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(src));
+    return real_fs->Rename(src, dst);
+}
+
+/// Forwards Delete() to the file system resolved for `path`.
+///
+/// @return The delegate's status, or the error from resolving the file system.
+Status ResolvingFileSystem::Delete(const std::string& path, bool recursive) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->Delete(path, recursive);
+}
+
+/// Forwards GetFileStatus() to the file system resolved for `path`.
+///
+/// @return The delegate's result, or the error from resolving the file system.
+Result<std::unique_ptr<FileStatus>> ResolvingFileSystem::GetFileStatus(
+    const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->GetFileStatus(path);
+}
+
+/// Forwards ListDir() to the file system resolved for `directory`.
+///
+/// @return The delegate's status, or the error from resolving the file system.
+Status ResolvingFileSystem::ListDir(
+    const std::string& directory,
+    std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(directory));
+    return real_fs->ListDir(directory, file_status_list);
+}
+
+/// Forwards ListFileStatus() to the file system resolved for `path`.
+///
+/// @return The delegate's status, or the error from resolving the file system.
+Status ResolvingFileSystem::ListFileStatus(
+    const std::string& path, std::vector<std::unique_ptr<FileStatus>>* file_status_list) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->ListFileStatus(path, file_status_list);
+}
+
+/// Forwards Exists() to the file system resolved for `path`.
+///
+/// @return The delegate's result, or the error from resolving the file system.
+Result<bool> ResolvingFileSystem::Exists(const std::string& path) const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> real_fs, GetRealFileSystem(path));
+    return real_fs->Exists(path);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/fs/resolving_file_system.h b/src/paimon/common/fs/resolving_file_system.h
new file mode 100644
index 0000000..2c5c643
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <shared_mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// A `FileSystem` that serves several file system schemes at once: every call
+/// is forwarded to the concrete `FileSystem` selected from the URI of the path
+/// it receives.
+class ResolvingFileSystem : public FileSystem {
+ public:
+    /// @param scheme_to_fs_identifier Maps a URI scheme to a file system identifier.
+    /// @param default_fs_identifier Identifier used for schemes without a mapping.
+    /// @param options Options forwarded when a file system is created.
+    ResolvingFileSystem(const std::map<std::string, std::string>& scheme_to_fs_identifier,
+                        const std::string& default_fs_identifier,
+                        const std::map<std::string, std::string>& options);
+    ~ResolvingFileSystem() override = default;
+
+    // Stream creation.
+    Result<std::unique_ptr<InputStream>> Open(const std::string& path) const override;
+    Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+                                                 bool overwrite) const override;
+
+    // Namespace operations and queries.
+    Status Mkdirs(const std::string& path) const override;
+    Status Rename(const std::string& src, const std::string& dst) const override;
+    Status Delete(const std::string& path, bool recursive = true) const override;
+    Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) const override;
+    Status ListDir(const std::string& directory,
+                   std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const override;
+    Status ListFileStatus(
+        const std::string& path,
+        std::vector<std::unique_ptr<FileStatus>>* file_status_list) const override;
+    Result<bool> Exists(const std::string& path) const override;
+
+ private:
+    /// Returns the concrete file system responsible for `uri`.
+    Result<std::shared_ptr<FileSystem>> GetRealFileSystem(const std::string& uri) const;
+
+    // e.g.: {{"file", "local"}, {"oss", "jindo"}}
+    std::map<std::string, std::string> scheme_to_fs_identifier_;
+    std::string default_fs_identifier_;
+    std::map<std::string, std::string> options_;
+    // {scheme, authority} -> FileSystem
+    mutable std::map<std::pair<std::string, std::string>, std::shared_ptr<FileSystem>> fs_cache_;
+    // Read-write lock for fs_cache_ thread safety
+    mutable std::shared_mutex fs_cache_mutex_;
+};
+
+}  // namespace paimon


Reply via email to