This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b53f9db feat(fs): introduce file system abstractions (#26)
b53f9db is described below
commit b53f9db2dad4b2d5dc06110904b41cf1a17ea02d
Author: Zhang Jiawei <[email protected]>
AuthorDate: Mon Jun 1 11:28:48 2026 +0800
feat(fs): introduce file system abstractions (#26)
* feat(fs): migrate file system abstractions
* feat(fs): add utility implementations
* feat(fs): remove utils from migration
---
LICENSE | 11 +
NOTICE | 5 +-
include/paimon/factories/factory.h | 40 ++++
include/paimon/factories/factory_creator.h | 58 +++++
include/paimon/factories/singleton.h | 59 +++++
include/paimon/fs/file_system.h | 272 ++++++++++++++++++++++++
include/paimon/fs/file_system_factory.h | 46 ++++
src/paimon/common/factories/factory_creator.cpp | 74 +++++++
src/paimon/common/factories/io_hook.cpp | 90 ++++++++
src/paimon/common/factories/io_hook.h | 73 +++++++
src/paimon/common/factories/singleton.cpp | 45 ++++
src/paimon/common/fs/file_system.cpp | 98 +++++++++
src/paimon/common/fs/file_system_factory.cpp | 44 ++++
src/paimon/common/fs/resolving_file_system.cpp | 127 +++++++++++
src/paimon/common/fs/resolving_file_system.h | 72 +++++++
15 files changed, 1113 insertions(+), 1 deletion(-)
diff --git a/LICENSE b/LICENSE
index 51438f3..5ddb768 100644
--- a/LICENSE
+++ b/LICENSE
@@ -202,6 +202,17 @@
--------------------------------------------------------------------------------
+This product includes code from Alibaba Havenask.
+
+* include/paimon/factories/singleton.h
+* src/paimon/common/factories/singleton.cpp
+
+Copyright: 2014-present Alibaba Inc.
+Home page: https://havenask.net/
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This product includes code from RocksDB.
* endian utility in src/paimon/common/utils/math.h
diff --git a/NOTICE b/NOTICE
index dbfb7d3..4005b5c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -4,6 +4,9 @@ Copyright 2024-2026 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+This product includes software from Havenask project (Apache 2.0)
+Copyright 2014-present Alibaba Inc.
+
Apache Arrow
Copyright 2016-2024 The Apache Software Foundation
@@ -18,4 +21,4 @@ This product includes software from xxHash project (BSD
2-clause)
Copyright (C) 2012-2023 Yann Collet
This product includes software from cppjieba project (MIT)
-Copyright 2013
\ No newline at end of file
+Copyright 2013
diff --git a/include/paimon/factories/factory.h
b/include/paimon/factories/factory.h
new file mode 100644
index 0000000..2150fbf
--- /dev/null
+++ b/include/paimon/factories/factory.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/factories/factory_creator.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
/// Factory used to register, e.g., `FileFormatFactory`, `FileSystemFactory`.
/// Call `REGISTER_PAIMON_FACTORY` to register a factory to `FactoryCreator`.
class PAIMON_EXPORT Factory {
 public:
    virtual ~Factory() = default;

    /// Identifier under which this factory is registered in `FactoryCreator`.
    ///
    /// `REGISTER_PAIMON_FACTORY` uses this string as the registry key; registering
    /// two factories with the same identifier aborts the process.
    virtual const char* Identifier() const = 0;
};
+
+} // namespace paimon
+
/// Register a `Factory` subclass with the global `FactoryCreator` at load time.
///
/// Expands to a file-local function marked `__attribute__((constructor))`, so it
/// runs when the containing binary or shared library is loaded, before `main()`.
/// The function heap-allocates one `PAIMON_FACTORY` and registers it under the
/// string returned by its `Identifier()`. Ownership passes to `FactoryCreator`,
/// which deletes registered factories in its destructor. Registering the same
/// identifier twice aborts the process.
///
/// `PAIMON_FACTORY` must be an unqualified, default-constructible class name: it
/// is token-pasted into the generated function name, so `ns::MyFactory` will not
/// work. Invoke the macro inside the class's namespace instead.
#define REGISTER_PAIMON_FACTORY(PAIMON_FACTORY)                                     \
    static __attribute__((constructor)) void Register##PAIMON_FACTORY##Factory() { \
        auto* factory_creator = paimon::FactoryCreator::GetInstance();             \
        auto* factory = new PAIMON_FACTORY;                                        \
        factory_creator->Register(factory->Identifier(), factory);                 \
    }
