This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ee4a84a feat(core): introduce storage metadata components (#62)
ee4a84a is described below
commit ee4a84a3f1d5afabab7f5c15f0ec6316910c64e2
Author: Zhang Jiawei <[email protected]>
AuthorDate: Tue Jun 9 15:45:25 2026 +0800
feat(core): introduce storage metadata components (#62)
---
src/paimon/core/disk/file_channel_manager.h | 92 ++++++++
src/paimon/core/disk/file_io_channel.cpp | 76 +++++++
src/paimon/core/disk/file_io_channel.h | 78 +++++++
src/paimon/core/disk/io_manager.h | 77 +++++++
src/paimon/core/disk/io_manager_test.cpp | 81 +++++++
src/paimon/core/memory/writer_memory_manager.cpp | 112 ++++++++++
src/paimon/core/memory/writer_memory_manager.h | 62 ++++++
.../core/memory/writer_memory_manager_test.cpp | 248 +++++++++++++++++++++
src/paimon/core/partition/partition_info.h | 80 +++++++
src/paimon/core/partition/partition_statistics.h | 140 ++++++++++++
.../core/partition/partition_statistics_test.cpp | 65 ++++++
src/paimon/core/tag/tag.cpp | 116 ++++++++++
src/paimon/core/tag/tag.h | 89 ++++++++
src/paimon/core/tag/tag_test.cpp | 247 ++++++++++++++++++++
src/paimon/core/view/view.h | 58 +++++
15 files changed, 1621 insertions(+)
diff --git a/src/paimon/core/disk/file_channel_manager.h
b/src/paimon/core/disk/file_channel_manager.h
new file mode 100644
index 0000000..6c03efc
--- /dev/null
+++ b/src/paimon/core/disk/file_channel_manager.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <string>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class FileChannelManager {
+ public:
+ static Result<std::unique_ptr<FileChannelManager>> Create(
+ const std::string& tmp_dir, const std::string& prefix,
+ const std::shared_ptr<FileSystem>& file_system) {
+ std::string uuid;
+ if (!UUID::Generate(&uuid)) {
+ return Status::Invalid("Failed to generate UUID for
FileChannelManager.");
+ }
+ std::string spill_dir = PathUtil::JoinPath(tmp_dir, "paimon-" + prefix
+ "-" + uuid);
+
+ PAIMON_RETURN_NOT_OK(file_system->Mkdirs(spill_dir));
+
+ std::random_device rd;
+ std::mt19937 random(rd());
+
+ return std::unique_ptr<FileChannelManager>(
+ new FileChannelManager(spill_dir, std::move(random), file_system));
+ }
+
+ ~FileChannelManager() {
+ if (!spill_dir_.empty() && fs_ != nullptr) {
+ [[maybe_unused]] auto status = fs_->Delete(spill_dir_,
/*recursive=*/true);
+ }
+ }
+
+ FileChannelManager(const FileChannelManager&) = delete;
+ FileChannelManager& operator=(const FileChannelManager&) = delete;
+
+ FileIOChannel::ID CreateChannel() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return FileIOChannel::ID(spill_dir_, &random_);
+ }
+
+ FileIOChannel::ID CreateChannel(const std::string& prefix) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return FileIOChannel::ID(spill_dir_, prefix, &random_);
+ }
+
+ std::shared_ptr<FileIOChannel::Enumerator> CreateChannelEnumerator() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return std::make_shared<FileIOChannel::Enumerator>(spill_dir_,
&random_);
+ }
+
+ const std::string& GetSpillDir() const {
+ return spill_dir_;
+ }
+
+ private:
+ FileChannelManager(const std::string& spill_dir, std::mt19937&& random,
+ const std::shared_ptr<FileSystem>& fs)
+ : spill_dir_(spill_dir), random_(std::move(random)), fs_(fs) {}
+ std::string spill_dir_;
+ std::mutex mutex_;
+ std::mt19937 random_;
+ std::shared_ptr<FileSystem> fs_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/disk/file_io_channel.cpp
b/src/paimon/core/disk/file_io_channel.cpp
new file mode 100644
index 0000000..8276461
--- /dev/null
+++ b/src/paimon/core/disk/file_io_channel.cpp
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/disk/file_io_channel.h"
+
+#include <iomanip>
+#include <sstream>
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+
+namespace paimon {
+std::string FileIOChannel::GenerateRandomHexString(std::mt19937* random) {
+ std::uniform_int_distribution<int32_t> dist(0, 255);
+ std::ostringstream hex_stream;
+ hex_stream << std::hex << std::setfill('0');
+ for (int32_t i = 0; i < kRandomBytesLength; ++i) {
+ hex_stream << std::setw(2) << dist(*random);
+ }
+ return hex_stream.str();
+}
+
+FileIOChannel::ID::ID(const std::string& path) : path_(path) {}
+
+FileIOChannel::ID::ID(const std::string& base_path, std::mt19937* random)
+ : path_(PathUtil::JoinPath(base_path, GenerateRandomHexString(random) +
".channel")) {}
+
+FileIOChannel::ID::ID(const std::string& base_path, const std::string& prefix,
std::mt19937* random)
+ : path_(PathUtil::JoinPath(base_path,
+ prefix + "-" + GenerateRandomHexString(random)
+ ".channel")) {}
+
+const std::string& FileIOChannel::ID::GetPath() const {
+ return path_;
+}
+
+bool FileIOChannel::ID::operator==(const ID& other) const {
+ return path_ == other.path_;
+}
+
+bool FileIOChannel::ID::operator!=(const ID& other) const {
+ return !(*this == other);
+}
+
+size_t FileIOChannel::ID::Hash::operator()(const ID& id) const {
+ return std::hash<std::string>{}(id.path_);
+}
+
+FileIOChannel::Enumerator::Enumerator(const std::string& base_path,
std::mt19937* random)
+ : path_(base_path), name_prefix_(GenerateRandomHexString(random)) {}
+
+FileIOChannel::ID FileIOChannel::Enumerator::Next() {
+ std::ostringstream filename;
+ filename << name_prefix_ << "." << std::setfill('0') << std::setw(6) <<
(local_counter_++)
+ << ".channel";
+
+ std::string full_path = PathUtil::JoinPath(path_, filename.str());
+ return ID(full_path);
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/disk/file_io_channel.h
b/src/paimon/core/disk/file_io_channel.h
new file mode 100644
index 0000000..652056c
--- /dev/null
+++ b/src/paimon/core/disk/file_io_channel.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <random>
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+class PAIMON_EXPORT FileIOChannel {
+ public:
+ class PAIMON_EXPORT ID {
+ public:
+ ID() = default;
+
+ explicit ID(const std::string& path);
+
+ ID(const std::string& base_path, std::mt19937* random);
+
+ ID(const std::string& base_path, const std::string& prefix,
std::mt19937* random);
+
+ const std::string& GetPath() const;
+
+ bool operator==(const ID& other) const;
+
+ bool operator!=(const ID& other) const;
+
+ struct Hash {
+ size_t operator()(const ID& id) const;
+ };
+
+ private:
+ std::string path_;
+ };
+
+ private:
+ static constexpr int32_t kRandomBytesLength = 16;
+ static std::string GenerateRandomHexString(std::mt19937* random);
+
+ public:
+ class PAIMON_EXPORT Enumerator {
+ public:
+ Enumerator(const std::string& base_path, std::mt19937* random);
+
+ ID Next();
+
+ private:
+ std::string path_;
+ std::string name_prefix_;
+ uint64_t local_counter_{0};
+ };
+};
+
+struct FileChannelInfo {
+ FileIOChannel::ID channel_id;
+ int64_t file_size;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/disk/io_manager.h
b/src/paimon/core/disk/io_manager.h
new file mode 100644
index 0000000..4b79771
--- /dev/null
+++ b/src/paimon/core/disk/io_manager.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "paimon/core/disk/file_channel_manager.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class IOManager {
+ public:
+ IOManager(const std::string& tmp_dir, const std::shared_ptr<FileSystem>&
file_system)
+ : tmp_dir_(tmp_dir), file_system_(file_system) {}
+
+ const std::string& GetTempDir() const {
+ return tmp_dir_;
+ }
+
+ Result<FileIOChannel::ID> CreateChannel() {
+ PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager());
+ return manager->CreateChannel();
+ }
+
+ Result<FileIOChannel::ID> CreateChannel(const std::string& prefix) {
+ PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager());
+ return manager->CreateChannel(prefix);
+ }
+
+ Result<std::shared_ptr<FileIOChannel::Enumerator>>
CreateChannelEnumerator() {
+ PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager());
+ return manager->CreateChannelEnumerator();
+ }
+
+ Result<std::string> GetSpillDir() {
+ PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager());
+ return manager->GetSpillDir();
+ }
+
+ private:
+ Result<FileChannelManager*> GetFileChannelManager() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (file_channel_manager_ == nullptr) {
+ PAIMON_ASSIGN_OR_RAISE(
+ file_channel_manager_,
+ FileChannelManager::Create(tmp_dir_, kDirNamePrefix,
file_system_));
+ }
+ return file_channel_manager_.get();
+ }
+
+ static constexpr char kDirNamePrefix[] = "io";
+ std::string tmp_dir_;
+ std::shared_ptr<FileSystem> file_system_;
+ std::mutex mutex_;
+ std::unique_ptr<FileChannelManager> file_channel_manager_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/disk/io_manager_test.cpp
b/src/paimon/core/disk/io_manager_test.cpp
new file mode 100644
index 0000000..3290042
--- /dev/null
+++ b/src/paimon/core/disk/io_manager_test.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/disk/io_manager.h"
+
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(IOManagerTest, CreateShouldReturnManagerWithGivenTempDir) {
+ auto tmp_dir = UniqueTestDirectory::Create();
+
+ auto manager = std::make_unique<IOManager>(tmp_dir->Str(),
tmp_dir->GetFileSystem());
+ ASSERT_NE(manager, nullptr);
+ ASSERT_EQ(manager->GetTempDir(), tmp_dir->Str());
+}
+
+TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths) {
+ auto tmp_dir = UniqueTestDirectory::Create();
+ auto manager = std::make_shared<IOManager>(tmp_dir->Str(),
tmp_dir->GetFileSystem());
+ const std::string prefix = "spill";
+
+ ASSERT_OK_AND_ASSIGN(auto channel1, manager->CreateChannel());
+ ASSERT_TRUE(StringUtils::StartsWith(channel1.GetPath(), tmp_dir->Str() +
"/paimon-io-"));
+ ASSERT_TRUE(StringUtils::EndsWith(channel1.GetPath(), ".channel"));
+ ASSERT_EQ(PathUtil::GetName(channel1.GetPath()).size(), 32 +
std::string(".channel").size());
+
+ ASSERT_OK_AND_ASSIGN(auto channel2, manager->CreateChannel(prefix));
+ ASSERT_TRUE(StringUtils::StartsWith(PathUtil::GetName(channel2.GetPath()),
prefix + "-"));
+}
+
+TEST(IOManagerTest,
CreateChannelEnumeratorShouldReturnSequentialAndUniquePaths) {
+ auto tmp_dir = UniqueTestDirectory::Create();
+ auto manager = std::make_shared<IOManager>(tmp_dir->Str(),
tmp_dir->GetFileSystem());
+
+ ASSERT_OK_AND_ASSIGN(auto enumerator, manager->CreateChannelEnumerator());
+
+ for (int i = 0; i < 10; ++i) {
+ auto channel_id = enumerator->Next();
+ ASSERT_TRUE(StringUtils::StartsWith(channel_id.GetPath(),
tmp_dir->Str() + "/paimon-io-"));
+ std::string counter_str = std::to_string(i);
+ std::string padded_counter = std::string(6 - counter_str.size(), '0')
+ counter_str;
+ ASSERT_TRUE(StringUtils::EndsWith(channel_id.GetPath(), "." +
padded_counter + ".channel"));
+ }
+}
+
+TEST(IOManagerTest, GetSpillDirShouldReturnPaimonIoSubdirectory) {
+ auto tmp_dir = UniqueTestDirectory::Create();
+ auto manager = std::make_shared<IOManager>(tmp_dir->Str(),
tmp_dir->GetFileSystem());
+
+ ASSERT_OK_AND_ASSIGN(std::string spill_dir, manager->GetSpillDir());
+ ASSERT_TRUE(StringUtils::StartsWith(spill_dir, tmp_dir->Str() +
"/paimon-io-"));
+ ASSERT_FALSE(StringUtils::EndsWith(spill_dir, "/"));
+
+ ASSERT_OK_AND_ASSIGN(auto channel, manager->CreateChannel());
+ ASSERT_TRUE(StringUtils::StartsWith(channel.GetPath(), spill_dir + "/"));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/memory/writer_memory_manager.cpp
b/src/paimon/core/memory/writer_memory_manager.cpp
new file mode 100644
index 0000000..3c4ea0c
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager.cpp
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/memory/writer_memory_manager.h"
+
+#include <cassert>
+
+#include "fmt/format.h"
+#include "paimon/core/utils/batch_writer.h"
+
+namespace paimon {
+
+void WriterMemoryManager::RegisterWriter(BatchWriter* writer) {
+ UpdateWriterMemory(writer);
+}
+
+void WriterMemoryManager::UnregisterWriter(BatchWriter* writer) {
+ auto iter = writer_memory_.find(writer);
+ if (iter == writer_memory_.end()) {
+ return;
+ }
+
+ assert(total_memory_ >= iter->second);
+ total_memory_ -= iter->second;
+ writer_memory_.erase(iter);
+}
+
+void WriterMemoryManager::RefreshWriterMemory(BatchWriter* writer) {
+ UpdateWriterMemory(writer);
+}
+
+Status WriterMemoryManager::OnWriteCompleted(BatchWriter* writer) {
+ UpdateWriterMemory(writer);
+ if (total_memory_ < memory_limit_) {
+ return Status::OK();
+ }
+
+ return ShrinkToLimit();
+}
+
+void WriterMemoryManager::UpdateWriterMemory(BatchWriter* writer) {
+ uint64_t current_memory_usage = writer->GetMemoryUsage();
+ auto [iter, inserted] = writer_memory_.emplace(writer,
current_memory_usage);
+ if (inserted) {
+ total_memory_ += current_memory_usage;
+ } else {
+ uint64_t previous_memory = iter->second;
+ if (current_memory_usage >= previous_memory) {
+ total_memory_ += (current_memory_usage - previous_memory);
+ } else {
+ assert(total_memory_ >= previous_memory - current_memory_usage);
+ total_memory_ -= (previous_memory - current_memory_usage);
+ }
+ iter->second = current_memory_usage;
+ }
+}
+
+WriterMemoryManager::Candidate WriterMemoryManager::PickLargest() const {
+ Candidate candidate;
+ for (const auto& [writer, memory] : writer_memory_) {
+ if (memory > candidate.memory) {
+ candidate = {writer, memory};
+ }
+ }
+ return candidate;
+}
+
+Status WriterMemoryManager::ShrinkToLimit() {
+ while (true) {
+ if (total_memory_ < memory_limit_) {
+ return Status::OK();
+ }
+
+ Candidate picked = PickLargest();
+ if (picked.memory == 0) {
+ return Status::Invalid(
+ fmt::format("Unable to release memory to below the
write-buffer-size limit ({} "
+ "bytes), this might be a bug.",
+ memory_limit_));
+ }
+ BatchWriter* candidate = picked.writer;
+ uint64_t before_memory = picked.memory;
+ PAIMON_RETURN_NOT_OK(candidate->FlushMemory());
+
+ UpdateWriterMemory(candidate);
+ uint64_t after_memory = candidate->GetMemoryUsage();
+ if (after_memory >= before_memory) {
+ return Status::Invalid(fmt::format(
+ "Before flushing memory, writer had {} bytes of memory
allocated, After flushing "
+ "memory, writer still has {} bytes of memory allocated, this
might be a bug.",
+ before_memory, after_memory));
+ }
+ }
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/memory/writer_memory_manager.h
b/src/paimon/core/memory/writer_memory_manager.h
new file mode 100644
index 0000000..811667c
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <unordered_map>
+
+#include "paimon/status.h"
+
+namespace paimon {
+
+class BatchWriter;
+
+/// Coordinates global write-buffer memory across managed writers. Used in
`AbstractFileStoreWrite`.
+/// @note This class is not thread-safe.
+class WriterMemoryManager {
+ public:
+ explicit WriterMemoryManager(uint64_t memory_limit) :
memory_limit_(memory_limit) {}
+
+ /// Register a writer when a new `BatchWriter` is created in
`AbstractFileStoreWrite::GetWriter()`
+ void RegisterWriter(BatchWriter* writer);
+ /// Unregister a writer when the `BatchWriter` has been erased in
`AbstractFileStoreWrite`
+ void UnregisterWriter(BatchWriter* writer);
+ /// Refresh the memory usage of a writer after
`BatchWriter::PrepareCommit()`
+ void RefreshWriterMemory(BatchWriter* writer);
+ /// Check if the total memory usage reaches or exceeds the limit after
`BatchWriter::Write()`, and trigger
+ /// flush if needed.
+ Status OnWriteCompleted(BatchWriter* writer);
+
+ private:
+ struct Candidate {
+ BatchWriter* writer = nullptr;
+ uint64_t memory = 0;
+ };
+
+ void UpdateWriterMemory(BatchWriter* writer);
+ Candidate PickLargest() const;
+ Status ShrinkToLimit();
+
+ const uint64_t memory_limit_;
+ uint64_t total_memory_ = 0;
+ std::unordered_map<BatchWriter*, uint64_t> writer_memory_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/memory/writer_memory_manager_test.cpp
b/src/paimon/core/memory/writer_memory_manager_test.cpp
new file mode 100644
index 0000000..1a91bf6
--- /dev/null
+++ b/src/paimon/core/memory/writer_memory_manager_test.cpp
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/memory/writer_memory_manager.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_increment.h"
+#include "paimon/core/utils/batch_writer.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+class FakeBatchWriter : public BatchWriter {
+ public:
+ FakeBatchWriter(const std::string& name, std::vector<std::string>*
flush_history)
+ : name_(name), flush_history_(flush_history) {}
+
+ void SetMemoryUsage(uint64_t memory_usage) {
+ memory_usage_ = memory_usage;
+ }
+
+ void SetFlushReductions(std::vector<uint64_t> flush_reductions) {
+ flush_reductions_ = std::move(flush_reductions);
+ }
+
+ uint64_t GetMemoryUsage() const override {
+ return memory_usage_;
+ }
+
+ Status FlushMemory() override {
+ uint64_t reduction = memory_usage_;
+ if (flush_calls_ < static_cast<int32_t>(flush_reductions_.size())) {
+ reduction = flush_reductions_[flush_calls_];
+ }
+ reduction = std::min(reduction, memory_usage_);
+ memory_usage_ -= reduction;
+
+ if (reduction > 0) {
+ if (flush_history_ != nullptr) {
+ flush_history_->push_back(name_);
+ }
+ ++flush_calls_;
+ }
+ return Status::OK();
+ }
+
+ Status Write(std::unique_ptr<RecordBatch>&& batch) override {
+ (void)batch;
+ return Status::OK();
+ }
+
+ Status Compact(bool full_compaction) override {
+ (void)full_compaction;
+ return Status::OK();
+ }
+
+ Result<CommitIncrement> PrepareCommit(bool wait_compaction) override {
+ (void)wait_compaction;
+ return CommitIncrement(DataIncrement({}, {}, {}), CompactIncrement({},
{}, {}), nullptr);
+ }
+
+ Result<bool> CompactNotCompleted() override {
+ return false;
+ }
+
+ Status Sync() override {
+ return Status::OK();
+ }
+
+ Status Close() override {
+ return Status::OK();
+ }
+
+ std::shared_ptr<Metrics> GetMetrics() const override {
+ return nullptr;
+ }
+
+ private:
+ std::string name_;
+ std::vector<std::string>* flush_history_;
+ std::vector<uint64_t> flush_reductions_;
+ uint64_t memory_usage_ = 0;
+ int32_t flush_calls_ = 0;
+};
+
+} // namespace
+
+TEST(WriterMemoryManagerTest, DoesNotFlushWhenMemoryIsBelowLimit) {
+ WriterMemoryManager manager(/*memory_limit=*/100);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer("writer", &flush_history);
+ manager.RegisterWriter(&writer);
+
+ writer.SetMemoryUsage(40);
+ ASSERT_OK(manager.OnWriteCompleted(&writer));
+ ASSERT_TRUE(flush_history.empty());
+}
+
+TEST(WriterMemoryManagerTest, UnregisterWriterRemovesMemoryFromLedger) {
+ WriterMemoryManager manager(/*memory_limit=*/100);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+
+ writer_a.SetMemoryUsage(80);
+ manager.RefreshWriterMemory(&writer_a);
+ manager.UnregisterWriter(&writer_a);
+
+ writer_b.SetMemoryUsage(30);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_b));
+ ASSERT_TRUE(flush_history.empty());
+}
+
+TEST(WriterMemoryManagerTest, RefreshWriterMemoryUpdatesLedgerWithoutFlushing)
{
+ WriterMemoryManager manager(/*memory_limit=*/80);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+
+ writer_a.SetMemoryUsage(60);
+ manager.RefreshWriterMemory(&writer_a);
+ writer_b.SetMemoryUsage(30);
+ manager.RefreshWriterMemory(&writer_b);
+
+ writer_a.SetMemoryUsage(10);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_a));
+ ASSERT_TRUE(flush_history.empty());
+}
+
+TEST(WriterMemoryManagerTest, FlushWriterMemoryWithMultipleWriters) {
+ WriterMemoryManager manager(/*memory_limit=*/100);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+ FakeBatchWriter writer_c("writer_c", &flush_history);
+ manager.RegisterWriter(&writer_c);
+
+ writer_a.SetMemoryUsage(60);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_a));
+ writer_b.SetMemoryUsage(30);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_b));
+ writer_c.SetMemoryUsage(50);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_c));
+ writer_a.SetMemoryUsage(30);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_a));
+ writer_c.SetMemoryUsage(45);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_c));
+
+ ASSERT_EQ(flush_history, std::vector<std::string>({"writer_a", "writer_c",
"writer_c"}));
+ ASSERT_EQ(writer_a.GetMemoryUsage(), 30);
+ ASSERT_EQ(writer_b.GetMemoryUsage(), 30);
+ ASSERT_EQ(writer_c.GetMemoryUsage(), 0);
+}
+
+TEST(WriterMemoryManagerTest, ReclaimsCallerWhenCallerIsLargestWriter) {
+ WriterMemoryManager manager(/*memory_limit=*/100);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+
+ writer_a.SetMemoryUsage(20);
+ manager.RefreshWriterMemory(&writer_a);
+
+ writer_b.SetMemoryUsage(90);
+ ASSERT_OK(manager.OnWriteCompleted(&writer_b));
+
+ ASSERT_EQ(flush_history, std::vector<std::string>({"writer_b"}));
+ ASSERT_EQ(writer_a.GetMemoryUsage(), 20);
+ ASSERT_EQ(writer_b.GetMemoryUsage(), 0);
+}
+
+TEST(WriterMemoryManagerTest, ContinuesReclaimingUntilBelowGlobalLimit) {
+ WriterMemoryManager manager(/*memory_limit=*/61);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+
+ writer_a.SetMemoryUsage(90);
+ writer_a.SetFlushReductions({20, 20, 20});
+ manager.RefreshWriterMemory(&writer_a);
+
+ writer_b.SetMemoryUsage(60);
+ writer_b.SetFlushReductions({30});
+ ASSERT_OK(manager.OnWriteCompleted(&writer_b));
+
+ ASSERT_EQ(flush_history,
+ std::vector<std::string>({"writer_a", "writer_a", "writer_b",
"writer_a"}));
+ ASSERT_EQ(writer_a.GetMemoryUsage(), 30);
+ ASSERT_EQ(writer_b.GetMemoryUsage(), 30);
+}
+
+TEST(WriterMemoryManagerTest,
ReturnsConfigurationErrorWhenNoWriterCanReleaseEnoughMemory) {
+ WriterMemoryManager manager(/*memory_limit=*/100);
+ std::vector<std::string> flush_history;
+ FakeBatchWriter writer_a("writer_a", &flush_history);
+ manager.RegisterWriter(&writer_a);
+ FakeBatchWriter writer_b("writer_b", &flush_history);
+ manager.RegisterWriter(&writer_b);
+
+ writer_b.SetMemoryUsage(20);
+ manager.RefreshWriterMemory(&writer_b);
+
+ writer_a.SetMemoryUsage(120);
+ writer_a.SetFlushReductions({10, 0});
+ ASSERT_NOK_WITH_MSG(
+ manager.OnWriteCompleted(&writer_a),
+ "Before flushing memory, writer had 110 bytes of memory allocated,
After flushing memory, "
+ "writer still has 110 bytes of memory allocated, this might be a
bug.");
+ ASSERT_EQ(flush_history, std::vector<std::string>({"writer_a"}));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/partition/partition_info.h
b/src/paimon/core/partition/partition_info.h
new file mode 100644
index 0000000..f1dbc09
--- /dev/null
+++ b/src/paimon/core/partition/partition_info.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/casting/cast_executor.h"
+
+namespace paimon {
+// ExistFieldInfo = PartitionInfo + NonPartitionInfo
+// ExistFieldInfo equals the intersection of data schema and read schema
+// ExistFieldInfo + NonExistFieldInfo = read schema
+struct PartitionInfo {
+ // the intersection of partition schema & read schema
+ std::vector<DataField> partition_read_schema;
+
+ // indicates the idx in read schema (not data schema)
+ std::vector<int32_t> idx_in_target_read_schema;
+
+ // indicates the idx in partition schema (in BinaryRow)
+ std::vector<int32_t> idx_in_partition;
+
+ // partition predicate
+ std::shared_ptr<Predicate> partition_filter;
+};
+
+struct NonPartitionInfo {
+ // the intersection of non-partition schema and read schema
+ std::vector<DataField> non_partition_read_schema;
+ std::vector<DataField> non_partition_data_schema;
+
+ // indicates the idx in read schema (not in data schema)
+ std::vector<int32_t> idx_in_target_read_schema;
+
+ // non-partition predicate
+ std::shared_ptr<Predicate> non_partition_filter;
+ std::vector<std::shared_ptr<CastExecutor>> cast_executors;
+};
+
+struct NonExistFieldInfo {
+ // the fields in read schema but not in data schema
+ std::vector<DataField> non_exist_read_schema;
+
+ // indicates the idx in read schema (not data schema)
+ std::vector<int32_t> idx_in_target_read_schema;
+};
+
+struct ExistFieldInfo {
+ // the fields in both read schema and data schema
+ std::vector<DataField> exist_read_schema;
+ std::vector<DataField> exist_data_schema;
+
+ // indicates the idx in read schema (not data schema)
+ std::vector<int32_t> idx_in_target_read_schema;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/partition/partition_statistics.h b/src/paimon/core/partition/partition_statistics.h
new file mode 100644
index 0000000..67a3ac2
--- /dev/null
+++ b/src/paimon/core/partition/partition_statistics.h
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+class PartitionStatistics : public Jsonizable<PartitionStatistics> {
+ public:
+ using SpecType = std::map<std::string, std::string>;
+ PartitionStatistics(const SpecType& spec, int64_t record_count, int64_t file_size_in_bytes,
+ int64_t file_count, int64_t last_file_creation_time, int32_t total_buckets)
+ : spec_(spec),
+ record_count_(record_count),
+ file_size_in_bytes_(file_size_in_bytes),
+ file_count_(file_count),
+ last_file_creation_time_(last_file_creation_time),
+ total_buckets_(total_buckets) {}
+
+ const SpecType& Spec() const {
+ return spec_;
+ }
+ int64_t RecordCount() const {
+ return record_count_;
+ }
+ int64_t FileSizeInBytes() const {
+ return file_size_in_bytes_;
+ }
+ int64_t FileCount() const {
+ return file_count_;
+ }
+ int64_t LastFileCreationTime() const {
+ return last_file_creation_time_;
+ }
+ int32_t TotalBuckets() const {
+ return total_buckets_;
+ }
+
+ rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+ noexcept(false) override {
+ rapidjson::Value obj(rapidjson::kObjectType);
+ obj.AddMember(rapidjson::StringRef(FIELD_SPEC),
+ RapidJsonUtil::SerializeValue(spec_, allocator).Move(), *allocator);
+ obj.AddMember(rapidjson::StringRef(FIELD_RECORD_COUNT),
+ RapidJsonUtil::SerializeValue(record_count_, allocator).Move(), *allocator);
+ obj.AddMember(rapidjson::StringRef(FIELD_FILE_SIZE_IN_BYTES),
+ RapidJsonUtil::SerializeValue(file_size_in_bytes_, allocator).Move(),
+ *allocator);
+ obj.AddMember(rapidjson::StringRef(FIELD_FILE_COUNT),
+ RapidJsonUtil::SerializeValue(file_count_, allocator).Move(), *allocator);
+ obj.AddMember(rapidjson::StringRef(FIELD_LAST_FILE_CREATION_TIME),
+ RapidJsonUtil::SerializeValue(last_file_creation_time_, allocator).Move(),
+ *allocator);
+ obj.AddMember(rapidjson::StringRef(FIELD_TOTAL_BUCKETS),
+ RapidJsonUtil::SerializeValue(total_buckets_, allocator).Move(), *allocator);
+ return obj;
+ }
+
+ void FromJson(const rapidjson::Value& obj) noexcept(false) override {
+ spec_ = RapidJsonUtil::DeserializeKeyValue<SpecType>(obj, FIELD_SPEC);
+ record_count_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_RECORD_COUNT);
+ file_size_in_bytes_ =
+ RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_FILE_SIZE_IN_BYTES);
+ file_count_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_FILE_COUNT);
+ last_file_creation_time_ =
+ RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_LAST_FILE_CREATION_TIME);
+ total_buckets_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, FIELD_TOTAL_BUCKETS);
+ }
+
+ bool operator==(const PartitionStatistics& rhs) const {
+ return record_count_ == rhs.record_count_ &&
+ file_size_in_bytes_ == rhs.file_size_in_bytes_ && file_count_ == rhs.file_count_ &&
+ last_file_creation_time_ == rhs.last_file_creation_time_ &&
+ total_buckets_ == rhs.total_buckets_ && spec_ == rhs.spec_;
+ }
+
+ std::string ToString() const {
+ std::ostringstream oss;
+ oss << "{spec={";
+ bool first = true;
+ for (const auto& kv : spec_) {
+ if (!first) oss << ", ";
+ first = false;
+ oss << kv.first << ":" << kv.second;
+ }
+ oss << "}, recordCount=" << record_count_ << ", fileSizeInBytes=" << file_size_in_bytes_
+ << ", fileCount=" << file_count_
+ << ", lastFileCreationTime=" << last_file_creation_time_
+ << ", totalBuckets=" << total_buckets_ << "}";
+ return oss.str();
+ }
+
+ private:
+ JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(PartitionStatistics);
+
+ private:
+ static constexpr const char* FIELD_SPEC = "spec";
+ static constexpr const char* FIELD_RECORD_COUNT = "recordCount";
+ static constexpr const char* FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
+ static constexpr const char* FIELD_FILE_COUNT = "fileCount";
+ static constexpr const char* FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime";
+ static constexpr const char* FIELD_TOTAL_BUCKETS = "totalBuckets";
+
+ SpecType spec_;
+ int64_t record_count_ = 0;
+ int64_t file_size_in_bytes_ = 0;
+ int64_t file_count_ = 0;
+ int64_t last_file_creation_time_ = 0;
+ int32_t total_buckets_ = 0;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/partition/partition_statistics_test.cpp b/src/paimon/core/partition/partition_statistics_test.cpp
new file mode 100644
index 0000000..d77276d
--- /dev/null
+++ b/src/paimon/core/partition/partition_statistics_test.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/partition/partition_statistics.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class PartitionStatisticsTest : public testing::Test {
+ public:
+ std::string ReplaceAll(const std::string& str) {
+ std::string replaced_str = StringUtils::Replace(str, " ", "");
+ replaced_str = StringUtils::Replace(replaced_str, "\t", "");
+ replaced_str = StringUtils::Replace(replaced_str, "\n", "");
+ return replaced_str;
+ }
+};
+
+TEST_F(PartitionStatisticsTest, TestJsonizable) {
+ std::string json_str = R"({
+ "spec": {
+ "f1": "10",
+ "f2": "20"
+ },
+ "recordCount": 4,
+ "fileSizeInBytes": 1118,
+ "fileCount": 2,
+ "lastFileCreationTime": 1724090888727,
+ "totalBuckets": 1
+ })";
+
+ ASSERT_OK_AND_ASSIGN(PartitionStatistics partition_statistics,
+ PartitionStatistics::FromJsonString(json_str));
+
+ PartitionStatistics expected_partition_statistics(
+ /*spec=*/{{"f1", "10"}, {"f2", "20"}}, /*record_count=*/4, /*file_size_in_bytes=*/1118,
+ /*file_count=*/2, /*last_file_creation_time=*/1724090888727, /*total_buckets=*/1);
+ ASSERT_EQ(expected_partition_statistics, partition_statistics);
+
+ ASSERT_OK_AND_ASSIGN(std::string new_json_str, partition_statistics.ToJsonString());
+ ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(new_json_str));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/tag/tag.cpp b/src/paimon/core/tag/tag.cpp
new file mode 100644
index 0000000..ebf7773
--- /dev/null
+++ b/src/paimon/core/tag/tag.cpp
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/tag/tag.h"
+
+#include <cassert>
+
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+Tag::Tag(const std::optional<int32_t>& version, const int64_t id, const int64_t schema_id,
+ const std::string& base_manifest_list,
+ const std::optional<int64_t>& base_manifest_list_size,
+ const std::string& delta_manifest_list,
+ const std::optional<int64_t>& delta_manifest_list_size,
+ const std::optional<std::string>& changelog_manifest_list,
+ const std::optional<int64_t>& changelog_manifest_list_size,
+ const std::optional<std::string>& index_manifest, const std::string& commit_user,
+ const int64_t commit_identifier, const CommitKind commit_kind, const int64_t time_millis,
+ const std::optional<std::map<int32_t, int64_t>>& log_offsets,
+ const std::optional<int64_t>& total_record_count,
+ const std::optional<int64_t>& delta_record_count,
+ const std::optional<int64_t>& changelog_record_count,
+ const std::optional<int64_t>& watermark, const std::optional<std::string>& statistics,
+ const std::optional<std::map<std::string, std::string>>& properties,
+ const std::optional<int64_t>& next_row_id,
+ const std::optional<std::vector<int64_t>>& tag_create_time,
+ const std::optional<double_t>& tag_time_retained)
+ : Snapshot(version, id, schema_id, base_manifest_list, base_manifest_list_size,
+ delta_manifest_list, delta_manifest_list_size, changelog_manifest_list,
+ changelog_manifest_list_size, index_manifest, commit_user, commit_identifier,
+ commit_kind, time_millis, log_offsets, total_record_count, delta_record_count,
+ changelog_record_count, watermark, statistics, properties, next_row_id),
+ tag_create_time_(tag_create_time),
+ tag_time_retained_(tag_time_retained) {}
+
+bool Tag::operator==(const Tag& other) const {
+ if (this == &other) {
+ return true;
+ }
+ return Snapshot::operator==(other) && tag_create_time_ == other.tag_create_time_ &&
+ tag_time_retained_ == other.tag_time_retained_;
+}
+
+bool Tag::TEST_Equal(const Tag& other) const {
+ if (this == &other) {
+ return true;
+ }
+
+ return Snapshot::TEST_Equal(other) && tag_create_time_ == other.tag_create_time_ &&
+ tag_time_retained_ == other.tag_time_retained_;
+}
+
+Result<Snapshot> Tag::TrimToSnapshot() const {
+ return Snapshot(Version(), Id(), SchemaId(), BaseManifestList(), BaseManifestListSize(),
+ DeltaManifestList(), DeltaManifestListSize(), ChangelogManifestList(),
+ ChangelogManifestListSize(), IndexManifest(), CommitUser(), CommitIdentifier(),
+ GetCommitKind(), TimeMillis(), LogOffsets(), TotalRecordCount(),
+ DeltaRecordCount(), ChangelogRecordCount(), Watermark(), Statistics(),
+ Properties(), NextRowId());
+}
+
+rapidjson::Value Tag::ToJson(rapidjson::Document::AllocatorType* allocator) const noexcept(false) {
+ rapidjson::Value obj(rapidjson::kObjectType);
+ obj = Snapshot::ToJson(allocator);
+ if (tag_create_time_ != std::nullopt) {
+ obj.AddMember(rapidjson::StringRef(FIELD_TAG_CREATE_TIME),
+ RapidJsonUtil::SerializeValue(tag_create_time_.value(), allocator).Move(),
+ *allocator);
+ }
+ if (tag_time_retained_ != std::nullopt) {
+ obj.AddMember(rapidjson::StringRef(FIELD_TAG_TIME_RETAINED),
+ RapidJsonUtil::SerializeValue(tag_time_retained_.value(), allocator).Move(),
+ *allocator);
+ }
+ return obj;
+}
+
+void Tag::FromJson(const rapidjson::Value& obj) noexcept(false) {
+ Snapshot::FromJson(obj);
+ tag_create_time_ = RapidJsonUtil::DeserializeKeyValue<std::optional<std::vector<int64_t>>>(
+ obj, FIELD_TAG_CREATE_TIME);
+ tag_time_retained_ =
+ RapidJsonUtil::DeserializeKeyValue<std::optional<double_t>>(obj, FIELD_TAG_TIME_RETAINED);
+}
+
+Result<Tag> Tag::FromPath(const std::shared_ptr<FileSystem>& fs, const std::string& path) {
+ std::string json_str;
+ PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &json_str));
+ Tag tag;
+ PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(json_str, &tag));
+ return tag;
+}
+} // namespace paimon
diff --git a/src/paimon/core/tag/tag.h b/src/paimon/core/tag/tag.h
new file mode 100644
index 0000000..23508e5
--- /dev/null
+++ b/src/paimon/core/tag/tag.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+class FileSystem;
+
+/// Snapshot with tagCreateTime and tagTimeRetained.
+class Tag : public Snapshot {
+ public:
+ static constexpr char FIELD_TAG_CREATE_TIME[] = "tagCreateTime";
+ static constexpr char FIELD_TAG_TIME_RETAINED[] = "tagTimeRetained";
+
+ JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Tag);
+
+ Tag(const std::optional<int32_t>& version, int64_t id, int64_t schema_id,
+ const std::string& base_manifest_list,
+ const std::optional<int64_t>& base_manifest_list_size,
+ const std::string& delta_manifest_list,
+ const std::optional<int64_t>& delta_manifest_list_size,
+ const std::optional<std::string>& changelog_manifest_list,
+ const std::optional<int64_t>& changelog_manifest_list_size,
+ const std::optional<std::string>& index_manifest, const std::string& commit_user,
+ int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis,
+ const std::optional<std::map<int32_t, int64_t>>& log_offsets,
+ const std::optional<int64_t>& total_record_count,
+ const std::optional<int64_t>& delta_record_count,
+ const std::optional<int64_t>& changelog_record_count,
+ const std::optional<int64_t>& watermark, const std::optional<std::string>& statistics,
+ const std::optional<std::map<std::string, std::string>>& properties,
+ const std::optional<int64_t>& next_row_id,
+ const std::optional<std::vector<int64_t>>& tag_create_time,
+ const std::optional<double_t>& tag_time_retained);
+
+ bool operator==(const Tag& other) const;
+ bool TEST_Equal(const Tag& other) const;
+
+ std::optional<std::vector<int64_t>> TagCreateTime() const {
+ return tag_create_time_;
+ }
+
+ std::optional<double_t> TagTimeRetained() const {
+ return tag_time_retained_;
+ }
+
+ Result<Snapshot> TrimToSnapshot() const;
+
+ rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+ noexcept(false) override;
+
+ void FromJson(const rapidjson::Value& obj) noexcept(false) override;
+
+ static Result<Tag> FromPath(const std::shared_ptr<FileSystem>& fs, const std::string& path);
+
+ private:
+ std::optional<std::vector<int64_t>> tag_create_time_;
+ std::optional<double_t> tag_time_retained_;
+};
+} // namespace paimon
diff --git a/src/paimon/core/tag/tag_test.cpp b/src/paimon/core/tag/tag_test.cpp
new file mode 100644
index 0000000..fb371e8
--- /dev/null
+++ b/src/paimon/core/tag/tag_test.cpp
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/tag/tag.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class TagTest : public testing::Test {
+ public:
+ static std::string ReplaceAll(const std::string& str, const bool serialized) {
+ std::string replaced_str = StringUtils::Replace(str, " ", "");
+ replaced_str = StringUtils::Replace(replaced_str, "\t", "");
+ replaced_str = StringUtils::Replace(replaced_str, "\n", "");
+ if (serialized) {
+ replaced_str = StringUtils::Replace(replaced_str, ".0", ".000000000");
+ }
+ return replaced_str;
+ }
+};
+
+TEST_F(TagTest, TestSimple) {
+ const std::map<int32_t, int64_t> log_offset = {{25, 30}};
+ const std::map<std::string, std::string> properties = {{"key1", "value1"}, {"key2", "value2"}};
+ const auto tag_create_time = std::vector<int64_t>({2026, 1, 2, 3, 4, 5, 6});
+ const Tag tag(
+ /*version=*/5, /*id=*/10, /*schema_id=*/15, /*base_manifest_list=*/"base_manifest_list", 10,
+ /*delta_manifest_list=*/"delta_manifest_list", 20,
+ /*changelog_manifest_list=*/"changelog_manifest_list", 30,
+ /*index_manifest=*/"index_manifest",
+ /*commit_user=*/"commit_user_01", /*commit_identifier=*/20,
+ /*commit_kind=*/Snapshot::CommitKind::Compact(), /*time_millis=*/1234, log_offset,
+ /*total_record_count=*/35,
+ /*delta_record_count=*/40, /*changelog_record_count=*/45, /*watermark=*/50,
+ /*statistics=*/"statistic_test", properties, /*next_row_id=*/0,
+ /*tag_create_time=*/tag_create_time, /*tag_time_retained=*/5.0);
+ ASSERT_EQ(5, tag.Version());
+ ASSERT_EQ(10, tag.Id());
+ ASSERT_EQ(15, tag.SchemaId());
+ ASSERT_EQ("base_manifest_list", tag.BaseManifestList());
+ ASSERT_EQ(10, tag.BaseManifestListSize().value());
+ ASSERT_EQ("delta_manifest_list", tag.DeltaManifestList());
+ ASSERT_EQ(20, tag.DeltaManifestListSize().value());
+ ASSERT_EQ("changelog_manifest_list", tag.ChangelogManifestList().value());
+ ASSERT_EQ(30, tag.ChangelogManifestListSize().value());
+ ASSERT_EQ("index_manifest", tag.IndexManifest().value());
+ ASSERT_EQ("commit_user_01", tag.CommitUser());
+ ASSERT_EQ(20, tag.CommitIdentifier());
+ ASSERT_EQ(Snapshot::CommitKind::Compact(), tag.GetCommitKind());
+ ASSERT_EQ(1234, tag.TimeMillis());
+ ASSERT_EQ(log_offset, tag.LogOffsets().value());
+ ASSERT_EQ(35, tag.TotalRecordCount().value());
+ ASSERT_EQ(40, tag.DeltaRecordCount().value());
+ ASSERT_EQ(45, tag.ChangelogRecordCount().value());
+ ASSERT_EQ(50, tag.Watermark().value());
+ ASSERT_EQ("statistic_test", tag.Statistics().value());
+ ASSERT_EQ(properties, tag.Properties().value());
+ ASSERT_EQ(0, tag.NextRowId().value());
+ ASSERT_EQ(tag_create_time, tag.TagCreateTime().value());
+ ASSERT_EQ(5.0, tag.TagTimeRetained().value());
+}
+
+TEST_F(TagTest, TestFromPath) {
+ const std::string data_path = paimon::test::GetDataDir() +
+ "/orc/append_table_with_tag.db/append_table_with_tag/tag/tag-1";
+ const auto fs = std::make_shared<LocalFileSystem>();
+ ASSERT_OK_AND_ASSIGN(Tag tag, Tag::FromPath(fs, data_path));
+ ASSERT_EQ(3, tag.Version());
+ ASSERT_EQ(1, tag.Id());
+ ASSERT_EQ(0, tag.SchemaId());
+ ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-0", tag.BaseManifestList());
+ ASSERT_EQ(std::nullopt, tag.BaseManifestListSize());
+ ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-1", tag.DeltaManifestList());
+ ASSERT_EQ(std::nullopt, tag.DeltaManifestListSize());
+ ASSERT_EQ(std::nullopt, tag.ChangelogManifestList());
+ ASSERT_EQ(std::nullopt, tag.ChangelogManifestListSize());
+ ASSERT_EQ(std::nullopt, tag.IndexManifest());
+ ASSERT_EQ("b02e4322-9c5f-41e1-a560-c0156fdf7b9c", tag.CommitUser());
+ ASSERT_EQ(9223372036854775807ll, tag.CommitIdentifier());
+ ASSERT_EQ(Snapshot::CommitKind::Append(), tag.GetCommitKind());
+ ASSERT_EQ(1721614343270ll, tag.TimeMillis());
+ ASSERT_EQ((std::map<int32_t, int64_t>()), tag.LogOffsets().value());
+ ASSERT_EQ(5, tag.TotalRecordCount().value());
+ ASSERT_EQ(5, tag.DeltaRecordCount().value());
+ ASSERT_EQ(0, tag.ChangelogRecordCount().value());
+ ASSERT_EQ(std::nullopt, tag.Watermark());
+ ASSERT_EQ(std::nullopt, tag.Statistics());
+ ASSERT_EQ(std::nullopt, tag.Properties());
+ ASSERT_EQ(std::nullopt, tag.NextRowId());
+ ASSERT_EQ(std::vector<int64_t>({2026, 2, 4, 6, 8, 10, 12}), tag.TagCreateTime());
+ ASSERT_EQ(3.0, tag.TagTimeRetained());
+}
+
+TEST_F(TagTest, TestJsonizable) {
+ const std::string json_str = R"({
+ "version" : 3,
+ "id" : 1,
+ "schemaId" : 0,
+ "baseManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0",
+ "baseManifestListSize" : 20,
+ "deltaManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1",
+ "deltaManifestListSize" : 50,
+ "changelogManifestList" : null,
+ "commitUser" : "0e4d92f7-53b0-40d6-a7c0-102bf3801e6a",
+ "commitIdentifier" : 9223372036854775807,
+ "commitKind" : "OVERWRITE",
+ "timeMillis" : 1711692199281,
+ "logOffsets" : { },
+ "totalRecordCount" : 3,
+ "deltaRecordCount" : 3,
+ "changelogRecordCount" : 0,
+ "tagCreateTime" : [ 2026, 1, 3, 5, 7, 9, 11 ],
+ "tagTimeRetained" : 4.000000000
+ })";
+
+ Tag tag;
+ ASSERT_OK(RapidJsonUtil::FromJsonString(json_str, &tag));
+
+ const Tag expected_tag(
+ /*version=*/3, /*id=*/1, /*schema_id=*/0, /*base_manifest_list=*/
+ "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0", /*base_manifest_list_size=*/20,
+ /*delta_manifest_list=*/"manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1",
+ /*delta_manifest_list_size=*/50, /*changelog_manifest_list=*/std::nullopt,
+ /*changelog_manifest_list_size=*/std::nullopt, /*index_manifest=*/std::nullopt,
+ /*commit_user=*/"0e4d92f7-53b0-40d6-a7c0-102bf3801e6a",
+ /*commit_identifier=*/9223372036854775807ll,
+ /*commit_kind=*/Snapshot::CommitKind::Overwrite(), /*time_millis=*/1711692199281ll,
+ /*log_offsets=*/std::map<int32_t, int64_t>(),
+ /*total_record_count=*/3, /*delta_record_count=*/3, /*changelog_record_count=*/0,
+ /*watermark=*/std::nullopt, /*statistics=*/std::nullopt, /*properties=*/std::nullopt,
+ /*next_row_id=*/std::nullopt,
+ /*tag_create_time=*/std::vector<int64_t>({2026, 1, 3, 5, 7, 9, 11}),
+ /*tag_time_retained=*/4.0);
+ ASSERT_EQ(expected_tag, tag);
+
+ ASSERT_OK_AND_ASSIGN(std::string new_json_str, tag.ToJsonString());
+ ASSERT_EQ(ReplaceAll(json_str, false), ReplaceAll(new_json_str, true));
+}
+
+TEST_F(TagTest, TestSerializeAndDeserialize) {
+ const auto se_and_de = [&](const std::string& data_path) {
+ auto fs = std::make_shared<LocalFileSystem>();
+ std::string json_str;
+ ASSERT_OK(fs->ReadFile(data_path, &json_str));
+ ASSERT_OK_AND_ASSIGN(Tag tag, Tag::FromPath(fs, data_path));
+ ASSERT_OK_AND_ASSIGN(std::string se_json_str, tag.ToJsonString());
+ ASSERT_EQ(ReplaceAll(json_str, false), ReplaceAll(se_json_str, true));
+ };
+ auto se_and_de_from_str = [&](const std::string& json_str) {
+ Tag tag;
+ ASSERT_OK(RapidJsonUtil::FromJsonString(json_str, &tag));
+ ASSERT_OK_AND_ASSIGN(std::string se_json_str, tag.ToJsonString());
+ ASSERT_EQ(ReplaceAll(json_str, false), ReplaceAll(se_json_str, true));
+ };
+ {
+ const std::string data_path =
+ paimon::test::GetDataDir() +
+ "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/tag/tag-1";
+ se_and_de(data_path);
+ }
+ {
+ // with tagCreateTime
+ const std::string json_str = R"({
+ "version" : 3,
+ "id" : 10,
+ "schemaId" : 2,
+ "baseManifestList" : "base-manifest-list-1",
+ "baseManifestListSize" : 100,
+ "deltaManifestList" : "delta-manifest-list-2",
+ "deltaManifestListSize" : 200,
+ "changelogManifestList" : null,
+ "commitUser" : "commit-usr-3",
+ "commitIdentifier" : 12,
+ "commitKind" : "APPEND",
+ "timeMillis" : 1749724197266,
+ "logOffsets" : {
+ "0" : 1,
+ "1" : 3
+ },
+ "totalRecordCount" : 1024,
+ "deltaRecordCount" : 4096,
+ "watermark" : 1749724196266,
+ "statistics" : "statistics-4",
+ "properties" : {
+ "key0" : "value0",
+ "key1" : "value1"
+ },
+ "tagCreateTime": [ 2026, 2, 4, 6, 8, 10, 12 ]
+ })";
+ se_and_de_from_str(json_str);
+ }
+ {
+ // with tagTimeRetained
+ const std::string json_str = R"({
+ "version" : 3,
+ "id" : 10,
+ "schemaId" : 2,
+ "baseManifestList" : "base-manifest-list-1",
+ "baseManifestListSize" : 100,
+ "deltaManifestList" : "delta-manifest-list-2",
+ "deltaManifestListSize" : 200,
+ "changelogManifestList" : null,
+ "commitUser" : "commit-usr-3",
+ "commitIdentifier" : 12,
+ "commitKind" : "APPEND",
+ "timeMillis" : 1749724197266,
+ "logOffsets" : {
+ "0" : 1,
+ "1" : 3
+ },
+ "totalRecordCount" : 1024,
+ "deltaRecordCount" : 4096,
+ "watermark" : 1749724196266,
+ "statistics" : "statistics-4",
+ "properties" : {
+ "key0" : "value0",
+ "key1" : "value1"
+ },
+ "tagTimeRetained" : 2.000000000
+ })";
+ se_and_de_from_str(json_str);
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/view/view.h b/src/paimon/core/view/view.h
new file mode 100644
index 0000000..f689a23
--- /dev/null
+++ b/src/paimon/core/view/view.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Interface for view definition.
+class PAIMON_EXPORT View {
+ public:
+ View() = default;
+ virtual ~View() = default;
+
+ /// A name to identify this view.
+ virtual std::string Name() const = 0;
+
+ /// Full name of the view, default is database.tableName.
+ virtual std::string FullName() const {
+ return Name();
+ }
+
+ /// Returns the view representation.
+ virtual std::string Query() const = 0;
+
+ /// Loads the schema of view.
+ virtual std::shared_ptr<Schema> GetSchema() const = 0;
+};
+
+} // namespace paimon