This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new abd2615 feat: Migrate SST block infrastructure and file reader/writer (#44)
abd2615 is described below
commit abd261542b0301d7da786e57acafb9efbe2e7274
Author: lxy <[email protected]>
AuthorDate: Thu Jun 4 09:45:45 2026 +0800
feat: Migrate SST block infrastructure and file reader/writer (#44)
---
src/paimon/common/sst/block_aligned_type.h | 46 ++++
src/paimon/common/sst/block_cache.h | 112 +++++++++
src/paimon/common/sst/block_cache_test.cpp | 356 ++++++++++++++++++++++++++++
src/paimon/common/sst/block_entry.h | 32 +++
src/paimon/common/sst/block_handle.cpp | 56 +++++
src/paimon/common/sst/block_handle.h | 53 +++++
src/paimon/common/sst/block_iterator.cpp | 103 ++++++++
src/paimon/common/sst/block_iterator.h | 56 +++++
src/paimon/common/sst/block_reader.cpp | 58 +++++
src/paimon/common/sst/block_reader.h | 90 +++++++
src/paimon/common/sst/block_trailer.cpp | 51 ++++
src/paimon/common/sst/block_trailer.h | 53 +++++
src/paimon/common/sst/block_writer.cpp | 70 ++++++
src/paimon/common/sst/block_writer.h | 92 +++++++
src/paimon/common/sst/bloom_filter_handle.h | 63 +++++
src/paimon/common/sst/sst_file_io_test.cpp | 305 ++++++++++++++++++++++++
src/paimon/common/sst/sst_file_reader.cpp | 195 +++++++++++++++
src/paimon/common/sst/sst_file_reader.h | 99 ++++++++
src/paimon/common/sst/sst_file_utils.h | 44 ++++
src/paimon/common/sst/sst_file_writer.cpp | 152 ++++++++++++
src/paimon/common/sst/sst_file_writer.h | 91 +++++++
21 files changed, 2177 insertions(+)
diff --git a/src/paimon/common/sst/block_aligned_type.h b/src/paimon/common/sst/block_aligned_type.h
new file mode 100644
index 0000000..0c4e7d4
--- /dev/null
+++ b/src/paimon/common/sst/block_aligned_type.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Alignment mode of a block. Values are decoded from an `int8_t` by `From` below.
+enum class PAIMON_EXPORT BlockAlignedType { ALIGNED = 0, UNALIGNED = 1 };
+
+/// Decodes a raw byte into a `BlockAlignedType`.
+///
+/// @param v Raw value: `0` maps to `ALIGNED`, `1` maps to `UNALIGNED`.
+/// @return The decoded type, or `Status::Invalid` for any other value.
+inline Result<BlockAlignedType> From(int8_t v) {
+    switch (v) {
+        case 0:
+            return BlockAlignedType::ALIGNED;
+        case 1:
+            return BlockAlignedType::UNALIGNED;
+        default:
+            return Status::Invalid("Invalid block aligned type: " + std::to_string(v));
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h
new file mode 100644
index 0000000..4fd8bb2
--- /dev/null
+++ b/src/paimon/common/sst/block_cache.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+namespace paimon {
+
+/// Per-file block cache layered on a shared `CacheManager`.
+///
+/// Blocks fetched through `GetBlock` are stored as pages in the cache manager and are
+/// mirrored in the local `blocks_` map, so repeated lookups are answered locally. Every
+/// page is registered with an eviction callback that drops the mirrored entry.
+/// `Close()` invalidates every page this instance still tracks; the destructor also calls it.
+///
+/// The eviction callbacks capture `this`, so the class is neither copyable nor movable.
+/// A copy would leave callbacks mutating the original object's map and would
+/// invalidate the same pages twice.
+class PAIMON_EXPORT BlockCache {
+ public:
+    BlockCache(const std::string& file_path, const std::shared_ptr<InputStream>& in,
+               const std::shared_ptr<CacheManager>& cache_manager,
+               const std::shared_ptr<MemoryPool>& pool)
+        : pool_(pool), file_path_(file_path), in_(in), cache_manager_(cache_manager) {}
+
+    BlockCache(const BlockCache&) = delete;
+    BlockCache& operator=(const BlockCache&) = delete;
+
+    ~BlockCache() {
+        Close();
+    }
+
+    /// Returns the block occupying `length` bytes starting at `position`.
+    ///
+    /// A hit in `blocks_` is returned directly. The block is requested from
+    /// `cache_manager_->GetPage` instead in two cases:
+    /// - on a miss;
+    /// - once the local entry's access count equals `CacheManager::REFRESH_COUNT`.
+    /// When the page is not cached, the loader reads the raw bytes from the input stream
+    /// and applies `decompress_func` if one is given.
+    ///
+    /// @param position Byte offset of the block in the file.
+    /// @param length Size of the stored block in bytes.
+    /// @param is_index Part of the cache key; distinguishes index blocks from data blocks.
+    /// @param decompress_func Optional transform applied to freshly read bytes; may be empty.
+    Result<MemorySegment> GetBlock(
+        int64_t position, int32_t length, bool is_index,
+        const std::function<Result<MemorySegment>(const MemorySegment&)>& decompress_func) {
+        auto key = CacheKey::ForPosition(file_path_, position, length, is_index);
+        auto it = blocks_.find(key);
+        if (it != blocks_.end() &&
+            it->second.GetAccessCount() != CacheManager::REFRESH_COUNT) {
+            return it->second.Access();
+        }
+        // Miss or periodic refresh: go through the shared cache manager so its LRU observes
+        // the access. The loader captures locals by reference; this assumes `GetPage`
+        // invokes it synchronously.
+        PAIMON_ASSIGN_OR_RAISE(
+            MemorySegment segment,
+            cache_manager_->GetPage(
+                key,
+                [&](const std::shared_ptr<paimon::CacheKey>&) -> Result<MemorySegment> {
+                    PAIMON_ASSIGN_OR_RAISE(MemorySegment compress_data,
+                                           ReadFrom(position, length));
+                    if (!decompress_func) {
+                        return compress_data;
+                    }
+                    return decompress_func(compress_data);
+                },
+                [this](const std::shared_ptr<CacheKey>& evicted_key) {
+                    blocks_.erase(evicted_key);
+                }));
+        auto container = CacheManager::SegmentContainer(segment);
+        const auto& result_segment = container.Access();
+        blocks_.insert_or_assign(key, container);
+        return result_segment;
+    }
+
+    /// Returns the number of entries in the local blocks_ cache.
+    size_t BlocksSize() const {
+        return blocks_.size();
+    }
+
+    /// Returns true if the local blocks_ cache contains an entry for the given
+    /// position/length.
+    bool ContainsBlock(int64_t position, int32_t length, bool is_index) const {
+        auto key = CacheKey::ForPosition(file_path_, position, length, is_index);
+        return blocks_.find(key) != blocks_.end();
+    }
+
+    /// Invalidates every page this instance still tracks and empties `blocks_`.
+    /// Calling it more than once is harmless: later calls find an empty map.
+    void Close() {
+        // Detach the map before invalidating. `InvalidPage` can fire the eviction callback,
+        // which erases from `blocks_`, so iterating `blocks_` directly would be unsafe.
+        // Swapping (rather than copying) also leaves `blocks_` empty. That covers entries
+        // the LRU had already evicted, for which `InvalidPage` is a no-op.
+        decltype(blocks_) detached_blocks;
+        detached_blocks.swap(blocks_);
+        for (const auto& [key, _] : detached_blocks) {
+            cache_manager_->InvalidPage(key);
+        }
+    }
+
+ private:
+    /// Reads `length` raw bytes at `offset` from `in_` into a new heap segment.
+    Result<MemorySegment> ReadFrom(int64_t offset, int32_t length) {
+        // Position and length originate from file metadata; refuse negative values rather
+        // than seeking or allocating with them.
+        if (offset < 0 || length < 0) {
+            return Status::Invalid("Invalid block position " + std::to_string(offset) +
+                                   " or length " + std::to_string(length) + " in file " +
+                                   file_path_);
+        }
+        PAIMON_RETURN_NOT_OK(in_->Seek(offset, SeekOrigin::FS_SEEK_SET));
+        auto segment = MemorySegment::AllocateHeapMemory(length, pool_.get());
+        PAIMON_RETURN_NOT_OK(in_->Read(segment.MutableData(), length));
+        return segment;
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::string file_path_;
+    std::shared_ptr<InputStream> in_;
+
+    std::shared_ptr<CacheManager> cache_manager_;
+    std::unordered_map<std::shared_ptr<CacheKey>, CacheManager::SegmentContainer, CacheKeyHash,
+                       CacheKeyEqual>
+        blocks_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_cache_test.cpp b/src/paimon/common/sst/block_cache_test.cpp
new file mode 100644
index 0000000..7417d91
--- /dev/null
+++ b/src/paimon/common/sst/block_cache_test.cpp
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/io/cache/lru_cache.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+/// Fixture that writes synthetic block files into a fresh test directory and provides
+/// helpers for addressing fixed-size blocks through a `BlockCache`.
+class BlockCacheTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = UniqueTestDirectory::Create();
+        fs_ = dir_->GetFileSystem();
+        pool_ = GetDefaultPool();
+    }
+
+    void TearDown() override {}
+
+    /// Writes `num_blocks` consecutive blocks of `block_size` bytes to `path`. Every byte
+    /// of block `i` equals `i & 0xFF`, so a block can be identified by its first byte.
+    Status WriteTestFile(const std::string& path, int32_t num_blocks, int32_t block_size) const {
+        PAIMON_ASSIGN_OR_RAISE(auto out, fs_->Create(path, false));
+        for (int32_t i = 0; i < num_blocks; i++) {
+            auto segment = MemorySegment::AllocateHeapMemory(block_size, pool_.get());
+            std::memset(segment.MutableData(), i & 0xFF, block_size);
+            PAIMON_RETURN_NOT_OK(out->Write(segment.MutableData(), block_size));
+        }
+        PAIMON_RETURN_NOT_OK(out->Flush());
+        PAIMON_RETURN_NOT_OK(out->Close());
+        return Status::OK();
+    }
+
+    /// Fetches block `block_id` of a file made of `block_size`-byte blocks, without
+    /// decompression.
+    Result<MemorySegment> GetBlock(int32_t block_id, int32_t block_size, BlockCache* block_cache,
+                                   bool is_index = false) const {
+        return block_cache->GetBlock(/*position=*/BlockPosition(block_id, block_size),
+                                     /*length=*/block_size, is_index,
+                                     /*decompress_func=*/nullptr);
+    }
+
+    /// Returns whether `block_cache`'s local map currently holds block `block_id`.
+    bool ContainsBlock(int32_t block_id, int32_t block_size, BlockCache* block_cache,
+                       bool is_index = false) const {
+        return block_cache->ContainsBlock(/*position=*/BlockPosition(block_id, block_size),
+                                          /*length=*/block_size, is_index);
+    }
+
+ protected:
+    // Protected rather than private: each TEST_F body runs in a subclass of this fixture
+    // and uses these members directly.
+    std::unique_ptr<UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+
+ private:
+    /// Byte offset of block `block_id`, computed in 64 bits so the product cannot overflow.
+    static int64_t BlockPosition(int32_t block_id, int32_t block_size) {
+        return static_cast<int64_t>(block_id) * block_size;
+    }
+};
+
+/// Verifies that the first GetBlock call reads from IO and subsequent calls return from the
+/// local blocks_ cache without re-reading.
+TEST_F(BlockCacheTest, TestBasicCacheHit) {
+    const int32_t block_size = 64;
+    const int32_t num_blocks = 4;
+    auto file_path = dir_->Str() + "/basic_hit.data";
+    ASSERT_OK(WriteTestFile(file_path, num_blocks, block_size));
+
+    // Capacity is twice the file size, so nothing is evicted in this test.
+    auto cache_manager = std::make_shared<CacheManager>(block_size * num_blocks * 2, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Initially blocks_ is empty
+    ASSERT_EQ(block_cache.BlocksSize(), 0);
+
+    // First access: populates both blocks_ and LRU
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg1.Size(), block_size);
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 1);
+
+    // Second access: returns from blocks_, no new LRU entry
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg2.Size(), block_size);
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 1);
+
+    // Load a different block; its first byte identifies it as block 1
+    ASSERT_OK_AND_ASSIGN(auto seg3, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg3.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+}
+
+/// Verifies that when LRU evicts an entry due to capacity pressure, the eviction callback
+/// removes the corresponding entry from BlockCache's blocks_ map.
+TEST_F(BlockCacheTest, TestLruEvictionSyncsWithBlocks) {
+    const int32_t block_size = 100;
+    auto file_path = dir_->Str() + "/eviction.data";
+    ASSERT_OK(WriteTestFile(file_path, 5, block_size));
+
+    // Cache can hold at most 2 blocks (200 bytes)
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 2, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load block 0 at position 0
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+
+    // Load block 1 at position block_size; the cache is now full
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+
+    // Load block 2: evicts block 0 (LRU) from both LRU and blocks_
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(2, block_size, &block_cache));
+    ASSERT_EQ(seg2.Get(0), static_cast<char>(2));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 0 should be evicted from blocks_
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache));
+    // block 1 and block 2 should remain
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+
+    // Re-access block 0: triggers fresh IO read, evicts block 1 (now LRU)
+    ASSERT_OK_AND_ASSIGN(auto seg0_reloaded, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg0_reloaded.Get(0), static_cast<char>(0));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 1 should now be evicted
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    // block 0 and block 2 should remain
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+}
+
+/// Verifies that Close() invalidates all entries from both blocks_ and the LRU cache.
+TEST_F(BlockCacheTest, TestClose) {
+    const int32_t block_size = 64;
+    auto file_path = dir_->Str() + "/close.data";
+    ASSERT_OK(WriteTestFile(file_path, 3, block_size));
+
+    // Room for 10 blocks: all 3 loads below stay cached, so Close() has work to do.
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 10, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load 3 blocks and verify blocks_ keys
+    for (int32_t i = 0; i < 3; i++) {
+        ASSERT_OK_AND_ASSIGN(auto seg, GetBlock(i, block_size, &block_cache));
+        ASSERT_EQ(seg.Get(0), static_cast<char>(i));
+    }
+    ASSERT_EQ(block_cache.BlocksSize(), 3);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    block_cache.Close();
+
+    // After Close, both blocks_ and LRU should be empty
+    ASSERT_EQ(block_cache.BlocksSize(), 0);
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_FALSE(ContainsBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+}
+
+/// Verifies that two BlockCache instances sharing the same CacheManager have independent
+/// blocks_ maps, and that an eviction in the shared LRU only affects the blocks_ map of the
+/// BlockCache that owns the evicted entry.
+TEST_F(BlockCacheTest, TestSharedCacheManagerEvictionIsolation) {
+    const int32_t block_size = 100;
+    auto file_path_a = dir_->Str() + "/file_a.data";
+    auto file_path_b = dir_->Str() + "/file_b.data";
+    ASSERT_OK(WriteTestFile(file_path_a, 3, block_size));
+    ASSERT_OK(WriteTestFile(file_path_b, 3, block_size));
+
+    // Shared cache can hold 3 blocks total
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 3, 0.0);
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in_a, fs_->Open(file_path_a));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in_b, fs_->Open(file_path_b));
+    BlockCache cache_a(file_path_a, in_a, cache_manager, pool_);
+    BlockCache cache_b(file_path_b, in_b, cache_manager, pool_);
+
+    // Load 2 blocks from file_a
+    ASSERT_OK_AND_ASSIGN(auto seg_a0, GetBlock(0, block_size, &cache_a));
+    ASSERT_OK_AND_ASSIGN(auto seg_a1, GetBlock(1, block_size, &cache_a));
+    ASSERT_EQ(cache_a.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_a));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+
+    // Load 1 block from file_b (total 3, at capacity)
+    ASSERT_OK_AND_ASSIGN(auto seg_b0, GetBlock(0, block_size, &cache_b));
+    ASSERT_EQ(cache_b.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    // Load another block from file_b: should evict file_a's block 0 (the LRU entry)
+    ASSERT_OK_AND_ASSIGN(auto seg_b1, GetBlock(1, block_size, &cache_b));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    // cache_b should have 2 entries
+    ASSERT_EQ(cache_b.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_b));
+
+    // cache_a's block 0 was evicted by LRU callback, only block 1 remains
+    ASSERT_EQ(cache_a.BlocksSize(), 1);
+    ASSERT_FALSE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_a));
+
+    // Re-access file_a's block 0: triggers fresh IO read, evicts file_a's block 1 (now LRU)
+    ASSERT_OK_AND_ASSIGN(auto seg_a0_reloaded, GetBlock(0, block_size, &cache_a));
+    ASSERT_EQ(seg_a0_reloaded.Get(0), static_cast<char>(0));
+    ASSERT_EQ(cache_a.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_FALSE(ContainsBlock(1, block_size, &cache_a));
+
+    // cache_b should be unaffected
+    ASSERT_EQ(cache_b.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_b));
+
+    cache_a.Close();
+    cache_b.Close();
+}
+
+/// Verifies the REFRESH_COUNT mechanism interacts correctly with LRU eviction ordering.
+/// After refreshing a block (re-inserting into LRU front), it should not be the first to be
+/// evicted when capacity pressure occurs.
+TEST_F(BlockCacheTest, TestRefreshPreventsEviction) {
+    const int32_t block_size = 100;
+    auto file_path = dir_->Str() + "/refresh_eviction.data";
+    ASSERT_OK(WriteTestFile(file_path, 4, block_size));
+
+    // Cache can hold 2 blocks
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 2, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load block 0 and block 1; block 0 is now the LRU entry
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+
+    // Together with the initial load, this loop accesses block 0 REFRESH_COUNT times. These
+    // hits are served from blocks_ and do not reorder the LRU.
+    for (int32_t i = 1; i < CacheManager::REFRESH_COUNT; i++) {
+        ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+    }
+    // Access number REFRESH_COUNT + 1 (the 11th if REFRESH_COUNT is 10) triggers the
+    // refresh: it goes back through the cache manager, moving block 0 to the LRU front.
+    // This assumes each Access() bumps the container's access count by one.
+    ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+
+    // blocks_ should still have both entries after refresh
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+
+    // Load block 2: should evict block 1 (not block 0, since block 0 was just refreshed)
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 0 should still be in blocks_ (was refreshed to LRU front)
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    // block 1 should be evicted from blocks_
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    // block 2 should be in blocks_
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+
+    // Block 0 should still be accessible from blocks_ cache
+    ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+
+    // Block 1 was evicted, re-accessing triggers IO read
+    ASSERT_OK_AND_ASSIGN(seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+}
+
+/// Verifies that index and data blocks are accounted in separate caches with their own
+/// capacities, while one BlockCache's blocks_ map tracks both kinds.
+TEST_F(BlockCacheTest, TestIndexAndDataCache) {
+    const int32_t block_size = 100;
+    const int32_t num_blocks = 6;
+    auto file_path = dir_->Str() + "/index_and_data.data";
+    ASSERT_OK(WriteTestFile(file_path, num_blocks, block_size));
+
+    // Total 400 bytes split by the 0.25 ratio:
+    // index cache max weight = 100 (one block)
+    // data cache max weight = 300 (three blocks)
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 4, 0.25);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // First access for seg0, index
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+
+    // Second access for seg1, index; the one-block index cache evicts seg0
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+
+    // Fills data cache; index block 1 is untouched by data-cache growth
+    for (int32_t i = 2; i < 5; i++) {
+        ASSERT_OK_AND_ASSIGN(auto seg, GetBlock(i, block_size, &block_cache, /*is_index=*/false));
+        ASSERT_EQ(seg.Get(0), static_cast<char>(i));
+        // one index block plus (i - 1) data blocks loaded so far
+        ASSERT_EQ(block_cache.BlocksSize(), 1 + i - 1);
+        ASSERT_TRUE(ContainsBlock(i, block_size, &block_cache, /*is_index=*/false));
+        ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+        ASSERT_EQ(cache_manager->DataCache()->Size(), i - 1);
+    }
+
+    // Data cache is full: loading block 5 evicts data block 2 (the LRU data entry)
+    ASSERT_OK_AND_ASSIGN(auto seg5, GetBlock(5, block_size, &block_cache, /*is_index=*/false));
+    ASSERT_EQ(seg5.Get(0), static_cast<char>(5));
+    ASSERT_EQ(block_cache.BlocksSize(), 4);
+    ASSERT_TRUE(ContainsBlock(5, block_size, &block_cache, /*is_index=*/false));
+    ASSERT_TRUE(ContainsBlock(4, block_size, &block_cache, /*is_index=*/false));
+    ASSERT_TRUE(ContainsBlock(3, block_size, &block_cache, /*is_index=*/false));
+    ASSERT_FALSE(ContainsBlock(2, block_size, &block_cache, /*is_index=*/false));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/sst/block_entry.h b/src/paimon/common/sst/block_entry.h
new file mode 100644
index 0000000..d7eb231
--- /dev/null
+++ b/src/paimon/common/sst/block_entry.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// One key/value record decoded from a block. Both members hold copies of the
+/// `MemorySlice` values passed to the constructor.
+struct BlockEntry {
+    MemorySlice key;
+    MemorySlice value;
+
+    BlockEntry(const MemorySlice& entry_key, const MemorySlice& entry_value)
+        : key(entry_key), value(entry_value) {}
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_handle.cpp b/src/paimon/common/sst/block_handle.cpp
new file mode 100644
index 0000000..6b02259
--- /dev/null
+++ b/src/paimon/common/sst/block_handle.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_handle.h"
+
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+/// Decodes a handle written by `WriteBlockHandle`: a var-length long offset followed by a
+/// var-length int size. Returns `Status::Invalid` if either decoded value is negative.
+Result<BlockHandle> BlockHandle::ReadBlockHandle(MemorySliceInput* input) {
+    PAIMON_ASSIGN_OR_RAISE(int64_t offset, input->ReadVarLenLong());
+    PAIMON_ASSIGN_OR_RAISE(int32_t size, input->ReadVarLenInt());
+    // Handles come from file contents, and a corrupt var-length value may decode to a
+    // negative number. Report it here rather than handing out a negative offset or size.
+    if (offset < 0 || size < 0) {
+        return Status::Invalid("Invalid block handle: offset=" + std::to_string(offset) +
+                               ", size=" + std::to_string(size));
+    }
+    return BlockHandle(offset, size);
+}
+
+BlockHandle::BlockHandle(int64_t offset, int32_t size) : offset_(offset), size_(size) {}
+
+int64_t BlockHandle::Offset() const {
+    return offset_;
+}
+
+int32_t BlockHandle::Size() const {
+    return size_;
+}
+
+// NOTE(review): this adds MAX_ENCODED_LENGTH, the worst-case size of an *encoded handle*.
+// In LevelDB-derived SST formats the full on-disk block size is usually the payload plus
+// the block trailer length. Confirm which constant the callers expect.
+int32_t BlockHandle::GetFullBlockSize() const {
+    return size_ + MAX_ENCODED_LENGTH;
+}
+
+std::string BlockHandle::ToString() const {
+    return "BlockHandle{offset=" + std::to_string(offset_) + ", size=" + std::to_string(size_) +
+           "}";
+}
+
+/// Encodes this handle (offset, then size, both var-length) into a new slice.
+Result<MemorySlice> BlockHandle::WriteBlockHandle(MemoryPool* pool) {
+    // MAX_ENCODED_LENGTH (9 + 5 bytes) covers the worst case of both var-length fields.
+    MemorySliceOutput output(MAX_ENCODED_LENGTH, pool);
+    PAIMON_RETURN_NOT_OK(output.WriteVarLenLong(offset_));
+    PAIMON_RETURN_NOT_OK(output.WriteVarLenInt(size_));
+    return output.ToSlice();
+}
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_handle.h b/src/paimon/common/sst/block_handle.h
new file mode 100644
index 0000000..c00a5af
--- /dev/null
+++ b/src/paimon/common/sst/block_handle.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Describes a block by the byte offset where it starts and its size in bytes.
+/// Serialized as a var-length long (offset) followed by a var-length int (size).
+class PAIMON_EXPORT BlockHandle {
+ public:
+    /// Worst-case serialized size: a var-length long takes at most 9 bytes and a
+    /// var-length int at most 5.
+    static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5;
+
+    /// Decodes a handle from the current position of `input`.
+    static Result<BlockHandle> ReadBlockHandle(MemorySliceInput* input);
+
+    BlockHandle(int64_t offset, int32_t size);
+    ~BlockHandle() = default;
+
+    /// Byte offset at which the block starts.
+    int64_t Offset() const;
+    /// Size of the block in bytes.
+    int32_t Size() const;
+    /// `Size()` plus `MAX_ENCODED_LENGTH`.
+    int32_t GetFullBlockSize() const;
+
+    /// Human-readable form, e.g. `BlockHandle{offset=0, size=42}`.
+    std::string ToString() const;
+    /// Serializes this handle into a new slice built with `pool`.
+    Result<MemorySlice> WriteBlockHandle(MemoryPool* pool);
+
+ private:
+    int64_t offset_;
+    int32_t size_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp
new file mode 100644
index 0000000..55d0ab9
--- /dev/null
+++ b/src/paimon/common/sst/block_iterator.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_iterator.h"
+
+#include "paimon/common/sst/block_reader.h"
+
+namespace paimon {
+BlockIterator::BlockIterator(const std::shared_ptr<BlockReader>& reader)
+    : input_(reader->BlockInput()), reader_(reader) {}
+
+bool BlockIterator::HasNext() const {
+    // An entry positioned by SeekTo() is still pending even when the input is exhausted.
+    return polled_position_ >= 0 || input_.IsReadable();
+}
+
+Result<BlockEntry> BlockIterator::Next() {
+    if (!HasNext()) {
+        return Status::Invalid("no such element");
+    }
+    if (polled_position_ >= 0) {
+        // SeekTo()'s binary search may have left the input past the located entry, so
+        // rewind to it before reading.
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_));
+        polled_position_ = -1;
+    }
+    return ReadEntry();
+}
+
+/// Decodes one entry at the current position. Layout: var-length key length, key bytes,
+/// var-length value length, value bytes. The returned slices are views into the block.
+Result<BlockEntry> BlockIterator::ReadEntry() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    auto key = input_.ReadSliceView(key_length);
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    auto value = input_.ReadSliceView(value_length);
+    return BlockEntry(key, value);
+}
+
+Result<MemorySlice> BlockIterator::SkipKeyAndReadValue() {
+    // Honour a pending SeekTo() result exactly as Next() does.
+    if (polled_position_ >= 0) {
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_));
+        polled_position_ = -1;
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + key_length));
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    return input_.ReadSliceView(value_length);
+}
+
+Result<MemorySlice> BlockIterator::ReadKeyAndSkipValue() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    auto key = input_.ReadSliceView(key_length);
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + value_length));
+    return key;
+}
+
+/// Lower-bound binary search over the block's entries, located through
+/// `reader_->SeekTo(index)`. Assumes entries are sorted by `reader_->Comparator()`.
+/// Returns true on an exact match. Afterwards `polled_position_` holds the first entry
+/// whose key is >= `target_key`, or -1 if every key is smaller.
+Result<bool> BlockIterator::SeekTo(const MemorySlice& target_key) {
+    int32_t left = 0;
+    int32_t right = reader_->RecordCount() - 1;
+    polled_position_ = -1;
+
+    while (left <= right) {
+        int32_t mid = left + (right - left) / 2;
+
+        int32_t entry_position = reader_->SeekTo(mid);
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(entry_position));
+
+        PAIMON_ASSIGN_OR_RAISE(MemorySlice mid_key, ReadKeyAndSkipValue());
+        PAIMON_ASSIGN_OR_RAISE(int32_t compare, reader_->Comparator()(mid_key, target_key));
+
+        if (compare == 0) {
+            polled_position_ = entry_position;
+            return true;
+        } else if (compare > 0) {
+            // mid_key > target_key: a candidate for the first key >= target_key
+            polled_position_ = entry_position;
+            right = mid - 1;
+        } else {
+            // mid_key < target_key: look at larger keys, keeping the best candidate so far
+            left = mid + 1;
+        }
+    }
+
+    // No exact match: polled_position_ points to the first entry with key >= target_key,
+    // or is -1 if all keys are smaller.
+    return false;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h
new file mode 100644
index 0000000..bfc8970
--- /dev/null
+++ b/src/paimon/common/sst/block_iterator.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_entry.h"
+
+namespace paimon {
+class BlockReader;
+
+/// Cursor over the key/value entries of a single block, supporting sequential reads and a
+/// binary-search SeekTo().
+class PAIMON_EXPORT BlockIterator {
+ public:
+    /// Creates an iterator over the records of `reader`'s block.
+    /// NOTE(review): the constructor is defined outside this chunk; presumably it sets up
+    /// input_ from reader->BlockInput() and stores `reader` in reader_ — confirm.
+    explicit BlockIterator(const std::shared_ptr<BlockReader>& reader);
+
+    /// Whether another entry is available (definition not shown here).
+    bool HasNext() const;
+
+    /// Returns the next key/value entry. After SeekTo(), the pending entry recorded in
+    /// polled_position_ is expected to be returned first — confirm against the definition.
+    Result<BlockEntry> Next();
+
+    /// Read only the value MemorySlice from the current position, skipping the key.
+    /// Used in fast-path iteration where no key comparison is needed.
+    /// Honors a pending SeekTo() result by first jumping to polled_position_.
+    /// @note Public function, conceptually similar to `Next()` but optimized to skip key parsing.
+    Result<MemorySlice> SkipKeyAndReadValue();
+
+    /// Binary-searches the block for `target_key` using the reader's comparator.
+    /// @return true if an entry with an equal key exists, false otherwise. On a match,
+    ///         polled_position_ is that entry; otherwise it is the first entry whose key is
+    ///         greater than target_key, or -1 when every key is smaller. Comparator or
+    ///         input errors are propagated as an error status.
+    Result<bool> SeekTo(const MemorySlice& target_key);
+
+ private:
+    /// Read only the key MemorySlice from the current position, skipping the value.
+    /// @note Inner function, only called by `SeekTo`.
+    Result<MemorySlice> ReadKeyAndSkipValue();
+
+    /// Reads one complete key/value entry at the current position (definition not shown).
+    Result<BlockEntry> ReadEntry();
+
+    // Member declaration order is also the constructor's initialization order.
+    /// Byte cursor over the block's record data.
+    MemorySliceInput input_;
+    /// Position of the entry that should be returned by Next() after SeekTo.
+    /// -1 means no pending entry.
+    int32_t polled_position_ = -1;
+    /// Reader that owns the block data, record index and key comparator.
+    std::shared_ptr<BlockReader> reader_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_reader.cpp
b/src/paimon/common/sst/block_reader.cpp
new file mode 100644
index 0000000..c280be0
--- /dev/null
+++ b/src/paimon/common/sst/block_reader.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_reader.h"
+
+#include "paimon/common/sst/block_trailer.h"
+namespace paimon {
+
+Result<std::shared_ptr<BlockReader>> BlockReader::Create(const MemorySlice& block,
+                                                         MemorySlice::SliceComparator comparator) {
+    // Footer written by BlockWriter::Finish(): a 4-byte int (the fixed record size of an
+    // aligned block, or the record count of an unaligned block) followed by a 1-byte
+    // BlockAlignedType. This is NOT the BlockTrailer (compression type + crc32c); the two
+    // merely happen to have the same length, so a dedicated constant is used here.
+    constexpr int32_t kFooterLength = static_cast<int32_t>(sizeof(int32_t)) + 1;
+    // An unaligned block stores one int32 record start offset per record before the footer.
+    constexpr int32_t kIndexEntryLength = static_cast<int32_t>(sizeof(int32_t));
+
+    const auto block_length = block.Length();
+    // Validate before reading so a truncated or corrupt block cannot cause out-of-range reads.
+    if (block_length < kFooterLength) {
+        return Status::Invalid("block is too short to hold its footer");
+    }
+    PAIMON_ASSIGN_OR_RAISE(BlockAlignedType type, From(block.ReadByte(block_length - 1)));
+    const auto data_end = block_length - kFooterLength;
+    const int32_t footer_value = block.ReadInt(data_end);
+
+    if (type == BlockAlignedType::ALIGNED) {
+        // AlignedBlockReader divides the data length by the record size.
+        if (footer_value <= 0) {
+            return Status::Invalid("aligned block has a non-positive record size");
+        }
+        auto data = block.Slice(0, data_end);
+        return std::make_shared<AlignedBlockReader>(data, footer_value, std::move(comparator));
+    }
+
+    // The offset index must fit in front of the footer; checking the count against
+    // data_end / kIndexEntryLength also keeps footer_value * kIndexEntryLength from overflowing.
+    if (footer_value < 0 || footer_value > data_end / kIndexEntryLength) {
+        return Status::Invalid("unaligned block has an invalid record count");
+    }
+    const int32_t index_length = footer_value * kIndexEntryLength;
+    const int32_t index_offset = data_end - index_length;
+    auto data = block.Slice(0, index_offset);
+    auto index = block.Slice(index_offset, index_length);
+    return std::make_shared<UnAlignedBlockReader>(data, index, std::move(comparator));
+}
+
+/// Returns a new iterator over this block. The iterator receives a shared_ptr to this reader,
+/// so the reader must itself be owned by a std::shared_ptr (as Create() returns) for
+/// shared_from_this() to be valid.
+std::unique_ptr<BlockIterator> BlockReader::Iterator() {
+    return std::make_unique<BlockIterator>(shared_from_this());
+}
+
+/// Returns a new input cursor over the block's record data.
+MemorySliceInput BlockReader::BlockInput() { return block_.ToInput(); }
+
+/// Number of records stored in this block.
+int32_t BlockReader::RecordCount() const { return record_count_; }
+
+/// Comparator used to order and binary-search keys in this block.
+const MemorySlice::SliceComparator& BlockReader::Comparator() const { return comparator_; }
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_reader.h
b/src/paimon/common/sst/block_reader.h
new file mode 100644
index 0000000..e8c7ffe
--- /dev/null
+++ b/src/paimon/common/sst/block_reader.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_aligned_type.h"
+#include "paimon/common/sst/block_iterator.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+namespace paimon {
+class BlockIterator;
+
+/// Reader for a block.
+/// Create() parses the footer written by BlockWriter::Finish() and picks a concrete subclass;
+/// subclasses differ only in how a record index is mapped to a byte offset (SeekTo()).
+class PAIMON_EXPORT BlockReader : public std::enable_shared_from_this<BlockReader> {
+ public:
+    virtual ~BlockReader() = default;
+
+    /// Builds an aligned or unaligned reader from a serialized block.
+    /// @param block      Serialized block, including the footer at its end.
+    /// @param comparator Key comparator, used by iterators for SeekTo().
+    /// @return the reader, or an error status if the block cannot be decoded
+    ///         (e.g. an unknown aligned-type byte).
+    static Result<std::shared_ptr<BlockReader>> Create(const MemorySlice& block,
+                                                       MemorySlice::SliceComparator comparator);
+
+    /// Returns the byte offset, within the record data, of record number `record_position`.
+    virtual int32_t SeekTo(int32_t record_position) = 0;
+
+    /// Number of records in the block.
+    int32_t RecordCount() const;
+    /// Comparator that orders the keys of this block.
+    const MemorySlice::SliceComparator& Comparator() const;
+
+    /// Returns an iterator over this block. Uses shared_from_this(), so this reader must be
+    /// owned by a std::shared_ptr (as returned by Create()).
+    std::unique_ptr<BlockIterator> Iterator();
+    /// Returns a new input cursor over the record data.
+    MemorySliceInput BlockInput();
+
+ protected:
+    /// @param block        Record data only (Create() strips the footer and, for unaligned
+    ///                     blocks, the offset index).
+    /// @param record_count Number of records in `block`.
+    BlockReader(const MemorySlice& block, int32_t record_count,
+                MemorySlice::SliceComparator comparator)
+        : block_(block), comparator_(std::move(comparator)), record_count_(record_count) {}
+
+ private:
+    MemorySlice block_;
+    MemorySlice::SliceComparator comparator_;
+    int32_t record_count_;
+};
+
+/// Reader for a block whose records all share one encoded size, so record i starts at byte
+/// offset i * record_size and no per-record offset index is stored.
+class AlignedBlockReader : public BlockReader {
+ public:
+    /// @param block       Record data (footer already stripped by BlockReader::Create()).
+    /// @param record_size Encoded size of every record, read from the block footer.
+    /// A non-positive record_size (only possible with corrupt input) yields a reader with zero
+    /// records instead of dividing by zero.
+    AlignedBlockReader(const MemorySlice& block, int32_t record_size,
+                       MemorySlice::SliceComparator comparator)
+        : BlockReader(block, record_size > 0 ? block.Length() / record_size : 0,
+                      std::move(comparator)),
+          record_size_(record_size) {}
+
+    /// Record start offsets are a simple multiple of the fixed record size.
+    int32_t SeekTo(int32_t record_position) override {
+        return record_size_ * record_position;
+    }
+
+ private:
+    int32_t record_size_;
+};
+
+/// Reader for a block whose records have varying sizes; a separate index of int32 start
+/// offsets (one per record) locates each record inside the data slice.
+class UnAlignedBlockReader : public BlockReader {
+ public:
+    /// @param data  Record data (offset index and footer stripped).
+    /// @param index Concatenated int32 record start offsets; index length / 4 is the record count.
+    UnAlignedBlockReader(const MemorySlice& data, const MemorySlice& index,
+                         MemorySlice::SliceComparator comparator)
+        : BlockReader(data, index.Length() / kOffsetWidth, std::move(comparator)),
+          index_(index) {}
+
+    /// Looks the record's start offset up in the index.
+    int32_t SeekTo(int32_t record_position) override {
+        const int32_t slot = record_position * kOffsetWidth;
+        return index_.ReadInt(slot);
+    }
+
+ private:
+    /// Bytes per index entry (one int32 offset).
+    static constexpr int32_t kOffsetWidth = 4;
+
+    MemorySlice index_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_trailer.cpp
b/src/paimon/common/sst/block_trailer.cpp
new file mode 100644
index 0000000..5649102
--- /dev/null
+++ b/src/paimon/common/sst/block_trailer.cpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_trailer.h"
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+std::unique_ptr<BlockTrailer> BlockTrailer::ReadBlockTrailer(MemorySliceInput* input) {
+    // Field order mirrors WriteBlockTrailer(): compression type first, then the CRC32C.
+    const auto compression_type = input->ReadUnsignedByte();
+    const auto checksum = input->ReadInt();
+    return std::make_unique<BlockTrailer>(compression_type, checksum);
+}
+
+/// CRC32C checksum stored in this trailer.
+int32_t BlockTrailer::Crc32c() const { return crc32c_; }
+
+/// Raw compression-type code stored in this trailer.
+int8_t BlockTrailer::CompressionType() const { return compression_type_; }
+
+/// Human-readable description, e.g. `BlockTrailer{compression_type=1, crc32c=0x1a2b3c4d}`.
+std::string BlockTrailer::ToString() const {
+    // Field labels follow the public accessor names (no member-name underscore). The CRC is
+    // printed as unsigned hex; the compression type is widened to int so it prints as a number.
+    return fmt::format("BlockTrailer{{compression_type={}, crc32c={:#x}}}",
+                       static_cast<int>(compression_type_), static_cast<uint32_t>(crc32c_));
+}
+
+/// Serializes the trailer into a new ENCODED_LENGTH-byte slice allocated from `pool`:
+/// the compression type (int8_t) followed by the CRC32C (int32_t), the same order
+/// ReadBlockTrailer() consumes them in.
+MemorySlice BlockTrailer::WriteBlockTrailer(MemoryPool* pool) {
+    MemorySliceOutput encoded(ENCODED_LENGTH, pool);
+    encoded.WriteValue(compression_type_);
+    encoded.WriteValue(crc32c_);
+    return encoded.ToSlice();
+}
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_trailer.h
b/src/paimon/common/sst/block_trailer.h
new file mode 100644
index 0000000..a4c8aa5
--- /dev/null
+++ b/src/paimon/common/sst/block_trailer.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Trailer of a block.
+/// Serialized as ENCODED_LENGTH bytes: the compression type (int8_t) followed by the
+/// CRC32C (int32_t), in the order used by ReadBlockTrailer() and WriteBlockTrailer().
+class PAIMON_EXPORT BlockTrailer {
+ public:
+    /// Reads the compression type and then the CRC32C from `input`.
+    static std::unique_ptr<BlockTrailer> ReadBlockTrailer(MemorySliceInput* input);
+
+ public:
+    BlockTrailer(int8_t compression_type, int32_t crc32c)
+        : crc32c_(crc32c), compression_type_(compression_type) {}
+
+    ~BlockTrailer() = default;
+
+    /// CRC32C checksum stored in the trailer.
+    int32_t Crc32c() const;
+    /// Raw compression-type code; presumably maps onto BlockCompressionType — confirm.
+    int8_t CompressionType() const;
+
+    /// Human-readable form for logging and debugging.
+    std::string ToString() const;
+    /// Serializes this trailer into a new slice allocated from `pool`.
+    MemorySlice WriteBlockTrailer(MemoryPool* pool);
+
+ public:
+    /// Size of a serialized trailer: 1 byte compression type + 4 byte CRC32C.
+    static constexpr int32_t ENCODED_LENGTH = 5;
+
+ private:
+    // Declared before compression_type_, matching the constructor's initializer order.
+    int32_t crc32c_;
+    int8_t compression_type_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_writer.cpp
b/src/paimon/common/sst/block_writer.cpp
new file mode 100644
index 0000000..4be50fe
--- /dev/null
+++ b/src/paimon/common/sst/block_writer.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_writer.h"
+
+#include "paimon/common/sst/block_aligned_type.h"
+
+namespace paimon {
+
+/// Appends one entry encoded as: varint key length, key bytes, varint value length, value bytes.
+/// The entry's start offset is recorded for the unaligned offset index, and aligned mode is
+/// abandoned as soon as an entry's encoded size differs from the first entry's.
+Status BlockWriter::Write(std::shared_ptr<Bytes>& key, std::shared_ptr<Bytes>& value) {
+    const int32_t entry_start = block_->Size();
+    PAIMON_RETURN_NOT_OK(block_->WriteVarLenInt(key->size()));
+    block_->WriteBytes(key);
+    PAIMON_RETURN_NOT_OK(block_->WriteVarLenInt(value->size()));
+    block_->WriteBytes(value);
+    const int32_t entry_size = block_->Size() - entry_start;
+    positions_.push_back(entry_start);
+
+    if (!aligned_) {
+        return Status::OK();
+    }
+    if (aligned_size_ == 0) {
+        // First entry defines the size all later entries must match.
+        aligned_size_ = entry_size;
+    } else if (aligned_size_ != entry_size) {
+        aligned_ = false;
+    }
+    return Status::OK();
+}
+
+/// Discards all written entries so the writer can build a new block.
+void BlockWriter::Reset() {
+    // Forget every entry offset and start a fresh output buffer with the same initial size
+    // and pool that the constructor used.
+    positions_.clear();
+    block_ = std::make_shared<MemorySliceOutput>(size_, pool_.get());
+    // Back to the initial alignment state: no entry size observed yet.
+    // NOTE(review): aligned_ is forced to true even if the writer was constructed with
+    // aligned = false; confirm whether the constructor's choice should be restored instead.
+    aligned_ = true;
+    aligned_size_ = 0;
+}
+
+/// Appends the block footer and returns the complete block (the block trailer is not written
+/// here). Layout appended after the entries:
+///   aligned:   record size (int32), type byte
+///   unaligned: one int32 start offset per record, record count (int32), type byte
+Result<MemorySlice> BlockWriter::Finish() {
+    // An aligned footer stores only the record size; the reader derives the record count as
+    // data_length / record_size. With no records there is no size to store, so an empty block
+    // always uses the unaligned layout with an explicit count of 0.
+    if (positions_.empty()) {
+        aligned_ = false;
+    }
+    if (!aligned_) {
+        for (int32_t entry_start : positions_) {
+            block_->WriteValue(entry_start);
+        }
+        block_->WriteValue(static_cast<int32_t>(positions_.size()));
+    } else {
+        block_->WriteValue(aligned_size_);
+    }
+    const BlockAlignedType layout =
+        aligned_ ? BlockAlignedType::ALIGNED : BlockAlignedType::UNALIGNED;
+    block_->WriteValue(static_cast<char>(layout));
+    return block_->ToSlice();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/block_writer.h
b/src/paimon/common/sst/block_writer.h
new file mode 100644
index 0000000..8e9d407
--- /dev/null
+++ b/src/paimon/common/sst/block_writer.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_slice_output.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+///
+/// Writer to build a Block. A block is designed for storing and random-accessing k-v pairs. The
+/// layout is as below:
+///
+/// <pre>
+/// +---------------+
+/// | Block Trailer |
+/// +------------------------------------------------+
+/// |   Compression    |        Block CRC32C         |
+/// +------------------------------------------------+
+/// +---------------+
+/// |  Block Data   |
+/// +---------------+--------------------------------+----+
+/// | key len | key bytes | value len | value bytes  |    |
+/// +------------------------------------------------+    |
+/// | key len | key bytes | value len | value bytes  |    +-> Key-Value pairs
+/// +------------------------------------------------+    |
+/// |                  ... ...                       |    |
+/// +------------------------------------------------+----+
+/// | entry pos | entry pos |     ...    | entry pos |    +-> optional, for unaligned block
+/// +------------------------------------------------+----+
+/// | entry num (unaligned) / entry size (aligned)  | aligned type (1 byte) |
+/// +------------------------------------------------+
+/// </pre>
+///
+/// BlockWriter produces the Block Data part only; the trailer (compression type followed by
+/// CRC32C, see BlockTrailer) is appended separately.
+///
+class PAIMON_EXPORT BlockWriter {
+ public:
+    /// @param size    Initial size handed to the MemorySliceOutput buffer (reused by Reset()).
+    /// @param pool    Memory pool backing the output buffer.
+    /// @param aligned Whether aligned mode may be used; note Reset() re-enables it regardless.
+    BlockWriter(int32_t size, const std::shared_ptr<MemoryPool>& pool, bool aligned = true)
+        : size_(size), pool_(pool), aligned_(aligned) {
+        block_ = std::make_shared<MemorySliceOutput>(size, pool_.get());
+    }
+
+    ~BlockWriter() = default;
+
+    /// Appends one key/value entry. BlockIterator::SeekTo() binary-searches the block, so
+    /// entries should be written in ascending comparator order.
+    Status Write(std::shared_ptr<Bytes>& key, std::shared_ptr<Bytes>& value);
+
+    /// Discards all entries and starts a fresh buffer.
+    /// NOTE(review): Reset() sets aligned mode back to true even for a writer constructed with
+    /// aligned = false; confirm that is intended.
+    void Reset();
+
+    /// Number of entries written since construction or the last Reset().
+    int32_t Size() const {
+        return static_cast<int32_t>(positions_.size());
+    }
+
+    /// Number of bytes the block will occupy once Finish() appends its footer: the entry bytes,
+    /// the offset index (unaligned mode only) and the footer itself.
+    int32_t Memory() const {
+        int32_t memory = block_->Size() + kFooterLength;
+        if (!aligned_) {
+            memory += static_cast<int32_t>(positions_.size()) * kPositionLength;
+        }
+        return memory;
+    }
+
+    /// Appends the footer (and offset index if unaligned) and returns the finished block.
+    Result<MemorySlice> Finish();
+
+ private:
+    /// Footer appended by Finish(): an int32 (record size or record count) + 1 aligned-type byte.
+    static constexpr int32_t kFooterLength = static_cast<int32_t>(sizeof(int32_t)) + 1;
+    /// Bytes per entry of the unaligned offset index.
+    static constexpr int32_t kPositionLength = static_cast<int32_t>(sizeof(int32_t));
+
+    const int32_t size_;
+    const std::shared_ptr<MemoryPool> pool_;
+
+    std::vector<int32_t> positions_;
+    std::shared_ptr<MemorySliceOutput> block_;
+    bool aligned_;
+    /// Encoded size shared by all entries while aligned_ holds; 0 until the first Write().
+    int32_t aligned_size_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/bloom_filter_handle.h
b/src/paimon/common/sst/bloom_filter_handle.h
new file mode 100644
index 0000000..9796bcb
--- /dev/null
+++ b/src/paimon/common/sst/bloom_filter_handle.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Location and sizing of a bloom filter stored inside an SST file.
+class PAIMON_EXPORT BloomFilterHandle {
+ public:
+    /// @param offset           Byte offset of the bloom filter data within the file.
+    /// @param size             Length of the bloom filter data in bytes.
+    /// @param expected_entries Number of entries the filter was sized for.
+    BloomFilterHandle(int64_t offset, int32_t size, int64_t expected_entries)
+        : offset_(offset), size_(size), expected_entries_(expected_entries) {}
+
+    /// Produces an all-zero handle (see the member initializers); an all-zero handle is what
+    /// SstFileReader treats as "no bloom filter".
+    BloomFilterHandle() = default;
+    ~BloomFilterHandle() = default;
+
+    int64_t Offset() const {
+        return offset_;
+    }
+
+    int32_t Size() const {
+        return size_;
+    }
+
+    int64_t ExpectedEntries() const {
+        return expected_entries_;
+    }
+
+    std::string ToString() const {
+        return "BloomFilterHandle{offset=" + std::to_string(offset_) +
+               ", size=" + std::to_string(size_) +
+               ", expected_entries=" + std::to_string(expected_entries_) + "}";
+    }
+
+ public:
+    static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5;
+
+ private:
+    // Initialized in-class so the defaulted constructor does not leave them indeterminate
+    // (reading an indeterminate value is undefined behavior).
+    int64_t offset_ = 0;
+    int32_t size_ = 0;
+    int64_t expected_entries_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_io_test.cpp
b/src/paimon/common/sst/sst_file_io_test.cpp
new file mode 100644
index 0000000..25053e1
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_io_test.cpp
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/sst/sst_file_reader.h"
+#include "paimon/common/sst/sst_file_writer.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/io_exception_helper.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace paimon::test {
+/// One parameter set of SstFileIOTest.
+struct SstFileParam {
+    // Java-written fixture, relative to GetDataDir() + "/sst/" (used by TestJavaCompatibility).
+    std::string file_path;
+    // Compression codec for files written by the test; presumably also the fixture's codec.
+    BlockCompressionType type;
+};
+
+/// Fixture providing a scratch directory, file system, memory pool, string key comparator
+/// and block cache manager for the SST reader/writer tests.
+class SstFileIOTest : public ::testing::TestWithParam<SstFileParam> {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        fs_ = dir_->GetFileSystem();
+        pool_ = GetDefaultPool();
+        // Byte-wise lexicographic key order, reported as -1 / 0 / 1.
+        comparator_ = [](const MemorySlice& lhs, const MemorySlice& rhs) -> Result<int32_t> {
+            const std::string_view left = lhs.ReadStringView();
+            const std::string_view right = rhs.ReadStringView();
+            if (left < right) {
+                return -1;
+            }
+            return left == right ? 0 : 1;
+        };
+    }
+
+    // Removes everything written under the scratch directory.
+    void TearDown() override {
+        ASSERT_OK(fs_->Delete(dir_->Str()));
+    }
+
+ protected:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<paimon::FileSystem> fs_;
+    std::shared_ptr<paimon::MemoryPool> pool_;
+
+    MemorySlice::SliceComparator comparator_;
+    std::shared_ptr<CacheManager> cache_manager_ = std::make_shared<CacheManager>(1024 * 1024, 0.0);
+};
+
+// Round trip: write 16 records with a bloom filter, then check the persisted bloom filter and
+// point lookups of present and absent keys.
+TEST_P(SstFileIOTest, TestSimple) {
+    auto param = GetParam();
+    auto index_path = dir_->Str() + "/sst_file_test.data";
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlockCompressionFactory> factory,
+                         BlockCompressionFactory::Create(param.type));
+
+    // write content
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(index_path, /*overwrite=*/false));
+
+    // write data; the writer argument 50 presumably bounds the data block size, which is why
+    // the 16 records below end up spread over 6 data blocks (asserted further down).
+    auto bf = BloomFilter::Create(30, 0.01);
+    auto seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get());
+    ASSERT_OK(bf->SetMemorySegment(seg_for_bf));
+    auto writer = std::make_shared<SstFileWriter>(out, bf, 50, factory, pool_);
+    // Hashes of every written key; each must test positive in the persisted bloom filter.
+    std::set<int32_t> value_hash;
+    // k1-k5
+    for (size_t i = 1; i <= 5; i++) {
+        std::string key = "k" + std::to_string(i);
+        std::string value = std::to_string(i);
+        ASSERT_OK(writer->Write(std::make_shared<Bytes>(key, pool_.get()),
+                                std::make_shared<Bytes>(value, pool_.get())));
+        auto bytes = std::make_shared<Bytes>(key, pool_.get());
+        value_hash.insert(MurmurHashUtils::HashBytes(bytes));
+    }
+    // k910-k920
+    for (size_t i = 10; i <= 20; i++) {
+        std::string key = "k9" + std::to_string(i);
+        std::string value = "looooooooooong-值-" + std::to_string(i);
+        ASSERT_OK(writer->Write(std::make_shared<Bytes>(key, pool_.get()),
+                                std::make_shared<Bytes>(value, pool_.get())));
+        auto bytes = std::make_shared<Bytes>(key, pool_.get());
+        value_hash.insert(MurmurHashUtils::HashBytes(bytes));
+    }
+    ASSERT_OK(writer->Flush());
+
+    ASSERT_EQ(6, writer->IndexWriter()->Size());
+
+    ASSERT_OK_AND_ASSIGN(auto bloom_filter_handle, writer->WriteBloomFilter());
+    ASSERT_OK_AND_ASSIGN(auto index_block_handle, writer->WriteIndexBlock());
+    SortLookupStoreFooter footer(index_block_handle, bloom_filter_handle);
+    auto slice = footer.WriteSortLookupStoreFooter(pool_.get());
+    ASSERT_OK(writer->WriteSlice(slice));
+
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+
+    // bloom filter test: read the filter bytes back from the location recorded in its handle
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(index_path));
+    auto entries = bloom_filter_handle->ExpectedEntries();
+    auto offset = bloom_filter_handle->Offset();
+    auto size = bloom_filter_handle->Size();
+    ASSERT_OK(in->Seek(offset, SeekOrigin::FS_SEEK_SET));
+    auto bloom_filter_bytes = Bytes::AllocateBytes(size, pool_.get());
+    ASSERT_OK(in->Read(bloom_filter_bytes->data(), bloom_filter_bytes->size()));
+    auto seg = MemorySegment::Wrap(std::move(bloom_filter_bytes));
+    auto bloom_filter = std::make_shared<BloomFilter>(entries, size);
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg));
+    for (const auto& value : value_hash) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+
+    // test read
+    ASSERT_OK_AND_ASSIGN(in, fs_->Open(index_path));
+    auto block_cache = std::make_shared<BlockCache>(index_path, in, cache_manager_, pool_);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader, SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, pool_));
+
+    // Absent keys: assert the lookup itself succeeded before checking that nothing was found,
+    // so a lookup error is reported as a test failure instead of relying on value() of a
+    // failed Result.
+    std::string k0 = "k0";
+    ASSERT_OK_AND_ASSIGN(auto v0, reader->Lookup(std::make_shared<Bytes>(k0, pool_.get())));
+    ASSERT_FALSE(v0);
+
+    // k4
+    std::string k4 = "k4";
+    ASSERT_OK_AND_ASSIGN(auto v4, reader->Lookup(std::make_shared<Bytes>(k4, pool_.get())));
+    ASSERT_TRUE(v4);
+    std::string string4{v4->data(), v4->size()};
+    ASSERT_EQ("4", string4);
+
+    // not exist key
+    std::string k55 = "k55";
+    ASSERT_OK_AND_ASSIGN(auto v55, reader->Lookup(std::make_shared<Bytes>(k55, pool_.get())));
+    ASSERT_FALSE(v55);
+
+    // k915
+    std::string k915 = "k915";
+    ASSERT_OK_AND_ASSIGN(auto v15, reader->Lookup(std::make_shared<Bytes>(k915, pool_.get())));
+    ASSERT_TRUE(v15);
+    std::string string15{v15->data(), v15->size()};
+    ASSERT_EQ("looooooooooong-值-15", string15);
+}
+
+// Reads SST fixtures produced by the Java implementation (one per compression codec) and
+// checks point lookups of present and absent keys.
+TEST_P(SstFileIOTest, TestJavaCompatibility) {
+    auto param = GetParam();
+
+    // key range [1_000_000, 2_000_000], value is equal to the key
+    std::string file = GetDataDir() + "/sst/" + param.file_path;
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file));
+
+    // test read
+    auto block_cache = std::make_shared<BlockCache>(file, in, cache_manager_, pool_);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader, SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, pool_));
+
+    // Absent keys: assert the lookup succeeded before checking that nothing was found, so a
+    // lookup error is a test failure rather than a call to value() on a failed Result.
+    std::string k0 = "10000";
+    ASSERT_OK_AND_ASSIGN(auto v0, reader->Lookup(std::make_shared<Bytes>(k0, pool_.get())));
+    ASSERT_FALSE(v0);
+
+    // k1314520
+    std::string k1314520 = "1314520";
+    ASSERT_OK_AND_ASSIGN(auto v1314520,
+                         reader->Lookup(std::make_shared<Bytes>(k1314520, pool_.get())));
+    ASSERT_TRUE(v1314520);
+    std::string string1314520{v1314520->data(), v1314520->size()};
+    ASSERT_EQ("1314520", string1314520);
+
+    // not exist key
+    std::string k13145200 = "13145200";
+    ASSERT_OK_AND_ASSIGN(auto v13145200,
+                         reader->Lookup(std::make_shared<Bytes>(k13145200, pool_.get())));
+    ASSERT_FALSE(v13145200);
+
+    std::string k1314521 = "1314521";
+    ASSERT_OK_AND_ASSIGN(auto v1314521,
+                         reader->Lookup(std::make_shared<Bytes>(k1314521, pool_.get())));
+    ASSERT_TRUE(v1314521);
+    std::string string1314521{v1314521->data(), v1314521->size()};
+    ASSERT_EQ("1314521", string1314521);
+
+    std::string k1999999 = "1999999";
+    ASSERT_OK_AND_ASSIGN(auto v1999999,
+                         reader->Lookup(std::make_shared<Bytes>(k1999999, pool_.get())));
+    ASSERT_TRUE(v1999999);
+    std::string string1999999{v1999999->data(), v1999999->size()};
+    ASSERT_EQ("1999999", string1999999);
+}
+
+// Fault-injection test. Attempt i arms the IO hook with index i in RETURN_ERROR mode and runs a
+// full write + read cycle; the test passes once some attempt completes the cycle without an
+// injected failure, after every earlier attempt has gone through an error path.
+// NOTE(review): presumably the hook fails the i-th hooked I/O call and CHECK_HOOK_STATUS
+// accepts only that injected error and abandons the attempt; both are defined outside this
+// file — confirm.
+TEST_F(SstFileIOTest, TestIOException) {
+    bool run_complete = false;
+    auto io_hook = paimon::IOHook::GetInstance();
+    for (size_t i = 0; i < 200; i++) {
+        // A fresh directory per attempt, since the output file is created with overwrite=false.
+        auto test_dir = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(test_dir);
+        // Disarm the hook when this attempt ends, however it ends.
+        paimon::ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
+        io_hook->Reset(i, paimon::IOHook::Mode::RETURN_ERROR);
+
+        auto index_path = test_dir->Str() + "/sst_io_exception_test.data";
+
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlockCompressionFactory> factory,
+                             BlockCompressionFactory::Create(BlockCompressionType::ZSTD));
+
+        // write
+        auto out_result = fs_->Create(index_path, /*overwrite=*/false);
+        CHECK_HOOK_STATUS(out_result.status(), i);
+        std::shared_ptr<OutputStream> out = std::move(out_result).value();
+
+        auto bf = BloomFilter::Create(30, 0.01);
+        MemorySegment seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get());
+        ASSERT_OK(bf->SetMemorySegment(seg_for_bf));
+        auto writer = std::make_shared<SstFileWriter>(out, bf, 50, factory, pool_);
+
+        // A failed write is validated inside the inner loop, then this attempt is abandoned
+        // via write_failed (break out of the inner loop, continue the outer one).
+        bool write_failed = false;
+        for (size_t j = 1; j <= 5; j++) {
+            std::string key = "k" + std::to_string(j);
+            std::string value = std::to_string(j);
+            auto write_status = writer->Write(std::make_shared<Bytes>(key, pool_.get()),
+                                              std::make_shared<Bytes>(value, pool_.get()));
+            if (!write_status.ok()) {
+                CHECK_HOOK_STATUS(write_status, i);
+                write_failed = true;
+                break;
+            }
+        }
+        if (write_failed) {
+            continue;
+        }
+
+        CHECK_HOOK_STATUS(writer->Flush(), i);
+
+        auto bloom_filter_handle_result = writer->WriteBloomFilter();
+        CHECK_HOOK_STATUS(bloom_filter_handle_result.status(), i);
+        auto index_block_handle_result = writer->WriteIndexBlock();
+        CHECK_HOOK_STATUS(index_block_handle_result.status(), i);
+        SortLookupStoreFooter test_footer(index_block_handle_result.value(),
+                                          bloom_filter_handle_result.value());
+        auto test_slice = test_footer.WriteSortLookupStoreFooter(pool_.get());
+        CHECK_HOOK_STATUS(writer->WriteSlice(test_slice), i);
+
+        CHECK_HOOK_STATUS(out->Flush(), i);
+        CHECK_HOOK_STATUS(out->Close(), i);
+
+        // read
+        auto in_result = fs_->Open(index_path);
+        CHECK_HOOK_STATUS(in_result.status(), i);
+        std::shared_ptr<InputStream> in = std::move(in_result).value();
+
+        auto block_cache = std::make_shared<BlockCache>(index_path, in, cache_manager_, pool_);
+        auto reader_result =
+            SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, pool_);
+        CHECK_HOOK_STATUS(reader_result.status(), i);
+        std::shared_ptr<SstFileReader> reader = std::move(reader_result).value();
+
+        std::string k4 = "k4";
+        auto v4_result = reader->Lookup(std::make_shared<Bytes>(k4, pool_.get()));
+        CHECK_HOOK_STATUS(v4_result.status(), i);
+        ASSERT_TRUE(v4_result.value());
+        std::string string4{v4_result.value()->data(), v4_result.value()->size()};
+        ASSERT_EQ("4", string4);
+
+        ASSERT_OK(reader->Close());
+        // A full write + read cycle succeeded: no need to try larger fault indices.
+        run_complete = true;
+        break;
+    }
+    // Fails if all 200 attempts hit an injected error.
+    ASSERT_TRUE(run_complete);
+}
+
+// One instantiation per block compression codec; each points at the matching Java-written
+// fixture under GetDataDir() + "/sst/" used by TestJavaCompatibility.
+INSTANTIATE_TEST_SUITE_P(Group, SstFileIOTest,
+                         ::testing::Values(SstFileParam{"none/79d01717-8380-4504-86e1-387e6c058d0a",
+                                                        BlockCompressionType::NONE},
+                                           SstFileParam{"zstd/83d05c53-2353-4160-b756-d50dd851b474",
+                                                        BlockCompressionType::ZSTD},
+                                           SstFileParam{"lz4/10540951-41d3-4216-aa2c-b15dfd25eb75",
+                                                        BlockCompressionType::LZ4}));
+
+} // namespace paimon::test
diff --git a/src/paimon/common/sst/sst_file_reader.cpp
b/src/paimon/common/sst/sst_file_reader.cpp
new file mode 100644
index 0000000..9639bcd
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_reader.cpp
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/sst/sst_file_reader.h"
+
+#include "fmt/format.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/sst/sst_file_utils.h"
+#include "paimon/common/utils/crc32c.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+namespace paimon {
+
+Result<std::shared_ptr<SstFileReader>> SstFileReader::Create(
+    const BlockHandle& index_block_handle,
+    const std::optional<BloomFilterHandle>& bloom_filter_handle,
+    MemorySlice::SliceComparator comparator, const std::shared_ptr<BlockCache>& block_cache,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // Load the bloom filter eagerly. A handle whose fields are all zero is treated the same
+    // as an absent handle: no bloom filter.
+    std::shared_ptr<BloomFilter> filter;
+    const bool has_filter =
+        bloom_filter_handle.has_value() &&
+        (bloom_filter_handle->ExpectedEntries() || bloom_filter_handle->Size() ||
+         bloom_filter_handle->Offset());
+    if (has_filter) {
+        const auto& filter_handle = *bloom_filter_handle;
+        filter = std::make_shared<BloomFilter>(filter_handle.ExpectedEntries(),
+                                               filter_handle.Size());
+        PAIMON_ASSIGN_OR_RAISE(
+            MemorySegment filter_bits,
+            block_cache->GetBlock(filter_handle.Offset(), filter_handle.Size(),
+                                  /*is_index=*/true, /*decompress_func=*/nullptr));
+        PAIMON_RETURN_NOT_OK(filter->SetMemorySegment(filter_bits));
+    }
+
+    // The index block's trailer is stored directly after the index block payload.
+    const auto trailer_offset = index_block_handle.Offset() + index_block_handle.Size();
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment trailer_bytes,
+        block_cache->GetBlock(trailer_offset, BlockTrailer::ENCODED_LENGTH, /*is_index=*/true,
+                              /*decompress_func=*/nullptr));
+    auto trailer_view = MemorySlice::Wrap(trailer_bytes);
+    auto trailer_reader = trailer_view.ToInput();
+    std::shared_ptr<BlockTrailer> index_trailer = BlockTrailer::ReadBlockTrailer(&trailer_reader);
+
+    // Fetch the index block; the cache runs the crc check / decompression via the callback.
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment index_bytes,
+        block_cache->GetBlock(
+            index_block_handle.Offset(), index_block_handle.Size(), true,
+            [pool, index_trailer](const MemorySegment& raw) -> Result<MemorySegment> {
+                return DecompressBlock(raw, index_trailer, pool);
+            }));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockReader> index_reader,
+                           BlockReader::Create(MemorySlice::Wrap(index_bytes), comparator));
+
+    // The constructor is private, so std::make_shared cannot be used here.
+    return std::shared_ptr<SstFileReader>(
+        new SstFileReader(pool, block_cache, filter, index_reader, comparator));
+}
+
+/// Reads the SortLookupStoreFooter stored in the last ENCODED_LENGTH bytes of `in`, then
+/// builds a reader from the index-block and bloom-filter handles recorded in that footer.
+///
+/// Returns Status::Invalid if the stream is too short to contain a footer, and propagates
+/// any I/O or footer-parsing error.
+Result<std::shared_ptr<SstFileReader>> SstFileReader::CreateForSortLookupStore(
+    const std::shared_ptr<InputStream>& in, MemorySlice::SliceComparator comparator,
+    const std::shared_ptr<BlockCache>& block_cache, const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t file_len, in->Length());
+    const auto footer_len = static_cast<uint64_t>(SortLookupStoreFooter::ENCODED_LENGTH);
+    if (file_len < footer_len) {
+        // Without this guard the unsigned subtraction below wraps around and the seek target
+        // becomes a huge, meaningless offset.
+        return Status::Invalid(
+            fmt::format("file length {} is smaller than the sort lookup store footer length {}",
+                        file_len, footer_len));
+    }
+    PAIMON_RETURN_NOT_OK(
+        in->Seek(file_len - SortLookupStoreFooter::ENCODED_LENGTH, SeekOrigin::FS_SEEK_SET));
+    auto footer_bytes = Bytes::AllocateBytes(SortLookupStoreFooter::ENCODED_LENGTH, pool.get());
+    PAIMON_RETURN_NOT_OK(in->Read(footer_bytes->data(), footer_bytes->size()));
+    auto footer_segment = MemorySegment::Wrap(std::move(footer_bytes));
+    auto footer_slice = MemorySlice::Wrap(footer_segment);
+    auto footer_input = footer_slice.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<SortLookupStoreFooter> read_footer,
+                           SortLookupStoreFooter::ReadSortLookupStoreFooter(&footer_input));
+    return SstFileReader::Create(read_footer->GetIndexBlockHandle(),
+                                 read_footer->GetBloomFilterHandle(), std::move(comparator),
+                                 block_cache, pool);
+}
+
+SstFileReader::SstFileReader(const std::shared_ptr<MemoryPool>& pool,
+ const std::shared_ptr<BlockCache>& block_cache,
+ const std::shared_ptr<BloomFilter>& bloom_filter,
+ const std::shared_ptr<BlockReader>&
index_block_reader,
+ MemorySlice::SliceComparator comparator)
+ : pool_(pool),
+ block_cache_(block_cache),
+ bloom_filter_(bloom_filter),
+ index_block_reader_(index_block_reader),
+ comparator_(std::move(comparator)) {}
+
+/// Returns a fresh iterator over the entries of the already-loaded index block.
+std::unique_ptr<BlockIterator> SstFileReader::CreateIndexIterator() {
+    std::unique_ptr<BlockIterator> index_iterator = index_block_reader_->Iterator();
+    return index_iterator;
+}
+
+/// Point lookup: returns a copy of the value stored for `key`, or an empty pointer when the
+/// key is not present (or is ruled out by the bloom filter).
+Result<std::shared_ptr<Bytes>> SstFileReader::Lookup(const std::shared_ptr<Bytes>& key) {
+    // A negative bloom-filter answer proves absence without touching any block.
+    const bool filtered_out =
+        bloom_filter_ != nullptr && !bloom_filter_->TestHash(MurmurHashUtils::HashBytes(key));
+    if (filtered_out) {
+        return std::shared_ptr<Bytes>();
+    }
+
+    auto target = MemorySlice::Wrap(key);
+    // Position the index iterator for `target`; its SeekTo result is deliberately unused.
+    auto index_iter = index_block_reader_->Iterator();
+    PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool index_hit, index_iter->SeekTo(target));
+    // No remaining index entry means no data block can contain the key.
+    if (!index_iter->HasNext()) {
+        return std::shared_ptr<Bytes>();
+    }
+
+    // Open the data block referenced by the index entry and seek inside it.
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlockIterator> data_iter, GetNextBlock(index_iter));
+    PAIMON_ASSIGN_OR_RAISE(bool found, data_iter->SeekTo(target));
+    if (!found) {
+        return std::shared_ptr<Bytes>();
+    }
+    PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iter->Next());
+    return entry.value.CopyBytes(pool_.get());
+}
+
+/// Consumes the next index entry, decodes the BlockHandle stored as its value and returns an
+/// iterator over the referenced data block.
+Result<std::unique_ptr<BlockIterator>> SstFileReader::GetNextBlock(
+    std::unique_ptr<BlockIterator>& index_iterator) {
+    PAIMON_ASSIGN_OR_RAISE(BlockEntry index_entry, index_iterator->Next());
+    auto handle_input = index_entry.value.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(BlockHandle data_handle, BlockHandle::ReadBlockHandle(&handle_input));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockReader> data_reader,
+                           ReadBlock(data_handle, /*index=*/false));
+    return data_reader->Iterator();
+}
+
+Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(const BlockHandle& handle,
+                                                              bool index) {
+    // The trailer (compression type + crc32c) follows the block payload. It is always fetched
+    // with is_index=true, independent of `index`.
+    const auto trailer_offset = handle.Offset() + handle.Size();
+    PAIMON_ASSIGN_OR_RAISE(MemorySegment trailer_bytes,
+                           block_cache_->GetBlock(trailer_offset, BlockTrailer::ENCODED_LENGTH,
+                                                  /*is_index=*/true,
+                                                  /*decompress_func=*/nullptr));
+    auto trailer_view = MemorySlice::Wrap(trailer_bytes);
+    auto trailer_reader = trailer_view.ToInput();
+    std::shared_ptr<BlockTrailer> block_trailer = BlockTrailer::ReadBlockTrailer(&trailer_reader);
+
+    // Fetch the payload; the callback verifies the checksum and decompresses when needed.
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment payload,
+        block_cache_->GetBlock(
+            handle.Offset(), handle.Size(), index,
+            [this, block_trailer](const MemorySegment& raw) -> Result<MemorySegment> {
+                return DecompressBlock(raw, block_trailer, pool_);
+            }));
+    return BlockReader::Create(MemorySlice::Wrap(payload), comparator_);
+}
+
+/// Verifies a block against its trailer and returns the uncompressed block bytes.
+///
+/// @param compressed_data The stored block payload (possibly compressed).
+/// @param trailer The block's trailer holding the compression-type code and crc32c.
+/// @param pool Pool used to allocate the decompression output buffer.
+/// @return The payload itself when it is stored uncompressed, otherwise a newly allocated
+///         segment holding the decompressed bytes. Returns Status::Invalid on a checksum
+///         mismatch, an unknown compression code, a malformed length prefix, or a size mismatch.
+Result<MemorySegment> SstFileReader::DecompressBlock(const MemorySegment& compressed_data,
+                                                     const std::shared_ptr<BlockTrailer>& trailer,
+                                                     const std::shared_ptr<MemoryPool>& pool) {
+    // 1. Integrity: the writer's crc32c covers the stored payload followed by the single
+    //    compression-type byte, so recompute it the same way.
+    auto crc32c_code = CRC32C::calculate(compressed_data.Data(), compressed_data.Size());
+    auto compression_val =
+        static_cast<char>(static_cast<int32_t>(trailer->CompressionType()) & 0xFF);
+    crc32c_code = CRC32C::calculate(&compression_val, 1, crc32c_code);
+    if (trailer->Crc32c() != static_cast<int32_t>(crc32c_code)) {
+        return Status::Invalid(fmt::format("Expected crc32c({:#x}) but found crc32c({:#x})",
+                                           static_cast<uint32_t>(trailer->Crc32c()), crc32c_code));
+    }
+
+    // 2. Blocks stored without compression are returned as-is.
+    PAIMON_ASSIGN_OR_RAISE(BlockCompressionType compress_type,
+                           SstFileUtils::From(trailer->CompressionType()));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockCompressionFactory> factory,
+                           BlockCompressionFactory::Create(compress_type));
+    if (!factory || factory->GetCompressionType() == BlockCompressionType::NONE) {
+        return compressed_data;
+    }
+
+    // 3. Compressed layout: varint(uncompressed size) followed by the compressed bytes.
+    auto decompressor = factory->GetDecompressor();
+    auto slice = MemorySlice::Wrap(compressed_data);
+    auto input = slice.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(int32_t uncompressed_size, input.ReadVarLenInt());
+    if (uncompressed_size < 0) {
+        // Never hand a negative length to the allocator below.
+        return Status::Invalid(
+            fmt::format("Invalid data: negative uncompressed size {}", uncompressed_size));
+    }
+    auto output = MemorySegment::AllocateHeapMemory(uncompressed_size, pool.get());
+
+    PAIMON_ASSIGN_OR_RAISE(
+        int32_t actual_uncompressed_size,
+        decompressor->Decompress(compressed_data.Data() + input.Position(), input.Available(),
+                                 output.MutableData(), output.Size()));
+    if (actual_uncompressed_size != output.Size()) {
+        return Status::Invalid(fmt::format(
+            "Invalid data: expect uncompressed size {}, actual uncompressed size {}",
+            output.Size(), actual_uncompressed_size));
+    }
+    return output;
+}
+
+/// Closes the underlying block cache. The destructor also calls Close(), so an explicit
+/// Close() is followed by a second call at destruction time.
+/// NOTE(review): this relies on BlockCache::Close() being safe to call twice — confirm.
+Status SstFileReader::Close() {
+    // TODO(xinyu.lxy): support close FileBasedBloomFilter
+    BlockCache* cache = block_cache_.get();
+    cache->Close();
+    return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_reader.h b/src/paimon/common/sst/sst_file_reader.h
new file mode 100644
index 0000000..c977468
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_reader.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/sst/block_cache.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/block_iterator.h"
+#include "paimon/common/sst/block_reader.h"
+#include "paimon/common/sst/block_trailer.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class SstFileIterator;
+
+/// An SST File Reader which serves point queries and range queries. Point queries go through
+/// Lookup(). For range queries, call CreateIndexIterator() to get an iterator over the index
+/// block, then use GetNextBlock() together with the BlockIterator seek and read methods.
+/// Note that this class is NOT thread-safe.
+class PAIMON_EXPORT SstFileReader {
+ public:
+    /// Creates a reader from already-decoded index-block and bloom-filter handles.
+    ///
+    /// @param index_block_handle Location of the index block; its BlockTrailer is read from
+    ///        the bytes immediately following it.
+    /// @param bloom_filter_handle Optional bloom filter location. An absent handle, or one whose
+    ///        fields are all zero, means lookups skip the bloom-filter check.
+    /// @param comparator Key comparator handed to the block readers.
+    /// @param block_cache Cache through which all blocks and the bloom filter are read.
+    /// @param pool Memory pool used for decompression buffers and values returned by Lookup().
+    /// @return The reader, or an error if reading or validating the index block fails.
+    static Result<std::shared_ptr<SstFileReader>> Create(
+        const BlockHandle& index_block_handle,
+        const std::optional<BloomFilterHandle>& bloom_filter_handle,
+        MemorySlice::SliceComparator comparator, const std::shared_ptr<BlockCache>& block_cache,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Create an SstFileReader by reading the SortLookupStoreFooter from the given InputStream.
+    /// This method encapsulates the common pattern of reading the footer, parsing it, and
+    /// creating the reader, which avoids code duplication across callers.
+    static Result<std::shared_ptr<SstFileReader>> CreateForSortLookupStore(
+        const std::shared_ptr<InputStream>& input, MemorySlice::SliceComparator comparator,
+        const std::shared_ptr<BlockCache>& block_cache, const std::shared_ptr<MemoryPool>& pool);
+
+    /// Calls Close(); any error it reports is discarded.
+    ~SstFileReader() {
+        [[maybe_unused]] Status _ = Close();
+    }
+
+    // The destructor closes the block cache, so a copy would close the same cache a second
+    // time. Readers are shared through std::shared_ptr (see Create) instead of being copied.
+    SstFileReader(const SstFileReader&) = delete;
+    SstFileReader& operator=(const SstFileReader&) = delete;
+
+    /// Create an iterator for the index block.
+    std::unique_ptr<BlockIterator> CreateIndexIterator();
+
+    /// Lookup the specified key in the file.
+    ///
+    /// @param key serialized key
+    /// @return corresponding serialized value, nullptr if not found.
+    Result<std::shared_ptr<Bytes>> Lookup(const std::shared_ptr<Bytes>& key);
+
+    /// Consumes the next entry of `index_iterator`, decodes the BlockHandle stored as its value
+    /// and returns an iterator over the referenced data block.
+    Result<std::unique_ptr<BlockIterator>> GetNextBlock(
+        std::unique_ptr<BlockIterator>& index_iterator);
+
+    /// @param handle The block handle.
+    /// @param index Whether read the block as an index.
+    /// @return The reader of the target block.
+    Result<std::shared_ptr<BlockReader>> ReadBlock(const BlockHandle& handle, bool index);
+
+    /// Closes the underlying block cache. Also invoked by the destructor.
+    Status Close();
+
+ private:
+    /// Verifies a block's crc32c against `trailer` and decompresses it when the trailer
+    /// indicates a compressed block.
+    static Result<MemorySegment> DecompressBlock(const MemorySegment& compressed_data,
+                                                 const std::shared_ptr<BlockTrailer>& trailer,
+                                                 const std::shared_ptr<MemoryPool>& pool);
+
+    SstFileReader(const std::shared_ptr<MemoryPool>& pool,
+                  const std::shared_ptr<BlockCache>& block_cache,
+                  const std::shared_ptr<BloomFilter>& bloom_filter,
+                  const std::shared_ptr<BlockReader>& index_block_reader,
+                  MemorySlice::SliceComparator comparator);
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<BlockCache> block_cache_;
+    // Null when the file has no bloom filter.
+    std::shared_ptr<BloomFilter> bloom_filter_;
+    std::shared_ptr<BlockReader> index_block_reader_;
+    MemorySlice::SliceComparator comparator_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_utils.h b/src/paimon/common/sst/sst_file_utils.h
new file mode 100644
index 0000000..24c6178
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_utils.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <sstream>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compression_type.h"
+#include "paimon/common/memory/memory_slice.h"
+
+namespace paimon {
+
+/// Utils for sst file.
+class SstFileUtils {
+ public:
+    /// Maps a compression-type code (as stored in a block trailer) to a BlockCompressionType.
+    /// Codes: 0 = NONE, 1 = ZSTD, 2 = LZ4; any other code yields Status::Invalid.
+    static Result<BlockCompressionType> From(int8_t v) {
+        switch (v) {
+            case 0:
+                return BlockCompressionType::NONE;
+            case 1:
+                return BlockCompressionType::ZSTD;
+            case 2:
+                return BlockCompressionType::LZ4;
+            default:
+                return Status::Invalid(
+                    fmt::format("not support compression type code {}", static_cast<int32_t>(v)));
+        }
+    }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_writer.cpp b/src/paimon/common/sst/sst_file_writer.cpp
new file mode 100644
index 0000000..003a386
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_writer.cpp
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/sst_file_writer.h"
+
+#include "paimon/common/sst/sst_file_utils.h"
+#include "paimon/common/utils/crc32c.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+
+namespace paimon {
+/// @param out Stream that receives data blocks, trailers, bloom filter and index block.
+/// @param bloom_filter Optional bloom filter (null disables it).
+/// @param block_size Data-block size threshold; Write() flushes once it is exceeded.
+/// @param factory Compression factory. A null factory means "no compression", matching how
+///        SstFileReader::DecompressBlock treats a null factory.
+/// @param pool Memory pool for block buffers.
+SstFileWriter::SstFileWriter(const std::shared_ptr<OutputStream>& out,
+                             const std::shared_ptr<BloomFilter>& bloom_filter, int32_t block_size,
+                             const std::shared_ptr<BlockCompressionFactory>& factory,
+                             const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool), out_(out), bloom_filter_(bloom_filter), block_size_(block_size) {
+    // Initial data-block capacity is slightly above the flush threshold (presumably to avoid
+    // reallocating right before a flush).
+    data_block_writer_ =
+        std::make_unique<BlockWriter>(static_cast<int32_t>(block_size * 1.1), pool);
+    index_block_writer_ =
+        std::make_unique<BlockWriter>(BlockHandle::MAX_ENCODED_LENGTH * 1024, pool);
+    if (factory) {
+        compression_type_ = factory->GetCompressionType();
+        compressor_ = factory->GetCompressor();
+    } else {
+        // Previously this dereferenced a null factory; treat it as uncompressed instead.
+        compression_type_ = BlockCompressionType::NONE;
+        compressor_ = nullptr;
+    }
+}
+
+/// Appends one key/value pair to the current data block, flushes the block once its memory
+/// exceeds block_size_, and records the key in the bloom filter when one is configured.
+Status SstFileWriter::Write(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& value) {
+    PAIMON_RETURN_NOT_OK(data_block_writer_->Write(key, value));
+    // Flush() uses the most recently written key as the index entry for the sealed block.
+    last_key_ = key;
+
+    const bool block_full = data_block_writer_->Memory() > block_size_;
+    if (block_full) {
+        PAIMON_RETURN_NOT_OK(Flush());
+    }
+
+    if (!bloom_filter_) {
+        return Status::OK();
+    }
+    // Refuse to hash into a bloom filter that has no bit set.
+    if (!bloom_filter_->GetBitSet()) {
+        return Status::Invalid("Bloom filter bit set is null");
+    }
+    PAIMON_RETURN_NOT_OK(bloom_filter_->AddHash(MurmurHashUtils::HashBytes(key)));
+    return Status::OK();
+}
+
+/// Writes the pending data block (if any) and adds an index entry mapping the last written
+/// key to the encoded handle of that block.
+Status SstFileWriter::Flush() {
+    // Nothing buffered: no block to write and no index entry to add.
+    if (data_block_writer_->Size() == 0) {
+        return Status::OK();
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(BlockHandle sealed_block, FlushBlockWriter(data_block_writer_.get()));
+    PAIMON_ASSIGN_OR_RAISE(MemorySlice encoded_handle,
+                           sealed_block.WriteBlockHandle(pool_.get()));
+    auto index_value = encoded_handle.CopyBytes(pool_.get());
+    PAIMON_RETURN_NOT_OK(index_block_writer_->Write(last_key_, index_value));
+    return Status::OK();
+}
+
+/// Writes the accumulated index block (payload + trailer) and returns its handle.
+Result<BlockHandle> SstFileWriter::WriteIndexBlock() {
+    BlockWriter* const index_writer = index_block_writer_.get();
+    return FlushBlockWriter(index_writer);
+}
+
+/// Writes the raw bloom-filter bit set at the current stream position and returns a handle
+/// describing it; returns an empty optional when no bloom filter is configured.
+Result<std::optional<BloomFilterHandle>> SstFileWriter::WriteBloomFilter() {
+    if (bloom_filter_ == nullptr) {
+        return std::optional<BloomFilterHandle>();
+    }
+
+    auto bits = bloom_filter_->GetBitSet()->ToSlice();
+    auto bit_bytes = bits.ReadStringView();
+    PAIMON_ASSIGN_OR_RAISE(int64_t start_pos, out_->GetPos());
+    BloomFilterHandle bloom_handle(start_pos, bit_bytes.size(), bloom_filter_->ExpectedEntries());
+
+    PAIMON_RETURN_NOT_OK(WriteBytes(bit_bytes.data(), bit_bytes.size()));
+    return std::optional<BloomFilterHandle>(bloom_handle);
+}
+
+/// Writes the bytes of `slice` verbatim to the output stream (used e.g. for footers).
+Status SstFileWriter::WriteSlice(const MemorySlice& slice) {
+    const auto bytes = slice.ReadStringView();
+    return WriteBytes(bytes.data(), bytes.size());
+}
+
+/// Finishes `writer`'s block, optionally compresses it, writes payload + trailer to the
+/// stream, resets `writer`, and returns the handle (position, payload size) of the block.
+Result<BlockHandle> SstFileWriter::FlushBlockWriter(BlockWriter* writer) {
+    PAIMON_ASSIGN_OR_RAISE(MemorySlice finished_block, writer->Finish());
+    auto payload = finished_block.ReadStringView();
+
+    // Owns the compressed bytes until they are written below.
+    std::shared_ptr<Bytes> compressed;
+    BlockCompressionType stored_type = BlockCompressionType::NONE;
+    if (compressor_.get()) {
+        // Layout: varint(uncompressed length, at most 5 bytes) + compressed payload.
+        auto max_compressed = compressor_->GetMaxCompressedSize(payload.size());
+        compressed = std::make_shared<Bytes>(max_compressed + 5, pool_.get());
+        PAIMON_ASSIGN_OR_RAISE(int32_t header_len,
+                               WriteVarLenInt(compressed->data(), payload.size()));
+        PAIMON_ASSIGN_OR_RAISE(int32_t compressed_len,
+                               compressor_->Compress(payload.data(), payload.size(),
+                                                     compressed->data() + header_len,
+                                                     compressed->size() - header_len));
+        const int32_t total_len = compressed_len + header_len;
+        // Keep the compressed form only if it is smaller than size - size/8, i.e. it saves
+        // more than roughly 12.5%.
+        if (static_cast<size_t>(total_len) < payload.size() - (payload.size() / 8)) {
+            stored_type = compression_type_;
+            payload = std::string_view{compressed->data(), static_cast<size_t>(total_len)};
+        }
+    }
+
+    // Trailer checksum covers the stored payload followed by the one-byte compression type.
+    auto checksum = CRC32C::calculate(payload.data(), payload.size());
+    auto type_byte = static_cast<char>(static_cast<int32_t>(stored_type) & 0xFF);
+    checksum = CRC32C::calculate(&type_byte, 1, checksum);
+    auto trailer = BlockTrailer(static_cast<int8_t>(stored_type), checksum);
+    auto encoded_trailer = trailer.WriteBlockTrailer(pool_.get());
+
+    PAIMON_ASSIGN_OR_RAISE(int64_t payload_pos, out_->GetPos());
+    BlockHandle handle(payload_pos, payload.size());
+
+    // Payload first, trailer right after it.
+    PAIMON_RETURN_NOT_OK(WriteBytes(payload.data(), payload.size()));
+    auto trailer_bytes = encoded_trailer.ReadStringView();
+    PAIMON_RETURN_NOT_OK(WriteBytes(trailer_bytes.data(), trailer_bytes.size()));
+
+    // Make the writer ready for the next block.
+    writer->Reset();
+    return handle;
+}
+
+/// Writes `size` bytes starting at `data` to the output stream, propagating any stream error.
+Status SstFileWriter::WriteBytes(const char* data, size_t size) {
+    PAIMON_RETURN_NOT_OK(out_->Write(data, size));
+    return Status::OK();
+}
+
+/// Encodes a non-negative `value` as a base-128 varint (low 7 bits first, high bit set on
+/// every byte except the last) into `bytes` and returns the number of bytes written (at most
+/// 5). The caller must supply a buffer large enough for the encoding. Negative values are
+/// rejected with Status::Invalid.
+Result<int32_t> SstFileWriter::WriteVarLenInt(char* bytes, int32_t value) {
+    if (value < 0) {
+        return Status::Invalid("negative value: v=" + std::to_string(value));
+    }
+    int32_t pos = 0;
+    // For non-negative values, "more than 7 significant bits" is exactly value > 0x7F.
+    for (; value > 0x7F; value >>= 7) {
+        bytes[pos++] = static_cast<char>((value & 0x7F) | 0x80);
+    }
+    bytes[pos++] = static_cast<char>(value);
+    return pos;
+}
+} // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_writer.h b/src/paimon/common/sst/sst_file_writer.h
new file mode 100644
index 0000000..15ffd1a
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_writer.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/block_trailer.h"
+#include "paimon/common/sst/block_writer.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+
+// Forward declaration; arrow::Array appears unused in this header (NOTE(review): confirm).
+namespace arrow { class Array; }  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+/// The writer for writing SST Files. SST Files are row-oriented and designed to serve frequent
+/// point queries and range queries by key.
+///
+/// Write() only flushes a data block once it exceeds the block size, so callers must call
+/// Flush() after the last record before writing the bloom filter, the index block and any
+/// footer (via WriteSlice()). Otherwise trailing records are never written.
+class PAIMON_EXPORT SstFileWriter {
+ public:
+    SstFileWriter(const std::shared_ptr<OutputStream>& out,
+                  const std::shared_ptr<BloomFilter>& bloom_filter, int32_t block_size,
+                  const std::shared_ptr<BlockCompressionFactory>& factory,
+                  const std::shared_ptr<MemoryPool>& pool);
+
+    ~SstFileWriter() = default;
+
+    /// Appends a key/value pair to the current data block (and its key to the bloom filter,
+    /// if enabled); the block is flushed once its memory exceeds the block size.
+    Status Write(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& value);
+
+    /// Writes the pending data block, if any, and records its handle in the index block.
+    Status Flush();
+
+    /// Writes the index block and returns its handle.
+    Result<BlockHandle> WriteIndexBlock();
+
+    /// Writes the bloom filter bits and returns their handle. When bloom-filter is disabled,
+    /// returns an empty std::optional (std::nullopt), not a null pointer.
+    Result<std::optional<BloomFilterHandle>> WriteBloomFilter();
+
+    /// Writes the bytes of `slice` verbatim (e.g. a footer).
+    Status WriteSlice(const MemorySlice& slice);
+
+ private:
+    Result<BlockHandle> FlushBlockWriter(BlockWriter* writer);
+
+    Status WriteBytes(const char* data, size_t size);
+
+    Result<int32_t> WriteVarLenInt(char* bytes, int32_t value);
+
+    // api for testing
+    BlockWriter* IndexWriter() const {
+        return index_block_writer_.get();
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+
+    std::shared_ptr<OutputStream> out_;
+    // Null when bloom filtering is disabled.
+    std::shared_ptr<BloomFilter> bloom_filter_;
+
+    BlockCompressionType compression_type_;
+    // Null when blocks are written uncompressed.
+    std::shared_ptr<BlockCompressor> compressor_;
+
+    // Last key passed to Write(); used as the index key of the block sealed by Flush().
+    std::shared_ptr<Bytes> last_key_;
+
+    int32_t block_size_;
+    std::unique_ptr<BlockWriter> data_block_writer_;
+    std::unique_ptr<BlockWriter> index_block_writer_;
+};
+} // namespace paimon