diff --git a/include/paimon/factories/factory_creator.h
b/include/paimon/factories/factory_creator.h
new file mode 100644
index 0000000..5eeacf4
--- /dev/null
+++ b/include/paimon/factories/factory_creator.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "paimon/factories/singleton.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class Factory;
+
/// Process-wide registry that maps an identifier string to a `Factory` instance.
///
/// Factories are usually added through `REGISTER_PAIMON_FACTORY` and looked up
/// with `Create()`. Every method locks `mutex_` while it touches `factories_`.
class PAIMON_EXPORT FactoryCreator : public Singleton<FactoryCreator> {
 public:
    FactoryCreator() = default;
    /// Deletes every factory that is still registered (the creator owns them).
    ~FactoryCreator();

    /// Register `factory` under the identifier `type`.
    ///
    /// Ownership of `factory` passes to this creator. Registering an identifier
    /// that is already present prints a diagnostic to stderr and aborts the process.
    void Register(const std::string& type, Factory* factory);

    /// Look up the factory registered under `type`.
    ///
    /// Despite its name, this does not construct anything. It returns the
    /// registered instance, which remains owned by the creator, or `nullptr` if
    /// `type` is unknown.
    Factory* Create(const std::string& type) const;

    /// Get the identifiers of all registered factories (in map-key order).
    std::vector<std::string> GetRegisteredType() const;

 private:
    /// Remove and delete the factory registered under `type`, if any.
    /// @note For test only.
    /// NOTE(review): this is private and no friend is declared in this header, so
    /// tests cannot call it directly. Confirm how it is meant to be reached.
    void TEST_Unregister(const std::string& type);

    /// Registered factories keyed by identifier; the pointers are owned.
    std::map<std::string, Factory*> factories_;

    /// Guards `factories_`; mutable so const lookups can lock it.
    mutable std::mutex mutex_;
};
+
+} // namespace paimon
diff --git a/include/paimon/factories/singleton.h
b/include/paimon/factories/singleton.h
new file mode 100644
index 0000000..6e12d45
--- /dev/null
+++ b/include/paimon/factories/singleton.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Alibaba Havenask
+//
https://github.com/alibaba/havenask/blob/main/aios/storage/indexlib/util/Singleton.h
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/macros.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
/// Default instantiation policy for `Singleton`: heap-allocates the instance on
/// first use and releases it at program exit.
class PAIMON_EXPORT LazyInstantiation {
 protected:
    /// Construct a new `T` and publish it through `ptr`.
    ///
    /// Invoked by `Singleton<T>::GetInstance()` while it holds its creation mutex.
    template <typename T>
    static void Create(T*& ptr) {
        T* tmp = new T;
        // NOTE(review): MEMORY_BARRIER() comes from paimon/macros.h (not shown).
        // Presumably it keeps the construction of *tmp from being reordered after
        // the store to `ptr` below; confirm its definition.
        MEMORY_BARRIER();
        ptr = tmp;
        // One function-local static per `T`. It takes ownership of the instance and
        // deletes it during static destruction at program exit.
        // NOTE(review): anything that still uses the singleton later in shutdown
        // (e.g. another static destructor) would see a dangling pointer.
        static std::shared_ptr<T> destroyer(ptr);
    }
};
+
+/// A singleton implementation with customizable instantiation policy.
+template <typename T, typename InstPolicy = LazyInstantiation>
+class PAIMON_EXPORT Singleton : private InstPolicy {
+ protected:
+ Singleton(const Singleton&) {}
+ Singleton() = default;
+
+ public:
+ ~Singleton() = default;
+
+ public:
+ /// Provide access to the single instance through double-checked locking.
+ ///
+ /// Lazy create a singleton instance when `GetInstance()` is called.
+ ///
+ /// @return The single instance of object.
+ static T* GetInstance();
+};
+
+} // namespace paimon
diff --git a/include/paimon/fs/file_system.h b/include/paimon/fs/file_system.h
new file mode 100644
index 0000000..f2388dc
--- /dev/null
+++ b/include/paimon/fs/file_system.h
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Enumeration for stream seek origin positions.
+enum PAIMON_EXPORT SeekOrigin {
+ /// Seek from the beginning of the stream.
+ FS_SEEK_SET,
+ /// Seek from the current position in the stream.
+ FS_SEEK_CUR,
+ /// Seek from the end of the stream.
+ FS_SEEK_END
+};
+
/// Abstract base class for all stream operations.
///
/// Declares the virtual destructor shared by `InputStream` and `OutputStream`.
class PAIMON_EXPORT Stream {
 public:
    virtual ~Stream() = default;

    /// Close the stream.
    ///
    /// @return Status indicating success (OK) or failure with error information.
    virtual Status Close() = 0;
};
+
/// Abstract class for input stream operations.
class PAIMON_EXPORT InputStream : public Stream {
 public:
    InputStream() = default;
    ~InputStream() override = default;

