This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ee4a84a  feat(core): introduce storage metadata components (#62)
ee4a84a is described below

commit ee4a84a3f1d5afabab7f5c15f0ec6316910c64e2
Author: Zhang Jiawei <[email protected]>
AuthorDate: Tue Jun 9 15:45:25 2026 +0800

    feat(core): introduce storage metadata components (#62)
---
 src/paimon/core/disk/file_channel_manager.h        |  92 ++++++++
 src/paimon/core/disk/file_io_channel.cpp           |  76 +++++++
 src/paimon/core/disk/file_io_channel.h             |  78 +++++++
 src/paimon/core/disk/io_manager.h                  |  77 +++++++
 src/paimon/core/disk/io_manager_test.cpp           |  81 +++++++
 src/paimon/core/memory/writer_memory_manager.cpp   | 112 ++++++++++
 src/paimon/core/memory/writer_memory_manager.h     |  62 ++++++
 .../core/memory/writer_memory_manager_test.cpp     | 248 +++++++++++++++++++++
 src/paimon/core/partition/partition_info.h         |  80 +++++++
 src/paimon/core/partition/partition_statistics.h   | 140 ++++++++++++
 .../core/partition/partition_statistics_test.cpp   |  65 ++++++
 src/paimon/core/tag/tag.cpp                        | 116 ++++++++++
 src/paimon/core/tag/tag.h                          |  89 ++++++++
 src/paimon/core/tag/tag_test.cpp                   | 247 ++++++++++++++++++++
 src/paimon/core/view/view.h                        |  58 +++++
 15 files changed, 1621 insertions(+)

diff --git a/src/paimon/core/disk/file_channel_manager.h 
b/src/paimon/core/disk/file_channel_manager.h
new file mode 100644
index 0000000..6c03efc
--- /dev/null
+++ b/src/paimon/core/disk/file_channel_manager.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <string>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Manages one spill directory and hands out channel IDs located inside it.
+///
+/// The directory is created by `Create()` and removed recursively, on a best-effort basis, when
+/// the manager is destroyed.
+class FileChannelManager {
+ public:
+    /// Creates the directory `paimon-<prefix>-<uuid>` under `tmp_dir` through `file_system` and
+    /// returns a manager bound to it.
+    ///
+    /// @param tmp_dir Parent directory of the spill directory.
+    /// @param prefix Name fragment placed between "paimon-" and the generated UUID.
+    /// @param file_system File system used to create, and later delete, the spill directory.
+    /// @return The new manager, or `Status::Invalid` when `file_system` is null or UUID
+    ///         generation fails. A failing `Mkdirs` status is passed through
+    ///         `PAIMON_RETURN_NOT_OK`.
+    static Result<std::unique_ptr<FileChannelManager>> Create(
+        const std::string& tmp_dir, const std::string& prefix,
+        const std::shared_ptr<FileSystem>& file_system) {
+        // Reject a null file system up front instead of dereferencing it in Mkdirs below.
+        if (file_system == nullptr) {
+            return Status::Invalid("FileSystem must not be null for FileChannelManager.");
+        }
+
+        std::string uuid;
+        if (!UUID::Generate(&uuid)) {
+            return Status::Invalid("Failed to generate UUID for FileChannelManager.");
+        }
+        std::string spill_dir = PathUtil::JoinPath(tmp_dir, "paimon-" + prefix + "-" + uuid);
+
+        PAIMON_RETURN_NOT_OK(file_system->Mkdirs(spill_dir));
+
+        // Seed the generator that is later used for random channel names.
+        std::random_device rd;
+        std::mt19937 random(rd());
+
+        // The constructor is private, so std::make_unique cannot be used here.
+        return std::unique_ptr<FileChannelManager>(
+            new FileChannelManager(spill_dir, std::move(random), file_system));
+    }
+
+    /// Deletes the spill directory recursively. The resulting status is deliberately ignored
+    /// because a destructor has no way to report it.
+    ~FileChannelManager() {
+        if (!spill_dir_.empty() && fs_ != nullptr) {
+            [[maybe_unused]] auto status = fs_->Delete(spill_dir_, /*recursive=*/true);
+        }
+    }
+
+    FileChannelManager(const FileChannelManager&) = delete;
+    FileChannelManager& operator=(const FileChannelManager&) = delete;
+
+    /// Returns a new channel ID with a random name inside the spill directory.
+    FileIOChannel::ID CreateChannel() {
+        // mutex_ serializes every use of random_.
+        std::lock_guard<std::mutex> lock(mutex_);
+        return FileIOChannel::ID(spill_dir_, &random_);
+    }
+
+    /// Returns a new channel ID whose random name is preceded by `prefix`.
+    FileIOChannel::ID CreateChannel(const std::string& prefix) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return FileIOChannel::ID(spill_dir_, prefix, &random_);
+    }
+
+    /// Returns an enumerator that yields channel IDs inside the spill directory.
+    std::shared_ptr<FileIOChannel::Enumerator> CreateChannelEnumerator() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return std::make_shared<FileIOChannel::Enumerator>(spill_dir_, &random_);
+    }
+
+    /// Returns the path of the spill directory owned by this manager.
+    const std::string& GetSpillDir() const {
+        return spill_dir_;
+    }
+
+ private:
+    FileChannelManager(const std::string& spill_dir, std::mt19937&& random,
+                       const std::shared_ptr<FileSystem>& fs)
+        : spill_dir_(spill_dir), random_(std::move(random)), fs_(fs) {}
+    std::string spill_dir_;
+    std::mutex mutex_;
+    std::mt19937 random_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/disk/file_io_channel.cpp 
b/src/paimon/core/disk/file_io_channel.cpp
new file mode 100644
index 0000000..8276461
--- /dev/null
+++ b/src/paimon/core/disk/file_io_channel.cpp
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/disk/file_io_channel.h"
+
+#include <iomanip>
+#include <sstream>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+
+namespace paimon {
+// Draws kRandomBytesLength values in [0, 255] from `random` and renders each one as two
+// lowercase hex digits.
+std::string FileIOChannel::GenerateRandomHexString(std::mt19937* random) {
+    static constexpr char kHexDigits[] = "0123456789abcdef";
+    std::uniform_int_distribution<int32_t> byte_dist(0, 255);
+
+    std::string hex;
+    hex.reserve(2 * kRandomBytesLength);
+    for (int32_t remaining = kRandomBytesLength; remaining > 0; --remaining) {
+        const int32_t byte = byte_dist(*random);
+        hex.push_back(kHexDigits[byte >> 4]);   // high nibble first
+        hex.push_back(kHexDigits[byte & 0xF]);  // then the low nibble
+    }
+    return hex;
+}
+
+// Wraps a path that the caller has already built.
+FileIOChannel::ID::ID(const std::string& path) : path_(path) {}
+
+// Path layout: <base_path>/<random hex>.channel
+FileIOChannel::ID::ID(const std::string& base_path, std::mt19937* random)
+    : path_(PathUtil::JoinPath(base_path, GenerateRandomHexString(random) + ".channel")) {}
+
+// Path layout: <base_path>/<prefix>-<random hex>.channel
+FileIOChannel::ID::ID(const std::string& base_path, const std::string& prefix,
+                      std::mt19937* random)
+    : path_(PathUtil::JoinPath(
+          base_path, prefix + ("-" + GenerateRandomHexString(random) + ".channel"))) {}
+
+const std::string& FileIOChannel::ID::GetPath() const {
+    return path_;
+}
+
+// Two IDs are equal exactly when their paths are equal.
+bool FileIOChannel::ID::operator==(const ID& other) const {
+    return other.path_ == path_;
+}
+
+bool FileIOChannel::ID::operator!=(const ID& other) const {
+    return !operator==(other);
+}
+
+// Hashes an ID by delegating to the standard string hash of its path.
+size_t FileIOChannel::ID::Hash::operator()(const ID& id) const {
+    const std::hash<std::string> path_hasher;
+    return path_hasher(id.path_);
+}
+
+// The random name prefix is drawn once, here; every ID produced by Next() shares it.
+FileIOChannel::Enumerator::Enumerator(const std::string& base_path, std::mt19937* random)
+    : path_(base_path), name_prefix_(GenerateRandomHexString(random)) {}
+
+// Returns <path_>/<name_prefix_>.<counter>.channel, where the counter is zero-padded to at
+// least six digits and incremented on every call.
+FileIOChannel::ID FileIOChannel::Enumerator::Next() {
+    const uint64_t sequence = local_counter_++;
+
+    std::ostringstream name;
+    name << name_prefix_ << ".";
+    name << std::setfill('0') << std::setw(6) << sequence;
+    name << ".channel";
+
+    return ID(PathUtil::JoinPath(path_, name.str()));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/disk/file_io_channel.h 
b/src/paimon/core/disk/file_io_channel.h
new file mode 100644
index 0000000..652056c
--- /dev/null
+++ b/src/paimon/core/disk/file_io_channel.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <random>
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Groups the types that name spill channel files: `ID` and `Enumerator`.
+class PAIMON_EXPORT FileIOChannel {
+ public:
+    /// Identifies a channel by the path of its backing file.
+    class PAIMON_EXPORT ID {
+     public:
+        /// Creates an ID with an empty path.
+        ID() = default;
+
+        /// Wraps a path that has already been built.
+        explicit ID(const std::string& path);
+
+        /// Builds a path inside `base_path` from a random hex name plus a ".channel" suffix.
+        ID(const std::string& base_path, std::mt19937* random);
+
+        /// Builds a path inside `base_path` named `<prefix>-<random hex>.channel`.
+        ID(const std::string& base_path, const std::string& prefix, std::mt19937* random);
+
+        const std::string& GetPath() const;
+
+        /// IDs compare by path.
+        bool operator==(const ID& other) const;
+        bool operator!=(const ID& other) const;
+
+        /// Hash functor over the path.
+        struct Hash {
+            size_t operator()(const ID& id) const;
+        };
+
+     private:
+        std::string path_;
+    };
+
+    /// Produces IDs that share one random name prefix and differ in an increasing counter.
+    class PAIMON_EXPORT Enumerator {
+     public:
+        Enumerator(const std::string& base_path, std::mt19937* random);
+
+        /// Returns the next ID and advances the counter.
+        ID Next();
+
+     private:
+        std::string path_;
+        std::string name_prefix_;
+        uint64_t local_counter_{0};
+    };
+
+ private:
+    /// Number of random bytes per generated name.
+    static constexpr int32_t kRandomBytesLength = 16;
+    static std::string GenerateRandomHexString(std::mt19937* random);
+};
+
+/// Pairs a channel ID with the size recorded for its file.
+struct FileChannelInfo {
+    FileIOChannel::ID channel_id;
+    // Zero-initialized so a default-constructed value never carries an indeterminate size.
+    // NOTE(review): presumably measured in bytes; confirm against the writers of this field.
+    int64_t file_size = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/disk/io_manager.h 
b/src/paimon/core/disk/io_manager.h
new file mode 100644
index 0000000..4b79771
--- /dev/null
+++ b/src/paimon/core/disk/io_manager.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "paimon/core/disk/file_channel_manager.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Entry point for creating spill channels below a temporary directory.
+///
+/// The underlying `FileChannelManager` is created lazily by the first call that needs it.
+class IOManager {
+ public:
+    IOManager(const std::string& tmp_dir, const std::shared_ptr<FileSystem>& file_system)
+        : tmp_dir_(tmp_dir), file_system_(file_system) {}
+
+    /// Returns the temporary directory given at construction.
+    const std::string& GetTempDir() const {
+        return tmp_dir_;
+    }
+
+    /// Returns a new randomly named channel ID, or the error from creating the channel manager.
+    Result<FileIOChannel::ID> CreateChannel() {
+        PAIMON_ASSIGN_OR_RAISE(auto* channels, GetFileChannelManager());
+        return channels->CreateChannel();
+    }
+
+    /// Returns a new channel ID whose file name starts with `prefix`.
+    Result<FileIOChannel::ID> CreateChannel(const std::string& prefix) {
+        PAIMON_ASSIGN_OR_RAISE(auto* channels, GetFileChannelManager());
+        return channels->CreateChannel(prefix);
+    }
+
+    /// Returns an enumerator of channel IDs located in the spill directory.
+    Result<std::shared_ptr<FileIOChannel::Enumerator>> CreateChannelEnumerator() {
+        PAIMON_ASSIGN_OR_RAISE(auto* channels, GetFileChannelManager());
+        return channels->CreateChannelEnumerator();
+    }
+
+    /// Returns the spill directory, creating the channel manager first if necessary.
+    Result<std::string> GetSpillDir() {
+        PAIMON_ASSIGN_OR_RAISE(auto* channels, GetFileChannelManager());
+        return channels->GetSpillDir();
+    }
+
+ private:
+    /// Returns the channel manager, creating it under the lock on first use.
+    Result<FileChannelManager*> GetFileChannelManager() {
+        std::lock_guard<std::mutex> guard(mutex_);
+        if (file_channel_manager_ != nullptr) {
+            return file_channel_manager_.get();
+        }
+        PAIMON_ASSIGN_OR_RAISE(file_channel_manager_,
+                               FileChannelManager::Create(tmp_dir_, kDirNamePrefix, file_system_));
+        return file_channel_manager_.get();
+    }
+
+    static constexpr char kDirNamePrefix[] = "io";
+    std::string tmp_dir_;
+    std::shared_ptr<FileSystem> file_system_;
+    std::mutex mutex_;
+    std::unique_ptr<FileChannelManager> file_channel_manager_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/disk/io_manager_test.cpp 
b/src/paimon/core/disk/io_manager_test.cpp
new file mode 100644
index 0000000..3290042
--- /dev/null
+++ b/src/paimon/core/disk/io_manager_test.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/disk/io_manager.h"
+
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Constructing an IOManager must simply remember the temp dir it was given.
+TEST(IOManagerTest, CreateShouldReturnManagerWithGivenTempDir) {
+    auto test_root = UniqueTestDirectory::Create();
+
+    auto io_manager =
+        std::make_unique<IOManager>(test_root->Str(), test_root->GetFileSystem());
+    ASSERT_NE(io_manager, nullptr);
+    ASSERT_EQ(io_manager->GetTempDir(), test_root->Str());
+}
+
+// Channel paths must live in the "paimon-io-" spill directory, follow the expected naming
+// scheme, and differ from one another.
+TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths) {
+    auto tmp_dir = UniqueTestDirectory::Create();
+    auto manager = std::make_shared<IOManager>(tmp_dir->Str(), tmp_dir->GetFileSystem());
+    const std::string prefix = "spill";
+
+    ASSERT_OK_AND_ASSIGN(auto channel1, manager->CreateChannel());
+    ASSERT_TRUE(StringUtils::StartsWith(channel1.GetPath(), tmp_dir->Str() + "/paimon-io-"));
+    ASSERT_TRUE(StringUtils::EndsWith(channel1.GetPath(), ".channel"));
+    // 16 random bytes are rendered as 32 hex characters in front of the suffix.
+    ASSERT_EQ(PathUtil::GetName(channel1.GetPath()).size(), 32 + std::string(".channel").size());
+
+    ASSERT_OK_AND_ASSIGN(auto channel2, manager->CreateChannel(prefix));
+    ASSERT_TRUE(StringUtils::StartsWith(PathUtil::GetName(channel2.GetPath()), prefix + "-"));
+
+    // The test name promises uniqueness, so compare the generated paths explicitly.
+    ASSERT_OK_AND_ASSIGN(auto channel3, manager->CreateChannel());
+    ASSERT_NE(channel1.GetPath(), channel2.GetPath());
+    ASSERT_NE(channel1.GetPath(), channel3.GetPath());
+}
+
+// Every ID from one enumerator must end in ".<six-digit counter>.channel", counting up from 0.
+TEST(IOManagerTest, CreateChannelEnumeratorShouldReturnSequentialAndUniquePaths) {
+    auto test_root = UniqueTestDirectory::Create();
+    auto io_manager = std::make_shared<IOManager>(test_root->Str(), test_root->GetFileSystem());
+
+    ASSERT_OK_AND_ASSIGN(auto ids, io_manager->CreateChannelEnumerator());
+
+    for (int sequence = 0; sequence < 10; ++sequence) {
+        auto id = ids->Next();
+        ASSERT_TRUE(StringUtils::StartsWith(id.GetPath(), test_root->Str() + "/paimon-io-"));
+
+        // Left-pad the decimal counter with zeros to a width of six.
+        std::string digits = std::to_string(sequence);
+        std::string expected_counter = std::string(6 - digits.size(), '0') + digits;
+        ASSERT_TRUE(StringUtils::EndsWith(id.GetPath(), "." + expected_counter + ".channel"));
+    }
+}
+
+// The spill dir is a "paimon-io-*" child of the temp dir, and channels are created inside it.
+TEST(IOManagerTest, GetSpillDirShouldReturnPaimonIoSubdirectory) {
+    auto test_root = UniqueTestDirectory::Create();
+    auto io_manager = std::make_shared<IOManager>(test_root->Str(), test_root->GetFileSystem());
+
+    ASSERT_OK_AND_ASSIGN(std::string spill_dir, io_manager->GetSpillDir());
+    ASSERT_TRUE(StringUtils::StartsWith(spill_dir, test_root->Str() + "/paimon-io-"));
+    ASSERT_FALSE(StringUtils::EndsWith(spill_dir, "/"));
+
+    ASSERT_OK_AND_ASSIGN(auto created, io_manager->CreateChannel());
+    ASSERT_TRUE(StringUtils::StartsWith(created.GetPath(), spill_dir + "/"));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/memory/writer_memory_manager.cpp 
b/src/paimon/core/memory/writer_memory_manager.cpp
new file mode 100644
index 0000000..3c4ea0c
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager.cpp
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/memory/writer_memory_manager.h"
+
+#include <cassert>
+
+#include "fmt/format.h"
+#include "paimon/core/utils/batch_writer.h"
+
+namespace paimon {
+
+// Registration records the writer's current usage in the ledger.
+void WriterMemoryManager::RegisterWriter(BatchWriter* writer) {
+    UpdateWriterMemory(writer);
+}
+
+// Removes the writer's recorded usage from the total. Unknown writers are ignored.
+void WriterMemoryManager::UnregisterWriter(BatchWriter* writer) {
+    auto entry = writer_memory_.find(writer);
+    if (entry != writer_memory_.end()) {
+        assert(total_memory_ >= entry->second);
+        total_memory_ -= entry->second;
+        writer_memory_.erase(entry);
+    }
+}
+
+// Re-reads the writer's usage into the ledger without triggering any flush.
+void WriterMemoryManager::RefreshWriterMemory(BatchWriter* writer) {
+    UpdateWriterMemory(writer);
+}
+
+// Refreshes the writer's entry, then flushes writers if the total has reached the limit.
+Status WriterMemoryManager::OnWriteCompleted(BatchWriter* writer) {
+    UpdateWriterMemory(writer);
+
+    // Reaching the limit exactly already counts as over budget.
+    const bool over_budget = total_memory_ >= memory_limit_;
+    if (over_budget) {
+        return ShrinkToLimit();
+    }
+    return Status::OK();
+}
+
+// Synchronizes the ledger entry for `writer` with its current usage and adjusts the running
+// total by the difference. A writer that is not in the ledger yet is inserted.
+void WriterMemoryManager::UpdateWriterMemory(BatchWriter* writer) {
+    const uint64_t usage = writer->GetMemoryUsage();
+    auto [entry, is_new] = writer_memory_.emplace(writer, usage);
+    if (is_new) {
+        total_memory_ += usage;
+        return;
+    }
+
+    uint64_t& recorded = entry->second;
+    if (usage < recorded) {
+        // Usage shrank: subtract the released amount (unsigned, so compute it the safe way).
+        const uint64_t released = recorded - usage;
+        assert(total_memory_ >= released);
+        total_memory_ -= released;
+    } else {
+        total_memory_ += (usage - recorded);
+    }
+    recorded = usage;
+}
+
+// Scans the ledger for the entry with the greatest recorded usage. The result keeps a null
+// writer and zero memory when no entry has a usage above zero.
+WriterMemoryManager::Candidate WriterMemoryManager::PickLargest() const {
+    Candidate best;
+    for (const auto& entry : writer_memory_) {
+        // Strictly greater: on a tie the entry seen first stays selected.
+        if (entry.second > best.memory) {
+            best.writer = entry.first;
+            best.memory = entry.second;
+        }
+    }
+    return best;
+}
+
+// Flushes the largest writer repeatedly until the total drops below the limit. Fails when no
+// writer holds memory any more, or when a flush did not reduce the flushed writer's usage.
+Status WriterMemoryManager::ShrinkToLimit() {
+    while (total_memory_ >= memory_limit_) {
+        const Candidate largest = PickLargest();
+        if (largest.memory == 0) {
+            // No entry has any memory recorded, yet the total is still not below the limit.
+            return Status::Invalid(
+                fmt::format("Unable to release memory to below the write-buffer-size limit ({} "
+                            "bytes), this might be a bug.",
+                            memory_limit_));
+        }
+
+        PAIMON_RETURN_NOT_OK(largest.writer->FlushMemory());
+        UpdateWriterMemory(largest.writer);
+
+        // A flush that frees nothing would make this loop spin forever, so bail out instead.
+        const uint64_t remaining = largest.writer->GetMemoryUsage();
+        if (remaining >= largest.memory) {
+            return Status::Invalid(fmt::format(
+                "Before flushing memory, writer had {} bytes of memory allocated, After flushing "
+                "memory, writer still has {} bytes of memory allocated, this might be a bug.",
+                largest.memory, remaining));
+        }
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/memory/writer_memory_manager.h 
b/src/paimon/core/memory/writer_memory_manager.h
new file mode 100644
index 0000000..811667c
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <unordered_map>
+
+#include "paimon/status.h"
+
+namespace paimon {
+
+class BatchWriter;
+
+/// Tracks the write-buffer memory of all managed writers against one global limit and flushes
+/// writers when that limit is reached. Used in `AbstractFileStoreWrite`.
+/// @note This class is not thread-safe.
+class WriterMemoryManager {
+ public:
+    explicit WriterMemoryManager(uint64_t memory_limit) : memory_limit_(memory_limit) {}
+
+    /// Registers a writer. Call this when a new `BatchWriter` is created in
+    /// `AbstractFileStoreWrite::GetWriter()`.
+    void RegisterWriter(BatchWriter* writer);
+    /// Unregisters a writer. Call this when the `BatchWriter` has been erased from
+    /// `AbstractFileStoreWrite`.
+    void UnregisterWriter(BatchWriter* writer);
+    /// Refreshes the recorded memory usage of a writer. Call this after
+    /// `BatchWriter::PrepareCommit()`.
+    void RefreshWriterMemory(BatchWriter* writer);
+    /// Checks, after `BatchWriter::Write()`, whether the total memory usage has reached the
+    /// limit, and triggers flushes if it has.
+    Status OnWriteCompleted(BatchWriter* writer);
+
+ private:
+    /// A writer together with the memory usage recorded for it.
+    struct Candidate {
+        BatchWriter* writer{nullptr};
+        uint64_t memory{0};
+    };
+
+    Candidate PickLargest() const;
+    Status ShrinkToLimit();
+    void UpdateWriterMemory(BatchWriter* writer);
+
+    const uint64_t memory_limit_;
+    uint64_t total_memory_ = 0;
+    /// Ledger of the last usage read from each writer; the pointers are not owned.
+    std::unordered_map<BatchWriter*, uint64_t> writer_memory_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/memory/writer_memory_manager_test.cpp 
b/src/paimon/core/memory/writer_memory_manager_test.cpp
new file mode 100644
index 0000000..1a91bf6
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager_test.cpp
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/memory/writer_memory_manager.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_increment.h"
+#include "paimon/core/utils/batch_writer.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+// Test double whose memory usage is set by hand and whose flushes can be scripted.
+class FakeBatchWriter : public BatchWriter {
+ public:
+    FakeBatchWriter(const std::string& name, std::vector<std::string>* flush_history)
+        : name_(name), flush_history_(flush_history) {}
+
+    void SetMemoryUsage(uint64_t memory_usage) {
+        memory_usage_ = memory_usage;
+    }
+
+    // The i-th effective flush releases flush_reductions[i] bytes, capped at current usage.
+    void SetFlushReductions(std::vector<uint64_t> flush_reductions) {
+        flush_reductions_ = std::move(flush_reductions);
+    }
+
+    uint64_t GetMemoryUsage() const override {
+        return memory_usage_;
+    }
+
+    Status FlushMemory() override {
+        // Use the scripted reduction for this call if one is left; otherwise release it all.
+        const bool scripted = flush_calls_ < static_cast<int32_t>(flush_reductions_.size());
+        const uint64_t requested = scripted ? flush_reductions_[flush_calls_] : memory_usage_;
+        const uint64_t released = std::min(requested, memory_usage_);
+        memory_usage_ -= released;
+
+        // A flush that frees nothing is neither recorded nor counted.
+        if (released == 0) {
+            return Status::OK();
+        }
+        if (flush_history_ != nullptr) {
+            flush_history_->push_back(name_);
+        }
+        ++flush_calls_;
+        return Status::OK();
+    }
+
+    // The remaining overrides are no-ops that report success.
+    Status Write(std::unique_ptr<RecordBatch>&& /*batch*/) override {
+        return Status::OK();
+    }
+
+    Status Compact(bool /*full_compaction*/) override {
+        return Status::OK();
+    }
+
+    Result<CommitIncrement> PrepareCommit(bool /*wait_compaction*/) override {
+        return CommitIncrement(DataIncrement({}, {}, {}), CompactIncrement({}, {}, {}), nullptr);
+    }
+
+    Result<bool> CompactNotCompleted() override {
+        return false;
+    }
+
+    Status Sync() override {
+        return Status::OK();
+    }
+
+    Status Close() override {
+        return Status::OK();
+    }
+
+    std::shared_ptr<Metrics> GetMetrics() const override {
+        return nullptr;
+    }
+
+ private:
+    std::string name_;
+    std::vector<std::string>* flush_history_;
+    std::vector<uint64_t> flush_reductions_;
+    uint64_t memory_usage_ = 0;
+    int32_t flush_calls_ = 0;
+};
+
+}  // namespace
+
+// 40 bytes against a limit of 100: nothing may be flushed.
+TEST(WriterMemoryManagerTest, DoesNotFlushWhenMemoryIsBelowLimit) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/100);
+    FakeBatchWriter only_writer("writer", &flushed);
+    ledger.RegisterWriter(&only_writer);
+
+    only_writer.SetMemoryUsage(40);
+    ASSERT_OK(ledger.OnWriteCompleted(&only_writer));
+    ASSERT_TRUE(flushed.empty());
+}
+
+// After writer_a (80) is unregistered, writer_b's 30 bytes alone stay below the limit of 100.
+TEST(WriterMemoryManagerTest, UnregisterWriterRemovesMemoryFromLedger) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/100);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+
+    first.SetMemoryUsage(80);
+    ledger.RefreshWriterMemory(&first);
+    ledger.UnregisterWriter(&first);
+
+    second.SetMemoryUsage(30);
+    ASSERT_OK(ledger.OnWriteCompleted(&second));
+    ASSERT_TRUE(flushed.empty());
+}
+
+// Refreshing may push the ledger to 90 (limit 80) without flushing. Once writer_a drops to 10
+// the total is 40, so the following OnWriteCompleted has nothing to flush either.
+TEST(WriterMemoryManagerTest, RefreshWriterMemoryUpdatesLedgerWithoutFlushing) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/80);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+
+    first.SetMemoryUsage(60);
+    ledger.RefreshWriterMemory(&first);
+    second.SetMemoryUsage(30);
+    ledger.RefreshWriterMemory(&second);
+
+    first.SetMemoryUsage(10);
+    ASSERT_OK(ledger.OnWriteCompleted(&first));
+    ASSERT_TRUE(flushed.empty());
+}
+
+// With three writers and a limit of 100, each overflow flushes whichever writer is largest.
+TEST(WriterMemoryManagerTest, FlushWriterMemoryWithMultipleWriters) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/100);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+    FakeBatchWriter third("writer_c", &flushed);
+    ledger.RegisterWriter(&third);
+
+    first.SetMemoryUsage(60);  // total 60
+    ASSERT_OK(ledger.OnWriteCompleted(&first));
+    second.SetMemoryUsage(30);  // total 90
+    ASSERT_OK(ledger.OnWriteCompleted(&second));
+    third.SetMemoryUsage(50);  // total 140 -> writer_a (60) is flushed
+    ASSERT_OK(ledger.OnWriteCompleted(&third));
+    first.SetMemoryUsage(30);  // total 110 -> writer_c (50) is flushed
+    ASSERT_OK(ledger.OnWriteCompleted(&first));
+    third.SetMemoryUsage(45);  // total 105 -> writer_c (45) is flushed
+    ASSERT_OK(ledger.OnWriteCompleted(&third));
+
+    ASSERT_EQ(flushed, std::vector<std::string>({"writer_a", "writer_c", "writer_c"}));
+    ASSERT_EQ(first.GetMemoryUsage(), 30);
+    ASSERT_EQ(second.GetMemoryUsage(), 30);
+    ASSERT_EQ(third.GetMemoryUsage(), 0);
+}
+
+// The writer that just wrote (90) is also the largest, so it is the one that gets flushed.
+TEST(WriterMemoryManagerTest, ReclaimsCallerWhenCallerIsLargestWriter) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/100);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+
+    first.SetMemoryUsage(20);
+    ledger.RefreshWriterMemory(&first);
+
+    second.SetMemoryUsage(90);  // total 110 reaches the limit
+    ASSERT_OK(ledger.OnWriteCompleted(&second));
+
+    ASSERT_EQ(flushed, std::vector<std::string>({"writer_b"}));
+    ASSERT_EQ(first.GetMemoryUsage(), 20);
+    ASSERT_EQ(second.GetMemoryUsage(), 0);
+}
+
+// Partial flushes: the manager keeps picking the largest writer until the total (150) has
+// fallen below the limit of 61: a 90->70, a 70->50, b 60->30, a 50->30, leaving a total of 60.
+TEST(WriterMemoryManagerTest, ContinuesReclaimingUntilBelowGlobalLimit) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/61);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+
+    first.SetMemoryUsage(90);
+    first.SetFlushReductions({20, 20, 20});
+    ledger.RefreshWriterMemory(&first);
+
+    second.SetMemoryUsage(60);
+    second.SetFlushReductions({30});
+    ASSERT_OK(ledger.OnWriteCompleted(&second));
+
+    ASSERT_EQ(flushed,
+              std::vector<std::string>({"writer_a", "writer_a", "writer_b", "writer_a"}));
+    ASSERT_EQ(first.GetMemoryUsage(), 30);
+    ASSERT_EQ(second.GetMemoryUsage(), 30);
+}
+
+// writer_a frees 10 bytes on its first flush (120 -> 110) and nothing on the second, so the
+// manager must give up with an error instead of looping.
+TEST(WriterMemoryManagerTest, ReturnsConfigurationErrorWhenNoWriterCanReleaseEnoughMemory) {
+    std::vector<std::string> flushed;
+    WriterMemoryManager ledger(/*memory_limit=*/100);
+    FakeBatchWriter first("writer_a", &flushed);
+    ledger.RegisterWriter(&first);
+    FakeBatchWriter second("writer_b", &flushed);
+    ledger.RegisterWriter(&second);
+
+    second.SetMemoryUsage(20);
+    ledger.RefreshWriterMemory(&second);
+
+    first.SetMemoryUsage(120);
+    first.SetFlushReductions({10, 0});
+    ASSERT_NOK_WITH_MSG(
+        ledger.OnWriteCompleted(&first),
+        "Before flushing memory, writer had 110 bytes of memory allocated, After flushing memory, "
+        "writer still has 110 bytes of memory allocated, this might be a bug.");
+    // Only the flush that actually released memory shows up in the history.
+    ASSERT_EQ(flushed, std::vector<std::string>({"writer_a"}));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/partition/partition_info.h 
b/src/paimon/core/partition/partition_info.h
new file mode 100644
index 0000000..f1dbc09
--- /dev/null
+++ b/src/paimon/core/partition/partition_info.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/casting/cast_executor.h"
+
+namespace paimon {
+// ExistFieldInfo = PartitionInfo + NonPartitionInfo
+// ExistFieldInfo equals the intersection of data schema and read schema
+// ExistFieldInfo + NonExistFieldInfo = read schema
+/// Describes the partition fields that take part in a read.
+/// NOTE(review): the three vectors are presumably parallel (entry i of each
+/// refers to the same field) -- confirm against the code that fills them.
+struct PartitionInfo {
+    // the intersection of partition schema & read schema
+    std::vector<DataField> partition_read_schema;
+
+    // indicates the idx in read schema (not data schema)
+    std::vector<int32_t> idx_in_target_read_schema;
+
+    // indicates the idx in partition schema (in BinaryRow)
+    std::vector<int32_t> idx_in_partition;
+
+    // partition predicate
+    // NOTE(review): presumably null when there is no partition predicate --
+    // confirm.
+    std::shared_ptr<Predicate> partition_filter;
+};
+
+/// Describes the non-partition fields that take part in a read.
+struct NonPartitionInfo {
+    // the intersection of non-partition schema and read schema
+    std::vector<DataField> non_partition_read_schema;
+    // NOTE(review): presumably the same fields as they are declared in the
+    // data schema -- confirm.
+    std::vector<DataField> non_partition_data_schema;
+
+    // indicates the idx in read schema (not in data schema)
+    std::vector<int32_t> idx_in_target_read_schema;
+
+    // non-partition predicate
+    std::shared_ptr<Predicate> non_partition_filter;
+    // NOTE(review): presumably one executor per field, converting the data
+    // schema type to the read schema type -- confirm.
+    std::vector<std::shared_ptr<CastExecutor>> cast_executors;
+};
+
+/// Describes the fields that are requested by the read schema but are absent
+/// from the data schema.
+struct NonExistFieldInfo {
+    // the fields in read schema but not in data schema
+    std::vector<DataField> non_exist_read_schema;
+
+    // indicates the idx in read schema (not data schema)
+    std::vector<int32_t> idx_in_target_read_schema;
+};
+
+/// Describes the fields that are present in both the read schema and the data
+/// schema.
+struct ExistFieldInfo {
+    // the fields in both read schema and data schema
+    // (kept twice: once as declared in the read schema, once as declared in
+    // the data schema)
+    std::vector<DataField> exist_read_schema;
+    std::vector<DataField> exist_data_schema;
+
+    // indicates the idx in read schema (not data schema)
+    std::vector<int32_t> idx_in_target_read_schema;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/partition/partition_statistics.h 
b/src/paimon/core/partition/partition_statistics.h
new file mode 100644
index 0000000..67a3ac2
--- /dev/null
+++ b/src/paimon/core/partition/partition_statistics.h
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Summary statistics of a single partition, convertible to and from JSON.
+///
+/// The JSON object carries the keys "spec", "recordCount", "fileSizeInBytes",
+/// "fileCount", "lastFileCreationTime" and "totalBuckets", written in that
+/// order.
+class PartitionStatistics : public Jsonizable<PartitionStatistics> {
+ public:
+    using SpecType = std::map<std::string, std::string>;
+
+    /// Builds a statistics record from explicit values; `spec` is copied.
+    PartitionStatistics(const SpecType& spec, int64_t record_count,
+                        int64_t file_size_in_bytes, int64_t file_count,
+                        int64_t last_file_creation_time, int32_t total_buckets)
+        : spec_(spec),
+          record_count_(record_count),
+          file_size_in_bytes_(file_size_in_bytes),
+          file_count_(file_count),
+          last_file_creation_time_(last_file_creation_time),
+          total_buckets_(total_buckets) {}
+
+    /// Key/value pairs of the partition spec.
+    const SpecType& Spec() const { return spec_; }
+    /// Value stored under "recordCount".
+    int64_t RecordCount() const { return record_count_; }
+    /// Value stored under "fileSizeInBytes".
+    int64_t FileSizeInBytes() const { return file_size_in_bytes_; }
+    /// Value stored under "fileCount".
+    int64_t FileCount() const { return file_count_; }
+    /// Value stored under "lastFileCreationTime".
+    int64_t LastFileCreationTime() const { return last_file_creation_time_; }
+    /// Value stored under "totalBuckets".
+    int32_t TotalBuckets() const { return total_buckets_; }
+
+    /// Serializes every field into a JSON object allocated from `allocator`.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override {
+        rapidjson::Value json(rapidjson::kObjectType);
+        // Appends one serialized member; the call order below fixes the key
+        // order of the emitted object.
+        const auto put = [&json, allocator](const char* key, const auto& value) {
+            json.AddMember(rapidjson::StringRef(key),
+                           RapidJsonUtil::SerializeValue(value, allocator).Move(),
+                           *allocator);
+        };
+        put(FIELD_SPEC, spec_);
+        put(FIELD_RECORD_COUNT, record_count_);
+        put(FIELD_FILE_SIZE_IN_BYTES, file_size_in_bytes_);
+        put(FIELD_FILE_COUNT, file_count_);
+        put(FIELD_LAST_FILE_CREATION_TIME, last_file_creation_time_);
+        put(FIELD_TOTAL_BUCKETS, total_buckets_);
+        return json;
+    }
+
+    /// Overwrites every field with the value found under its key in `obj`.
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override {
+        spec_ = RapidJsonUtil::DeserializeKeyValue<SpecType>(obj, FIELD_SPEC);
+        record_count_ =
+            RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_RECORD_COUNT);
+        file_size_in_bytes_ =
+            RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_FILE_SIZE_IN_BYTES);
+        file_count_ =
+            RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_FILE_COUNT);
+        last_file_creation_time_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(
+            obj, FIELD_LAST_FILE_CREATION_TIME);
+        total_buckets_ =
+            RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, FIELD_TOTAL_BUCKETS);
+    }
+
+    /// Two records are equal when all six fields are equal.
+    bool operator==(const PartitionStatistics& rhs) const {
+        if (record_count_ != rhs.record_count_) {
+            return false;
+        }
+        if (file_size_in_bytes_ != rhs.file_size_in_bytes_) {
+            return false;
+        }
+        if (file_count_ != rhs.file_count_) {
+            return false;
+        }
+        if (last_file_creation_time_ != rhs.last_file_creation_time_) {
+            return false;
+        }
+        if (total_buckets_ != rhs.total_buckets_) {
+            return false;
+        }
+        return spec_ == rhs.spec_;
+    }
+
+    /// Renders "{spec={k:v, ...}, recordCount=..., ..., totalBuckets=...}".
+    std::string ToString() const {
+        std::ostringstream out;
+        out << "{spec={";
+        // Empty before the first entry, ", " before every later one.
+        const char* separator = "";
+        for (const auto& [key, value] : spec_) {
+            out << separator << key << ":" << value;
+            separator = ", ";
+        }
+        out << "}, recordCount=" << record_count_;
+        out << ", fileSizeInBytes=" << file_size_in_bytes_;
+        out << ", fileCount=" << file_count_;
+        out << ", lastFileCreationTime=" << last_file_creation_time_;
+        out << ", totalBuckets=" << total_buckets_ << "}";
+        return out.str();
+    }
+
+ private:
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(PartitionStatistics);
+
+ private:
+    // JSON keys.
+    static constexpr const char* FIELD_SPEC = "spec";
+    static constexpr const char* FIELD_RECORD_COUNT = "recordCount";
+    static constexpr const char* FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
+    static constexpr const char* FIELD_FILE_COUNT = "fileCount";
+    static constexpr const char* FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime";
+    static constexpr const char* FIELD_TOTAL_BUCKETS = "totalBuckets";
+
+    SpecType spec_;
+    int64_t record_count_ = 0;
+    int64_t file_size_in_bytes_ = 0;
+    int64_t file_count_ = 0;
+    int64_t last_file_creation_time_ = 0;
+    int32_t total_buckets_ = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/partition/partition_statistics_test.cpp 
b/src/paimon/core/partition/partition_statistics_test.cpp
new file mode 100644
index 0000000..d77276d
--- /dev/null
+++ b/src/paimon/core/partition/partition_statistics_test.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/partition/partition_statistics.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture providing a whitespace-stripping helper used to compare JSON text.
+class PartitionStatisticsTest : public testing::Test {
+ public:
+    /// Returns a copy of `str` with every space, tab and newline removed.
+    std::string ReplaceAll(const std::string& str) {
+        std::string compact = str;
+        for (const char* blank : {" ", "\t", "\n"}) {
+            compact = StringUtils::Replace(compact, blank, "");
+        }
+        return compact;
+    }
+};
+
+// Parses a reference document, compares the result with a hand-built object,
+// then serializes it again and compares the text ignoring whitespace.
+TEST_F(PartitionStatisticsTest, TestJsonizable) {
+    std::string source_json = R"({
+        "spec": {
+            "f1": "10",
+            "f2": "20"
+        },
+        "recordCount": 4,
+        "fileSizeInBytes": 1118,
+        "fileCount": 2,
+        "lastFileCreationTime": 1724090888727,
+        "totalBuckets": 1
+    })";
+
+    // The object the reference document is expected to decode to.
+    PartitionStatistics wanted(
+        /*spec=*/{{"f1", "10"}, {"f2", "20"}},
+        /*record_count=*/4,
+        /*file_size_in_bytes=*/1118,
+        /*file_count=*/2,
+        /*last_file_creation_time=*/1724090888727,
+        /*total_buckets=*/1);
+
+    ASSERT_OK_AND_ASSIGN(PartitionStatistics parsed,
+                         PartitionStatistics::FromJsonString(source_json));
+    ASSERT_EQ(wanted, parsed);
+
+    // Round trip: the serialized text must match the input modulo whitespace.
+    ASSERT_OK_AND_ASSIGN(std::string round_tripped, parsed.ToJsonString());
+    ASSERT_EQ(ReplaceAll(source_json), ReplaceAll(round_tripped));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/tag/tag.cpp b/src/paimon/core/tag/tag.cpp
new file mode 100644
index 0000000..ebf7773
--- /dev/null
+++ b/src/paimon/core/tag/tag.cpp
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/tag/tag.h"
+
+#include <cassert>
+
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Forwards every snapshot field to the Snapshot base class and stores the
+/// two tag-only fields.
+Tag::Tag(const std::optional<int32_t>& version,
+         int64_t id,
+         int64_t schema_id,
+         const std::string& base_manifest_list,
+         const std::optional<int64_t>& base_manifest_list_size,
+         const std::string& delta_manifest_list,
+         const std::optional<int64_t>& delta_manifest_list_size,
+         const std::optional<std::string>& changelog_manifest_list,
+         const std::optional<int64_t>& changelog_manifest_list_size,
+         const std::optional<std::string>& index_manifest,
+         const std::string& commit_user,
+         int64_t commit_identifier,
+         CommitKind commit_kind,
+         int64_t time_millis,
+         const std::optional<std::map<int32_t, int64_t>>& log_offsets,
+         const std::optional<int64_t>& total_record_count,
+         const std::optional<int64_t>& delta_record_count,
+         const std::optional<int64_t>& changelog_record_count,
+         const std::optional<int64_t>& watermark,
+         const std::optional<std::string>& statistics,
+         const std::optional<std::map<std::string, std::string>>& properties,
+         const std::optional<int64_t>& next_row_id,
+         const std::optional<std::vector<int64_t>>& tag_create_time,
+         const std::optional<double_t>& tag_time_retained)
+    : Snapshot(version,
+               id,
+               schema_id,
+               base_manifest_list,
+               base_manifest_list_size,
+               delta_manifest_list,
+               delta_manifest_list_size,
+               changelog_manifest_list,
+               changelog_manifest_list_size,
+               index_manifest,
+               commit_user,
+               commit_identifier,
+               commit_kind,
+               time_millis,
+               log_offsets,
+               total_record_count,
+               delta_record_count,
+               changelog_record_count,
+               watermark,
+               statistics,
+               properties,
+               next_row_id),
+      tag_create_time_(tag_create_time),
+      tag_time_retained_(tag_time_retained) {}
+
+/// Tags are equal when their Snapshot parts compare equal with
+/// Snapshot::operator== and both tag-only fields match.
+bool Tag::operator==(const Tag& other) const {
+    // Same object: nothing to compare.
+    if (this == &other) {
+        return true;
+    }
+    if (!Snapshot::operator==(other)) {
+        return false;
+    }
+    if (!(tag_create_time_ == other.tag_create_time_)) {
+        return false;
+    }
+    return tag_time_retained_ == other.tag_time_retained_;
+}
+
+/// Same as operator== except that the Snapshot part is compared with
+/// Snapshot::TEST_Equal.
+bool Tag::TEST_Equal(const Tag& other) const {
+    // Same object: nothing to compare.
+    if (this == &other) {
+        return true;
+    }
+    if (!Snapshot::TEST_Equal(other)) {
+        return false;
+    }
+    if (!(tag_create_time_ == other.tag_create_time_)) {
+        return false;
+    }
+    return tag_time_retained_ == other.tag_time_retained_;
+}
+
+/// Builds a plain Snapshot from this tag's snapshot accessors; the two
+/// tag-only fields are not carried over.
+Result<Snapshot> Tag::TrimToSnapshot() const {
+    return Snapshot(Version(),
+                    Id(),
+                    SchemaId(),
+                    BaseManifestList(),
+                    BaseManifestListSize(),
+                    DeltaManifestList(),
+                    DeltaManifestListSize(),
+                    ChangelogManifestList(),
+                    ChangelogManifestListSize(),
+                    IndexManifest(),
+                    CommitUser(),
+                    CommitIdentifier(),
+                    GetCommitKind(),
+                    TimeMillis(),
+                    LogOffsets(),
+                    TotalRecordCount(),
+                    DeltaRecordCount(),
+                    ChangelogRecordCount(),
+                    Watermark(),
+                    Statistics(),
+                    Properties(),
+                    NextRowId());
+}
+
+/// Serializes the Snapshot part first, then appends "tagCreateTime" and
+/// "tagTimeRetained" for whichever of the two optionals holds a value.
+rapidjson::Value Tag::ToJson(rapidjson::Document::AllocatorType* allocator) const
+    noexcept(false) {
+    rapidjson::Value json = Snapshot::ToJson(allocator);
+    if (tag_create_time_.has_value()) {
+        json.AddMember(rapidjson::StringRef(FIELD_TAG_CREATE_TIME),
+                       RapidJsonUtil::SerializeValue(*tag_create_time_, allocator).Move(),
+                       *allocator);
+    }
+    if (tag_time_retained_.has_value()) {
+        json.AddMember(rapidjson::StringRef(FIELD_TAG_TIME_RETAINED),
+                       RapidJsonUtil::SerializeValue(*tag_time_retained_, allocator).Move(),
+                       *allocator);
+    }
+    return json;
+}
+
+/// Loads the Snapshot part from `obj`, then reads the two tag-only fields as
+/// optionals from the keys "tagCreateTime" and "tagTimeRetained".
+void Tag::FromJson(const rapidjson::Value& obj) noexcept(false) {
+    Snapshot::FromJson(obj);
+
+    // The target types are taken from the members themselves.
+    using CreateTime = decltype(tag_create_time_);
+    using TimeRetained = decltype(tag_time_retained_);
+
+    tag_create_time_ =
+        RapidJsonUtil::DeserializeKeyValue<CreateTime>(obj, FIELD_TAG_CREATE_TIME);
+    tag_time_retained_ =
+        RapidJsonUtil::DeserializeKeyValue<TimeRetained>(obj, FIELD_TAG_TIME_RETAINED);
+}
+
+/// Reads the file at `path` through `fs` and parses its content as a Tag.
+/// A failed read or a failed parse is returned to the caller as-is.
+Result<Tag> Tag::FromPath(const std::shared_ptr<FileSystem>& fs,
+                          const std::string& path) {
+    std::string file_content;
+    PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &file_content));
+
+    Tag parsed;
+    PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(file_content, &parsed));
+    return parsed;
+}
+}  // namespace paimon
diff --git a/src/paimon/core/tag/tag.h b/src/paimon/core/tag/tag.h
new file mode 100644
index 0000000..23508e5
--- /dev/null
+++ b/src/paimon/core/tag/tag.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+class FileSystem;
+
+/// Snapshot with tagCreateTime and tagTimeRetained.
+///
+/// Both extra fields are optional; ToJson() writes each of them only when it
+/// holds a value.
+/// NOTE(review): this header uses std::vector and double_t but includes
+/// neither <vector> nor <cmath> itself; presumably both arrive transitively
+/// through paimon/core/snapshot.h -- confirm.
+/// NOTE(review): double_t is the <cmath> typedef whose width may exceed
+/// double on some targets; plain double may be what is intended -- confirm.
+class Tag : public Snapshot {
+ public:
+    // JSON keys of the two tag-only fields.
+    static constexpr char FIELD_TAG_CREATE_TIME[] = "tagCreateTime";
+    static constexpr char FIELD_TAG_TIME_RETAINED[] = "tagTimeRetained";
+
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Tag);
+
+    /// Takes the Snapshot constructor arguments followed by the two tag-only
+    /// fields.
+    Tag(const std::optional<int32_t>& version, int64_t id, int64_t schema_id,
+        const std::string& base_manifest_list,
+        const std::optional<int64_t>& base_manifest_list_size,
+        const std::string& delta_manifest_list,
+        const std::optional<int64_t>& delta_manifest_list_size,
+        const std::optional<std::string>& changelog_manifest_list,
+        const std::optional<int64_t>& changelog_manifest_list_size,
+        const std::optional<std::string>& index_manifest, const std::string& 
commit_user,
+        int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis,
+        const std::optional<std::map<int32_t, int64_t>>& log_offsets,
+        const std::optional<int64_t>& total_record_count,
+        const std::optional<int64_t>& delta_record_count,
+        const std::optional<int64_t>& changelog_record_count,
+        const std::optional<int64_t>& watermark, const 
std::optional<std::string>& statistics,
+        const std::optional<std::map<std::string, std::string>>& properties,
+        const std::optional<int64_t>& next_row_id,
+        const std::optional<std::vector<int64_t>>& tag_create_time,
+        const std::optional<double_t>& tag_time_retained);
+
+    /// Compares the Snapshot part and both tag-only fields.
+    bool operator==(const Tag& other) const;
+    /// Like operator==, but compares the Snapshot part with
+    /// Snapshot::TEST_Equal.
+    bool TEST_Equal(const Tag& other) const;
+
+    /// Returns a copy of the creation-time field.
+    /// NOTE(review): stored in JSON as an integer array; the elements are
+    /// presumably date/time components -- confirm.
+    std::optional<std::vector<int64_t>> TagCreateTime() const {
+        return tag_create_time_;
+    }
+
+    /// Returns a copy of the retention field.
+    /// NOTE(review): the unit is presumably seconds -- confirm.
+    std::optional<double_t> TagTimeRetained() const {
+        return tag_time_retained_;
+    }
+
+    /// Returns a Snapshot built from this tag's snapshot fields only.
+    Result<Snapshot> TrimToSnapshot() const;
+
+    /// Serializes the Snapshot part plus whichever tag-only fields are set.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+        noexcept(false) override;
+
+    /// Loads the Snapshot part and the two tag-only fields from `obj`.
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+    /// Reads the file at `path` through `fs` and parses it as a Tag.
+    static Result<Tag> FromPath(const std::shared_ptr<FileSystem>& fs, const 
std::string& path);
+
+ private:
+    std::optional<std::vector<int64_t>> tag_create_time_;
+    std::optional<double_t> tag_time_retained_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/tag/tag_test.cpp b/src/paimon/core/tag/tag_test.cpp
new file mode 100644
index 0000000..fb371e8
--- /dev/null
+++ b/src/paimon/core/tag/tag_test.cpp
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/tag/tag.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture providing a text normalizer used to compare JSON documents.
+class TagTest : public testing::Test {
+ public:
+    /// Returns a copy of `str` with every space, tab and newline removed.
+    /// When `serialized` is true, each ".0" is additionally rewritten to
+    /// ".000000000" (presumably so that serializer output lines up with
+    /// fixtures that spell doubles with nine decimals).
+    static std::string ReplaceAll(const std::string& str, const bool serialized) {
+        std::string compact = str;
+        for (const char* blank : {" ", "\t", "\n"}) {
+            compact = StringUtils::Replace(compact, blank, "");
+        }
+        if (!serialized) {
+            return compact;
+        }
+        return StringUtils::Replace(compact, ".0", ".000000000");
+    }
+};
+
+// Builds a Tag with every field populated and checks that each accessor
+// returns the value that was passed to the constructor.
+TEST_F(TagTest, TestSimple) {
+    const std::map<int32_t, int64_t> offsets = {{25, 30}};
+    const std::map<std::string, std::string> props = {{"key1", "value1"},
+                                                      {"key2", "value2"}};
+    const auto created_at = std::vector<int64_t>({2026, 1, 2, 3, 4, 5, 6});
+
+    const Tag tag(
+        /*version=*/5,
+        /*id=*/10,
+        /*schema_id=*/15,
+        /*base_manifest_list=*/"base_manifest_list",
+        /*base_manifest_list_size=*/10,
+        /*delta_manifest_list=*/"delta_manifest_list",
+        /*delta_manifest_list_size=*/20,
+        /*changelog_manifest_list=*/"changelog_manifest_list",
+        /*changelog_manifest_list_size=*/30,
+        /*index_manifest=*/"index_manifest",
+        /*commit_user=*/"commit_user_01",
+        /*commit_identifier=*/20,
+        /*commit_kind=*/Snapshot::CommitKind::Compact(),
+        /*time_millis=*/1234,
+        /*log_offsets=*/offsets,
+        /*total_record_count=*/35,
+        /*delta_record_count=*/40,
+        /*changelog_record_count=*/45,
+        /*watermark=*/50,
+        /*statistics=*/"statistic_test",
+        /*properties=*/props,
+        /*next_row_id=*/0,
+        /*tag_create_time=*/created_at,
+        /*tag_time_retained=*/5.0);
+
+    // Snapshot fields.
+    ASSERT_EQ(5, tag.Version());
+    ASSERT_EQ(10, tag.Id());
+    ASSERT_EQ(15, tag.SchemaId());
+    ASSERT_EQ("base_manifest_list", tag.BaseManifestList());
+    ASSERT_EQ(10, tag.BaseManifestListSize().value());
+    ASSERT_EQ("delta_manifest_list", tag.DeltaManifestList());
+    ASSERT_EQ(20, tag.DeltaManifestListSize().value());
+    ASSERT_EQ("changelog_manifest_list", tag.ChangelogManifestList().value());
+    ASSERT_EQ(30, tag.ChangelogManifestListSize().value());
+    ASSERT_EQ("index_manifest", tag.IndexManifest().value());
+    ASSERT_EQ("commit_user_01", tag.CommitUser());
+    ASSERT_EQ(20, tag.CommitIdentifier());
+    ASSERT_EQ(Snapshot::CommitKind::Compact(), tag.GetCommitKind());
+    ASSERT_EQ(1234, tag.TimeMillis());
+    ASSERT_EQ(offsets, tag.LogOffsets().value());
+    ASSERT_EQ(35, tag.TotalRecordCount().value());
+    ASSERT_EQ(40, tag.DeltaRecordCount().value());
+    ASSERT_EQ(45, tag.ChangelogRecordCount().value());
+    ASSERT_EQ(50, tag.Watermark().value());
+    ASSERT_EQ("statistic_test", tag.Statistics().value());
+    ASSERT_EQ(props, tag.Properties().value());
+    ASSERT_EQ(0, tag.NextRowId().value());
+
+    // Tag-only fields.
+    ASSERT_EQ(created_at, tag.TagCreateTime().value());
+    ASSERT_EQ(5.0, tag.TagTimeRetained().value());
+}
+
+// Loads the tag-1 fixture from the test data directory and checks every
+// field against the values expected from that file.
+TEST_F(TagTest, TestFromPath) {
+    const auto local_fs = std::make_shared<LocalFileSystem>();
+    const std::string tag_file =
+        paimon::test::GetDataDir() +
+        "/orc/append_table_with_tag.db/append_table_with_tag/tag/tag-1";
+    ASSERT_OK_AND_ASSIGN(Tag tag, Tag::FromPath(local_fs, tag_file));
+
+    // Snapshot fields.
+    ASSERT_EQ(3, tag.Version());
+    ASSERT_EQ(1, tag.Id());
+    ASSERT_EQ(0, tag.SchemaId());
+    ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-0",
+              tag.BaseManifestList());
+    ASSERT_EQ(std::nullopt, tag.BaseManifestListSize());
+    ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-1",
+              tag.DeltaManifestList());
+    ASSERT_EQ(std::nullopt, tag.DeltaManifestListSize());
+    ASSERT_EQ(std::nullopt, tag.ChangelogManifestList());
+    ASSERT_EQ(std::nullopt, tag.ChangelogManifestListSize());
+    ASSERT_EQ(std::nullopt, tag.IndexManifest());
+    ASSERT_EQ("b02e4322-9c5f-41e1-a560-c0156fdf7b9c", tag.CommitUser());
+    ASSERT_EQ(9223372036854775807ll, tag.CommitIdentifier());
+    ASSERT_EQ(Snapshot::CommitKind::Append(), tag.GetCommitKind());
+    ASSERT_EQ(1721614343270ll, tag.TimeMillis());
+    ASSERT_EQ((std::map<int32_t, int64_t>()), tag.LogOffsets().value());
+    ASSERT_EQ(5, tag.TotalRecordCount().value());
+    ASSERT_EQ(5, tag.DeltaRecordCount().value());
+    ASSERT_EQ(0, tag.ChangelogRecordCount().value());
+    ASSERT_EQ(std::nullopt, tag.Watermark());
+    ASSERT_EQ(std::nullopt, tag.Statistics());
+    ASSERT_EQ(std::nullopt, tag.Properties());
+    ASSERT_EQ(std::nullopt, tag.NextRowId());
+
+    // Tag-only fields.
+    ASSERT_EQ(std::vector<int64_t>({2026, 2, 4, 6, 8, 10, 12}), tag.TagCreateTime());
+    ASSERT_EQ(3.0, tag.TagTimeRetained());
+}
+
+// Parses a reference document, compares the result with a hand-built Tag,
+// then serializes it again and compares the normalized text.
+TEST_F(TagTest, TestJsonizable) {
+    const std::string source_json = R"({
+        "version" : 3,
+        "id" : 1,
+        "schemaId" : 0,
+        "baseManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0",
+        "baseManifestListSize" : 20,
+        "deltaManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1",
+        "deltaManifestListSize" : 50,
+        "changelogManifestList" : null,
+        "commitUser" : "0e4d92f7-53b0-40d6-a7c0-102bf3801e6a",
+        "commitIdentifier" : 9223372036854775807,
+        "commitKind" : "OVERWRITE",
+        "timeMillis" : 1711692199281,
+        "logOffsets" : { },
+        "totalRecordCount" : 3,
+        "deltaRecordCount" : 3,
+        "changelogRecordCount" : 0,
+        "tagCreateTime" : [ 2026, 1, 3, 5, 7, 9, 11 ],
+        "tagTimeRetained" : 4.000000000
+    })";
+
+    // The Tag the reference document is expected to decode to.
+    const Tag wanted(
+        /*version=*/3,
+        /*id=*/1,
+        /*schema_id=*/0,
+        /*base_manifest_list=*/"manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0",
+        /*base_manifest_list_size=*/20,
+        /*delta_manifest_list=*/"manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1",
+        /*delta_manifest_list_size=*/50,
+        /*changelog_manifest_list=*/std::nullopt,
+        /*changelog_manifest_list_size=*/std::nullopt,
+        /*index_manifest=*/std::nullopt,
+        /*commit_user=*/"0e4d92f7-53b0-40d6-a7c0-102bf3801e6a",
+        /*commit_identifier=*/9223372036854775807ll,
+        /*commit_kind=*/Snapshot::CommitKind::Overwrite(),
+        /*time_millis=*/1711692199281ll,
+        /*log_offsets=*/std::map<int32_t, int64_t>(),
+        /*total_record_count=*/3,
+        /*delta_record_count=*/3,
+        /*changelog_record_count=*/0,
+        /*watermark=*/std::nullopt,
+        /*statistics=*/std::nullopt,
+        /*properties=*/std::nullopt,
+        /*next_row_id=*/std::nullopt,
+        /*tag_create_time=*/std::vector<int64_t>({2026, 1, 3, 5, 7, 9, 11}),
+        /*tag_time_retained=*/4.0);
+
+    Tag parsed;
+    ASSERT_OK(RapidJsonUtil::FromJsonString(source_json, &parsed));
+    ASSERT_EQ(wanted, parsed);
+
+    // Round trip: compare after normalizing both sides.
+    ASSERT_OK_AND_ASSIGN(std::string round_tripped, parsed.ToJsonString());
+    ASSERT_EQ(ReplaceAll(source_json, false), ReplaceAll(round_tripped, true));
+}
+
+// Round-trips three inputs (one fixture file, two inline documents) through
+// parse + serialize and checks that the normalized text is unchanged.
+TEST_F(TagTest, TestSerializeAndDeserialize) {
+    // Round trip starting from a file on the local file system.
+    const auto round_trip_file = [&](const std::string& tag_file) {
+        auto local_fs = std::make_shared<LocalFileSystem>();
+        std::string original;
+        ASSERT_OK(local_fs->ReadFile(tag_file, &original));
+        ASSERT_OK_AND_ASSIGN(Tag parsed, Tag::FromPath(local_fs, tag_file));
+        ASSERT_OK_AND_ASSIGN(std::string serialized, parsed.ToJsonString());
+        ASSERT_EQ(ReplaceAll(original, false), ReplaceAll(serialized, true));
+    };
+    // Round trip starting from an in-memory JSON document.
+    auto round_trip_text = [&](const std::string& original) {
+        Tag parsed;
+        ASSERT_OK(RapidJsonUtil::FromJsonString(original, &parsed));
+        ASSERT_OK_AND_ASSIGN(std::string serialized, parsed.ToJsonString());
+        ASSERT_EQ(ReplaceAll(original, false), ReplaceAll(serialized, true));
+    };
+
+    {
+        // fixture file
+        const std::string tag_file =
+            paimon::test::GetDataDir() +
+            "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/tag/tag-1";
+        round_trip_file(tag_file);
+    }
+    {
+        // with tagCreateTime
+        const std::string with_create_time = R"({
+          "version" : 3,
+          "id" : 10,
+          "schemaId" : 2,
+          "baseManifestList" : "base-manifest-list-1",
+          "baseManifestListSize" : 100,
+          "deltaManifestList" : "delta-manifest-list-2",
+          "deltaManifestListSize" : 200,
+          "changelogManifestList" : null,
+          "commitUser" : "commit-usr-3",
+          "commitIdentifier" : 12,
+          "commitKind" : "APPEND",
+          "timeMillis" : 1749724197266,
+          "logOffsets" : {
+              "0" : 1,
+              "1" : 3
+          },
+          "totalRecordCount" : 1024,
+          "deltaRecordCount" : 4096,
+          "watermark" : 1749724196266,
+          "statistics" : "statistics-4",
+          "properties" : {
+             "key0" : "value0",
+             "key1" : "value1"
+          },
+          "tagCreateTime": [ 2026, 2, 4, 6, 8, 10, 12 ]
+        })";
+        round_trip_text(with_create_time);
+    }
+    {
+        // with tagTimeRetained
+        const std::string with_time_retained = R"({
+          "version" : 3,
+          "id" : 10,
+          "schemaId" : 2,
+          "baseManifestList" : "base-manifest-list-1",
+          "baseManifestListSize" : 100,
+          "deltaManifestList" : "delta-manifest-list-2",
+          "deltaManifestListSize" : 200,
+          "changelogManifestList" : null,
+          "commitUser" : "commit-usr-3",
+          "commitIdentifier" : 12,
+          "commitKind" : "APPEND",
+          "timeMillis" : 1749724197266,
+          "logOffsets" : {
+              "0" : 1,
+              "1" : 3
+          },
+          "totalRecordCount" : 1024,
+          "deltaRecordCount" : 4096,
+          "watermark" : 1749724196266,
+          "statistics" : "statistics-4",
+          "properties" : {
+             "key0" : "value0",
+             "key1" : "value1"
+          },
+          "tagTimeRetained" : 2.000000000
+        })";
+        round_trip_text(with_time_retained);
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/view/view.h b/src/paimon/core/view/view.h
new file mode 100644
index 0000000..f689a23
--- /dev/null
+++ b/src/paimon/core/view/view.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Interface for view definition.
+///
+/// Name(), Query() and GetSchema() are pure virtual; FullName() comes with a
+/// default implementation.
+class PAIMON_EXPORT View {
+ public:
+    View() = default;
+    // Virtual so that implementations can be destroyed through a View pointer.
+    virtual ~View() = default;
+
+    /// A name to identify this view.
+    virtual std::string Name() const = 0;
+
+    /// Full name of the view. The default implementation returns Name()
+    /// unchanged.
+    /// NOTE(review): implementations are presumably expected to override this
+    /// to produce the qualified "database.viewName" form -- confirm.
+    virtual std::string FullName() const {
+        return Name();
+    }
+
+    /// Returns the view representation.
+    /// NOTE(review): presumably the query text that defines the view --
+    /// confirm.
+    virtual std::string Query() const = 0;
+
+    /// Loads the schema of view.
+    virtual std::shared_ptr<Schema> GetSchema() const = 0;
+};
+
+}  // namespace paimon

Reply via email to