    /// Seek to a specified position in the input stream.
    ///
    /// @param offset The byte offset relative to the origin position.
    /// @param origin The reference point for seeking (`::FS_SEEK_SET`, `::FS_SEEK_CUR`,
    ///               `::FS_SEEK_END`).
    /// @return Status indicating success (OK) or failure with appropriate error information.
    virtual Status Seek(int64_t offset, SeekOrigin origin) = 0;

    /// Get the current position in the input stream.
    ///
    /// @return Current position in the input stream, or an IOError if the underlying
    ///         stream implementation failed while accessing the stream's position.
    virtual Result<int64_t> GetPos() const = 0;

    /// Read data from the current position in the stream.
    ///
    /// @param[out] buffer Pointer to the buffer where read data will be stored.
    /// @param size Maximum number of bytes to read.
    /// @return Result containing the actual number of bytes read on success, or an error
    ///         status on failure.
    /// @note The stream position advances by the number of bytes actually read.
    /// NOTE(review): `size` is `uint32_t` but the returned count is `int32_t`, so a
    /// request larger than INT32_MAX cannot be reported faithfully. Confirm that
    /// implementations cap reads.
    virtual Result<int32_t> Read(char* buffer, uint32_t size) = 0;

    /// Read data from a given position in the stream.
    ///
    /// Behaves like `pread()`: the current position of the input stream is not changed.
    ///
    /// @param[out] buffer The buffer to store the read content.
    /// @param size The number of bytes to read.
    /// @param offset The position in the stream to read from.
    /// @return Result containing the actual number of bytes read on success, or an error
    ///         status on failure.
    virtual Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) = 0;

    /// Asynchronously read data from the input stream.
    ///
    /// Starts an asynchronous read of `size` bytes beginning at `offset` into `buffer`.
    /// When the read completes, `callback` is invoked with its status.
    ///
    /// @param[out] buffer The buffer to store the read content. Presumably it must stay
    ///                    valid until `callback` runs; confirm with implementations.
    /// @param size The number of bytes to read.
    /// @param offset The position in the stream to read from.
    /// @param callback Invoked upon completion with a Status indicating success or
    ///                 failure of the read.
    virtual void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
                           std::function<void(Status)>&& callback) = 0;

    /// Get an identifier that uniquely identifies the underlying content.
    ///
    /// @return A URI if the underlying content can be uniquely identified, or an empty
    ///         string if it cannot.
    virtual Result<std::string> GetUri() const = 0;

    /// Get the total length of the file in bytes.
    virtual Result<uint64_t> Length() const = 0;
};
+
/// Abstract class for output stream operations.
///
/// The virtual destructor is inherited from `Stream`.
class PAIMON_EXPORT OutputStream : public Stream {
 public:
    OutputStream() = default;

    /// Write data to the output stream.
    ///
    /// @param buffer Pointer to the data buffer to write.
    /// @param size Number of bytes to write from the buffer.
    /// @return Result containing the actual number of bytes written on success, or an
    ///         error status on failure.
    /// @note The stream position advances by the number of bytes actually written.
    virtual Result<int32_t> Write(const char* buffer, uint32_t size) = 0;

    /// Flush pending data to the underlying storage.
    virtual Status Flush() = 0;
    /// Get the current write position.
    virtual Result<int64_t> GetPos() const = 0;
    /// Get the URI of the output stream.
    virtual Result<std::string> GetUri() const = 0;
};
+
/// Basic file status information interface.
///
/// Provides the minimal metadata for a file or directory: its path and whether it is
/// a directory. Returned by `FileSystem::ListDir()`, which only needs this much
/// information.
class PAIMON_EXPORT BasicFileStatus {
 public:
    BasicFileStatus() = default;
    virtual ~BasicFileStatus() = default;

    /// Check if this entry represents a directory.
    virtual bool IsDir() const = 0;

    /// Get the path of this file or directory.
    virtual std::string GetPath() const = 0;
};
+
/// Extended file status information interface.
///
/// Carries more metadata than `BasicFileStatus` (file size and modification time in
/// addition to path and directory flag). Note that it does NOT derive from
/// `BasicFileStatus`: it is an independent interface that re-declares `IsDir()` and
/// `GetPath()`. Returned by `FileSystem::GetFileStatus()` and
/// `FileSystem::ListFileStatus()`.
class PAIMON_EXPORT FileStatus {
 public:
    FileStatus() = default;
    virtual ~FileStatus() = default;

    /// Get the size of the file in bytes.
    /// @note For directories, this method is undefined behavior.
    virtual uint64_t GetLen() const = 0;

    /// Check if this entry represents a directory.
    virtual bool IsDir() const = 0;

    /// Get the path of this file or directory.
    virtual std::string GetPath() const = 0;

    /// Get the last modification time of the file.
    ///
    /// @return The time the file was last modified, in milliseconds since the epoch
    ///         (UTC January 1, 1970).
    virtual int64_t GetModificationTime() const = 0;
};
+
/// Abstract file system interface.
class PAIMON_EXPORT FileSystem {
 public:
    virtual ~FileSystem();

    /// Check if the given path represents an object store.
    ///
    /// The scheme of `path_str` is compared case-insensitively. No scheme, `file`,
    /// `hdfs` and `dfs` are treated as regular file systems; any other scheme is
    /// treated as an object store.
    /// @note Object stores typically have different semantics than traditional file systems.
    /// @return Result containing true for an object-store path and false otherwise, or an
    ///         error if `path_str` cannot be parsed.
    static Result<bool> IsObjectStore(const std::string& path_str);

    /// Open an existing file for reading.
    /// @param path The file path to open.
    /// @return Result containing a unique pointer to `InputStream` on success, or error
    ///         status on failure (e.g., file not found, permission denied).
    virtual Result<std::unique_ptr<InputStream>> Open(const std::string& path) const = 0;

    /// Create a new file for writing.
    /// @param path The file path to create.
    /// @param overwrite If true, overwrite an existing file; if false, fail if the file exists.
    /// @return Result containing a unique pointer to `OutputStream` on success, or error
    ///         status on failure (e.g., I/O error, permission denied).
    virtual Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
                                                         bool overwrite) const = 0;

    /// Create directories recursively.
    /// @param path The directory path to create (including all parent directories).
    /// @return Status indicating success (OK) or failure with error information.
    virtual Status Mkdirs(const std::string& path) const = 0;

    /// Rename or move a file or directory.
    /// @param src The source path (file or directory to rename/move).
    /// @param dst The destination path.
    /// @return Status indicating success (OK) or failure with error information.
    virtual Status Rename(const std::string& src, const std::string& dst) const = 0;

    /// Delete a file or directory.
    /// @param path The path to delete.
    /// @param recursive If true, delete directories and their contents recursively;
    ///                  if false, only delete empty directories.
    /// @return Status indicating success (OK) or failure with error information.
    /// NOTE(review): default arguments on virtual functions are bound by the static type
    /// of the call expression. Overrides should repeat `recursive = true`, otherwise
    /// calls through a derived-class type may behave differently.
    virtual Status Delete(const std::string& path, bool recursive = true) const = 0;

    /// Get detailed status information for a file or directory.
    /// @param path The file or directory path to query.
    /// @return Result containing a unique pointer to `FileStatus` on success, or error
    ///         status on failure (e.g., path not found, permission denied).
    virtual Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) const = 0;

    /// List files of a directory (basic information only).
    /// @param directory The directory path to list.
    /// @param[out] file_status_list Output vector to store `BasicFileStatus` objects.
    /// @return Status indicating success (OK) or failure with error information.
    virtual Status ListDir(
        const std::string& directory,
        std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const = 0;

    /// List file status with detailed information.
    /// @param path The file or directory path to list.
    /// @param[out] file_status_list Output vector to store `FileStatus` objects.
    /// @return Status indicating success (OK) or failure with error information.
    virtual Status ListFileStatus(
        const std::string& path,
        std::vector<std::unique_ptr<FileStatus>>* file_status_list) const = 0;

    /// Check if a file or directory exists.
    /// @param path The file or directory path to check.
    /// @return Result containing true if the path exists and false if not found, or an
    ///         error status if an I/O error occurs during the check.
    virtual Result<bool> Exists(const std::string& path) const = 0;

    /// Read entire file content into a string at once.
    ///
    /// The default implementation opens `path` with `Open()` and reads `Length()` bytes
    /// into `content`.
    /// @param path The file path to read.
    /// @param[out] content Output string to store the file content.
    /// @return Status indicating success (OK) or failure with error information.
    /// @note Virtual only for mock testing purposes.
    virtual Status ReadFile(const std::string& path, std::string* content);

    /// Write the entire content to a file at once.
    /// @param path The file path to write to.
    /// @param content The string content to write.
    /// @param overwrite If true, overwrite an existing file; if false, fail if the file exists.
    /// @return Status indicating success (OK) or failure with error information.
    /// @note Virtual only for mock testing purposes.
    virtual Status WriteFile(const std::string& path, const std::string& content, bool overwrite);

    /// Write content to a file atomically: the content is first written to a temporary
    /// hidden file, which is then renamed to the target.
    /// @param path The target file path.
    /// @param content The string content to write.
    /// @return Status indicating success (OK) or failure with error information.
    /// @note Virtual only for mock testing purposes.
    virtual Status AtomicStore(const std::string& path, const std::string& content);
};
+
+} // namespace paimon
diff --git a/include/paimon/fs/file_system_factory.h
b/include/paimon/fs/file_system_factory.h
new file mode 100644
index 0000000..d18d8a4
--- /dev/null
+++ b/include/paimon/fs/file_system_factory.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
/// A factory for creating `FileSystem` instances.
///
/// Concrete factories are registered with `REGISTER_PAIMON_FACTORY` and looked up by
/// their `Identifier()`.
class PAIMON_EXPORT FileSystemFactory : public Factory {
 public:
    /// Create a `FileSystem` of current factory with specific path.
    ///
    /// @param path Path the new file system is created for.
    /// @param options File-system specific options.
    /// @return Result containing the created file system, or an error status.
    virtual Result<std::unique_ptr<FileSystem>> Create(
        const std::string& path, const std::map<std::string, std::string>& options) const = 0;

    /// Get `FileSystem` corresponding to identifier and specific path.
    ///
    /// @pre Factory is already registered.
    /// @param identifier Identifier the factory was registered under.
    /// @param path Path the file system is created for.
    /// @param fs_options File-system options (presumably forwarded to `Create()`; confirm
    ///                   in file_system_factory.cpp).
    static Result<std::unique_ptr<FileSystem>> Get(
        const std::string& identifier, const std::string& path,
        const std::map<std::string, std::string>& fs_options);
};
+
+} // namespace paimon
diff --git a/src/paimon/common/factories/factory_creator.cpp
b/src/paimon/common/factories/factory_creator.cpp
new file mode 100644
index 0000000..5fa65e7
--- /dev/null
+++ b/src/paimon/common/factories/factory_creator.cpp
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/factories/factory_creator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <utility>
+
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+FactoryCreator::~FactoryCreator() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto iter : factories_) {
+ delete iter.second;
+ iter.second = nullptr;
+ }
+}
+
+Factory* FactoryCreator::Create(const std::string& type) const {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto iter = factories_.find(type);
+ if (iter == factories_.end()) {
+ return nullptr;
+ }
+ return iter->second;
+}
+
+void FactoryCreator::Register(const std::string& type, Factory* factory) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto iter = factories_.find(type);
+ if (iter != factories_.end()) {
+ std::cerr << "register conflict: type " << type << " already exist" <<
std::endl;
+ std::abort();
+ }
+ factories_[type] = factory;
+}
+
+void FactoryCreator::TEST_Unregister(const std::string& type) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto iter = factories_.find(type);
+ if (iter != factories_.end()) {
+ delete iter->second;
+ factories_.erase(iter);
+ }
+}
+
+std::vector<std::string> FactoryCreator::GetRegisteredType() const {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::vector<std::string> types;
+ for (const auto& kv : factories_) {
+ types.push_back(kv.first);
+ }
+ return types;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/factories/io_hook.cpp
b/src/paimon/common/factories/io_hook.cpp
new file mode 100644
index 0000000..a0576b4
--- /dev/null
+++ b/src/paimon/common/factories/io_hook.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/factories/io_hook.h"
+
+#include <atomic>
+#include <stdexcept>
+
+#include "fmt/format.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class IOHook::Impl {
+ public:
+ Status Try(const std::string& path) {
+ if (io_count_.fetch_add(1) < pos_.load()) {
+ return Status::OK();
+ } else {
+ switch (mode_) {
+ case IOHook::Mode::SILENT:
+ return Status::OK();
+ case IOHook::Mode::RETURN_ERROR:
+ return Status::IOError(fmt::format(
+ "io hook triggered io error at position {}, path {}",
pos_.load(), path));
+ case IOHook::Mode::THROW_EXCEPTION:
+ throw std::runtime_error(fmt::format(
+ "io hook throw io exception at position {}, path {}",
pos_.load(), path));
+ return Status::OK();
+ default:
+ return Status::OK();
+ }
+ }
+ }
+
+ inline void Reset(int64_t pos, IOHook::Mode mode) {
+ pos_ = pos;
+ io_count_ = 0;
+ mode_ = mode;
+ }
+
+ int64_t IOCount() const {
+ return io_count_.load();
+ }
+
+ void Clear() {
+ Reset(-1, IOHook::Mode::SILENT);
+ }
+
+ private:
+ std::atomic<int64_t> io_count_ = {0};
+ std::atomic<int64_t> pos_ = {-1};
+ IOHook::Mode mode_ = IOHook::Mode::SILENT;
+};
+
+IOHook::IOHook() : impl_(std::make_unique<IOHook::Impl>()) {}
+IOHook::~IOHook() = default;
+
+Status IOHook::Try(const std::string& path) {
+ return impl_->Try(path);
+}
+
+int64_t IOHook::IOCount() const {
+ return impl_->IOCount();
+}
+
+void IOHook::Clear() {
+ return impl_->Clear();
+}
+
+void IOHook::Reset(int64_t pos, IOHook::Mode mode) {
+ return impl_->Reset(pos, mode);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/factories/io_hook.h
b/src/paimon/common/factories/io_hook.h
new file mode 100644
index 0000000..e0a2f68
--- /dev/null
+++ b/src/paimon/common/factories/io_hook.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/singleton.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
/// IOHook is for IO event capture before real IO operation happens,
/// and to simulate an IO error.
///
/// Each call to `Try()` counts as one IO operation. After `Reset(pos, mode)`, the
/// first `pos` calls succeed and every later call is handled according to `mode`.
/// `Clear()` restores the default state, in which no call is affected.
class PAIMON_EXPORT IOHook : public Singleton<IOHook> {
 public:
    IOHook();
    ~IOHook();
    IOHook(const IOHook&) = delete;
    IOHook& operator=(const IOHook&) = delete;

    /// Behavior applied to IO operations at or after the configured position.
    enum class Mode {
        RETURN_ERROR,     ///< `Try()` returns an IOError status.
        THROW_EXCEPTION,  ///< `Try()` throws `std::runtime_error`.
        SILENT,           ///< `Try()` returns OK; nothing is injected.
    };

    /// Reset the IO exception position and behavior mode to handle the exception.
    /// IOCount will be reset to 0.
    ///
    /// @param pos The 0-based index of the first IO operation that triggers `mode`.
    ///            A negative value makes every operation trigger it.
    /// @param mode The mode of behavior for handling the exception.
    void Reset(int64_t pos, IOHook::Mode mode);
    /// Count one IO operation on `path` and inject a failure once the configured
    /// position has been reached.
    ///
    /// @param path Path of the IO operation; only used in the error/exception message.
    /// @return OK, or an IOError status in `Mode::RETURN_ERROR`.
    /// @throws std::runtime_error in `Mode::THROW_EXCEPTION`.
    Status Try(const std::string& path);

    /// Get the count of IO operations that have already occurred.
    ///
    /// @return The number of `Try()` calls since the last `Reset()`/`Clear()`.
    int64_t IOCount() const;

    /// Clear the state of the IOHook: reset the IO count to 0 and stop injecting
    /// errors or exceptions.
    void Clear();

 private:
    class Impl;

    std::unique_ptr<Impl> impl_;
};
+
+} // namespace paimon
diff --git a/src/paimon/common/factories/singleton.cpp
b/src/paimon/common/factories/singleton.cpp
new file mode 100644
index 0000000..a973225
--- /dev/null
+++ b/src/paimon/common/factories/singleton.cpp
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Alibaba Havenask
+//
https://github.com/alibaba/havenask/blob/main/aios/storage/indexlib/util/Singleton.h
+
#include "paimon/factories/singleton.h"

#include <atomic>
#include <mutex>

#include "paimon/common/factories/io_hook.h"
#include "paimon/factories/factory_creator.h"
+
+namespace paimon {
+
+template <typename T, typename InstPolicy>
+T* Singleton<T, InstPolicy>::GetInstance() {
+ static T* ptr;
+ static std::mutex mutex;
+ if (PAIMON_UNLIKELY(!ptr)) {
+ std::lock_guard<std::mutex> lg(mutex);
+ if (!ptr) {
+ InstPolicy::Create(ptr);
+ }
+ }
+ return const_cast<T*>(ptr);
+}
+
+template class Singleton<FactoryCreator>;
+template class Singleton<IOHook>;
+
+} // namespace paimon
diff --git a/src/paimon/common/fs/file_system.cpp
b/src/paimon/common/fs/file_system.cpp
new file mode 100644
index 0000000..23895fc
--- /dev/null
+++ b/src/paimon/common/fs/file_system.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/file_system.h"
+
#include <algorithm>
#include <limits>
#include <set>
#include <utility>

#include "fmt/format.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/macros.h"
+
+namespace paimon {
+FileSystem::~FileSystem() = default;
+
+Result<bool> FileSystem::IsObjectStore(const std::string& path_str) {
+ static const std::set<std::string> non_object_store_schemes{"", "file",
"hdfs", "dfs"};
+ PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_str));
+ auto path_scheme = StringUtils::ToLowerCase(path.scheme);
+ if (non_object_store_schemes.count(path_scheme)) {
+ return false;
+ }
+ return true;
+}
+
+Status FileSystem::ReadFile(const std::string& path, std::string* content) {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> in, Open(path));
+ if (PAIMON_UNLIKELY(in == nullptr)) {
+ return Status::Invalid("input stream is not suppose to be a null
pointer, path: ", path);
+ }
+ {
+ ScopeGuard guard([&in]() -> void {
+ Status s = in->Close();
+ (void)s;
+ });
+ PAIMON_ASSIGN_OR_RAISE(uint64_t length, in->Length());
+ content->resize(length);
+ PAIMON_ASSIGN_OR_RAISE(int32_t read_length, in->Read(content->data(),
length));
+ if (read_length != static_cast<int32_t>(length)) {
+ return Status::IOError(fmt::format("path {}, expect read len {},
actual read len {}",
+ path, length, read_length));
+ }
+ }
+ return Status::OK();
+}
+
+Status FileSystem::WriteFile(const std::string& path, const std::string&
content, bool overwrite) {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<OutputStream> out, Create(path,
overwrite));
+ if (PAIMON_UNLIKELY(out == nullptr)) {
+ return Status::Invalid("output stream is not suppose to be a null
pointer, path: ", path);
+ }
+ {
+ ScopeGuard guard([&out]() -> void {
+ Status s = out->Close();
+ (void)s;
+ });
+ int32_t length = content.size();
+ PAIMON_ASSIGN_OR_RAISE(int32_t write_length,
out->Write(content.data(), length));
+ if (write_length != length) {
+ return Status::IOError(fmt::format("path {}, expect write len {},
actual write len {}",
+ path, length, write_length));
+ }
+ PAIMON_RETURN_NOT_OK(out->Flush());
+ }
+ return Status::OK();
+}
+
+Status FileSystem::AtomicStore(const std::string& path, const std::string&
content) {
+ // do not support overwrite for now
+ PAIMON_ASSIGN_OR_RAISE(std::string tmp_file_path,
PathUtil::CreateTempPath(path));
+ ScopeGuard guard([&]() {
+ Status s = Delete(tmp_file_path);
+ (void)s;
+ });
+ PAIMON_RETURN_NOT_OK(WriteFile(tmp_file_path, content,
/*overwrite=*/false));
+ PAIMON_RETURN_NOT_OK(Rename(tmp_file_path, path));
+ guard.Release();
+ return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/fs/file_system_factory.cpp
b/src/paimon/common/fs/file_system_factory.cpp
new file mode 100644
index 0000000..5c8da3e
--- /dev/null
+++ b/src/paimon/common/fs/file_system_factory.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/fs/file_system_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+Result<std::unique_ptr<FileSystem>> FileSystemFactory::Get(
+ const std::string& identifier, const std::string& path,
+ const std::map<std::string, std::string>& fs_options) {
+ auto factory_creator = FactoryCreator::GetInstance();
+ auto factory = factory_creator->Create(identifier);
+ if (factory == nullptr) {
+ return Status::Invalid(
+ fmt::format("Create factory failed with identifier '{}'.",
identifier));
+ }
+ auto file_system_factory = dynamic_cast<FileSystemFactory*>(factory);
+ if (file_system_factory == nullptr) {
+ return Status::Invalid(
+ fmt::format("Failed to cast file system factory with identifier
'{}'.", identifier));
+ }
+ return file_system_factory->Create(path, fs_options);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/fs/resolving_file_system.cpp
b/src/paimon/common/fs/resolving_file_system.cpp
new file mode 100644
index 0000000..69fc282
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/fs/resolving_file_system.h"
+
+#include <mutex>
+#include <shared_mutex>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/fs/file_system_factory.h"
+
+namespace paimon {
+
+ResolvingFileSystem::ResolvingFileSystem(
+ const std::map<std::string, std::string>& scheme_to_fs_identifier,
+ const std::string& default_fs_identifier, const std::map<std::string,
std::string>& options)
+ : scheme_to_fs_identifier_(scheme_to_fs_identifier),
+ default_fs_identifier_(default_fs_identifier),
+ options_(options) {
+ // local file's scheme may be 'file' or empty
+ auto identifier_iter = scheme_to_fs_identifier_.find("file");
+ if (identifier_iter != scheme_to_fs_identifier_.end()) {
+ scheme_to_fs_identifier_[""] = identifier_iter->second;
+ }
+}
+
+Result<std::shared_ptr<FileSystem>> ResolvingFileSystem::GetRealFileSystem(
+ const std::string& uri) const {
+ PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(uri));
+
+ // Try to get file system from cache with shared lock (read lock)
+ {
+ std::shared_lock<std::shared_mutex> read_lock(fs_cache_mutex_);
+ auto fs_iter = fs_cache_.find({path.scheme, path.authority});
+ if (fs_iter != fs_cache_.end()) {
+ return fs_iter->second;
+ }
+ }
+
+ // Cache miss, create file system and set it to cache with exclusive lock
(write lock)
+ std::unique_lock<std::shared_mutex> write_lock(fs_cache_mutex_);
+
+ // Double-check pattern: check again after acquiring write lock
+ auto fs_iter = fs_cache_.find({path.scheme, path.authority});
+ if (fs_iter != fs_cache_.end()) {
+ return fs_iter->second;
+ }
+
+ // Create file system
+ std::string identifier = default_fs_identifier_;
+ auto identifier_iter = scheme_to_fs_identifier_.find(path.scheme);
+ if (identifier_iter != scheme_to_fs_identifier_.end()) {
+ identifier = identifier_iter->second;
+ }
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get(identifier, uri, options_));
+ fs_cache_.emplace(std::make_pair(path.scheme, path.authority), fs);
+ return fs;
+}
+
+Result<std::unique_ptr<InputStream>> ResolvingFileSystem::Open(const
std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->Open(path);
+}
+
+Result<std::unique_ptr<OutputStream>> ResolvingFileSystem::Create(const
std::string& path,
+ bool
overwrite) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->Create(path, overwrite);
+}
+
+Status ResolvingFileSystem::Mkdirs(const std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->Mkdirs(path);
+}
+
+Status ResolvingFileSystem::Rename(const std::string& src, const std::string&
dst) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(src));
+ return fs->Rename(src, dst);
+}
+
+Status ResolvingFileSystem::Delete(const std::string& path, bool recursive)
const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->Delete(path, recursive);
+}
+
+Result<std::unique_ptr<FileStatus>> ResolvingFileSystem::GetFileStatus(
+ const std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->GetFileStatus(path);
+}
+
+Status ResolvingFileSystem::ListDir(
+ const std::string& directory,
+ std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(directory));
+ return fs->ListDir(directory, file_status_list);
+}
+
+Status ResolvingFileSystem::ListFileStatus(
+ const std::string& path, std::vector<std::unique_ptr<FileStatus>>*
file_status_list) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->ListFileStatus(path, file_status_list);
+}
+
+Result<bool> ResolvingFileSystem::Exists(const std::string& path) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileSystem> fs,
GetRealFileSystem(path));
+ return fs->Exists(path);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/fs/resolving_file_system.h
b/src/paimon/common/fs/resolving_file_system.h
new file mode 100644
index 0000000..2c5c643
--- /dev/null
+++ b/src/paimon/common/fs/resolving_file_system.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <shared_mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// An implementation of `FileSystem` that supports multiple file system
schemes. It
+/// dynamically selects the appropriate `FileSystem` based on the URI scheme
of the given path.
+class ResolvingFileSystem : public FileSystem {
+ public:
+ ResolvingFileSystem(const std::map<std::string, std::string>&
scheme_to_fs_identifier,
+ const std::string& default_fs_identifier,
+ const std::map<std::string, std::string>& options);
+ ~ResolvingFileSystem() override = default;
+
+ Result<std::unique_ptr<InputStream>> Open(const std::string& path) const
override;
+ Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+ bool overwrite) const
override;
+
+ Status Mkdirs(const std::string& path) const override;
+ Status Rename(const std::string& src, const std::string& dst) const
override;
+ Status Delete(const std::string& path, bool recursive = true) const
override;
+ Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path)
const override;
+ Status ListDir(const std::string& directory,
+ std::vector<std::unique_ptr<BasicFileStatus>>*
file_status_list) const override;
+ Status ListFileStatus(
+ const std::string& path,
+ std::vector<std::unique_ptr<FileStatus>>* file_status_list) const
override;
+ Result<bool> Exists(const std::string& path) const override;
+
+ private:
+ Result<std::shared_ptr<FileSystem>> GetRealFileSystem(const std::string&
uri) const;
+
+ private:
+ // e.g.: {{"file", "local"}, {"oss", "jindo"}}
+ std::map<std::string, std::string> scheme_to_fs_identifier_;
+ std::string default_fs_identifier_;
+ std::map<std::string, std::string> options_;
+ // {scheme, authority} -> FileSystem
+ mutable std::map<std::pair<std::string, std::string>,
std::shared_ptr<FileSystem>> fs_cache_;
+ // Read-write lock for fs_cache_ thread safety
+ mutable std::shared_mutex fs_cache_mutex_;
+};
+
+} // namespace paimon