This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new abd2615  feat: Migrate SST block infrastructure and file reader/writer 
(#44)
abd2615 is described below

commit abd261542b0301d7da786e57acafb9efbe2e7274
Author: lxy <[email protected]>
AuthorDate: Thu Jun 4 09:45:45 2026 +0800

    feat: Migrate SST block infrastructure and file reader/writer (#44)
---
 src/paimon/common/sst/block_aligned_type.h  |  46 ++++
 src/paimon/common/sst/block_cache.h         | 112 +++++++++
 src/paimon/common/sst/block_cache_test.cpp  | 356 ++++++++++++++++++++++++++++
 src/paimon/common/sst/block_entry.h         |  32 +++
 src/paimon/common/sst/block_handle.cpp      |  56 +++++
 src/paimon/common/sst/block_handle.h        |  53 +++++
 src/paimon/common/sst/block_iterator.cpp    | 103 ++++++++
 src/paimon/common/sst/block_iterator.h      |  56 +++++
 src/paimon/common/sst/block_reader.cpp      |  58 +++++
 src/paimon/common/sst/block_reader.h        |  90 +++++++
 src/paimon/common/sst/block_trailer.cpp     |  51 ++++
 src/paimon/common/sst/block_trailer.h       |  53 +++++
 src/paimon/common/sst/block_writer.cpp      |  70 ++++++
 src/paimon/common/sst/block_writer.h        |  92 +++++++
 src/paimon/common/sst/bloom_filter_handle.h |  63 +++++
 src/paimon/common/sst/sst_file_io_test.cpp  | 305 ++++++++++++++++++++++++
 src/paimon/common/sst/sst_file_reader.cpp   | 195 +++++++++++++++
 src/paimon/common/sst/sst_file_reader.h     |  99 ++++++++
 src/paimon/common/sst/sst_file_utils.h      |  44 ++++
 src/paimon/common/sst/sst_file_writer.cpp   | 152 ++++++++++++
 src/paimon/common/sst/sst_file_writer.h     |  91 +++++++
 21 files changed, 2177 insertions(+)

diff --git a/src/paimon/common/sst/block_aligned_type.h 
b/src/paimon/common/sst/block_aligned_type.h
new file mode 100644
index 0000000..0c4e7d4
--- /dev/null
+++ b/src/paimon/common/sst/block_aligned_type.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Block alignment tag; From(int8_t) maps byte 0 to ALIGNED and byte 1 to UNALIGNED.
+enum class PAIMON_EXPORT BlockAlignedType { ALIGNED = 0, UNALIGNED = 1 };
+
+inline Result<BlockAlignedType> From(int8_t v) {  // Decodes a raw byte into a BlockAlignedType.
+    if (v == 0) {
+        return BlockAlignedType::ALIGNED;
+    } else if (v == 1) {
+        return BlockAlignedType::UNALIGNED;
+    } else {  // Any value other than 0 or 1 is rejected with Status::Invalid.
+        return Status::Invalid("Invalid block aligned type: " + 
std::to_string(v));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_cache.h 
b/src/paimon/common/sst/block_cache.h
new file mode 100644
index 0000000..4fd8bb2
--- /dev/null
+++ b/src/paimon/common/sst/block_cache.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+namespace paimon {
+
+class PAIMON_EXPORT BlockCache {  // Per-file block lookup on top of a CacheManager, with a local map (blocks_) of loaded blocks.
+ public:
+    BlockCache(const std::string& file_path, const 
std::shared_ptr<InputStream>& in,
+               const std::shared_ptr<CacheManager>& cache_manager,
+               const std::shared_ptr<MemoryPool>& pool)
+        : pool_(pool), file_path_(file_path), in_(in), 
cache_manager_(cache_manager) {}
+
+    ~BlockCache() {
+        Close();  // Invalidates this instance's pages in cache_manager_ and clears blocks_.
+    }
+
+    Result<MemorySegment> GetBlock(  // Returns the `length`-byte block at `position`, loading it via cache_manager_ when needed.
+        int64_t position, int32_t length, bool is_index,
+        std::function<Result<MemorySegment>(const MemorySegment&)> 
decompress_func) {
+        auto key = CacheKey::ForPosition(file_path_, position, length, 
is_index);
+        auto it = blocks_.find(key);
+        if (it == blocks_.end() || it->second.GetAccessCount() == 
CacheManager::REFRESH_COUNT) {
+            PAIMON_ASSIGN_OR_RAISE(  // Local miss, or access count equals REFRESH_COUNT: go through cache_manager_.
+                MemorySegment segment,
+                cache_manager_->GetPage(
+                    key,
+                    [&](const std::shared_ptr<paimon::CacheKey>&) -> 
Result<MemorySegment> {
+                        PAIMON_ASSIGN_OR_RAISE(MemorySegment compress_data,
+                                               ReadFrom(position, length));  // Loader: read the raw bytes from in_.
+                        if (!decompress_func) {  // No decompressor supplied: return the bytes exactly as read.
+                            return compress_data;
+                        }
+                        return decompress_func(compress_data);
+                    },
+                    [this](const std::shared_ptr<CacheKey>& evicted_key) {  // Second callback: receives a key and drops it locally.
+                        blocks_.erase(evicted_key);  // NOTE(review): captures `this`; confirm it is never invoked after this BlockCache is destroyed.
+                    }));
+            auto container = CacheManager::SegmentContainer(segment);
+            const auto& result_segment = container.Access();
+            blocks_.insert_or_assign(key, container);  // Inserts a new entry or replaces the one being refreshed.
+            return result_segment;
+        }
+        return it->second.Access();  // Local hit whose access count differs from REFRESH_COUNT.
+    }
+
+    /// Returns the number of entries in the local blocks_ cache.
+    size_t BlocksSize() const {
+        return blocks_.size();
+    }
+
+    /// Returns true if the local blocks_ cache contains an entry for the 
given position/length.
+    bool ContainsBlock(int64_t position, int32_t length, bool is_index) const {  // is_index is part of the key as well.
+        auto key = CacheKey::ForPosition(file_path_, position, length, 
is_index);
+        return blocks_.find(key) != blocks_.end();
+    }
+
+    void Close() {  // Also invoked by the destructor.
+        // Snapshot blocks_ to avoid iterator invalidation from `InvalidPage` 
callback.
+        auto copied_blocks = blocks_;
+        for (const auto& [key, _] : copied_blocks) {
+            cache_manager_->InvalidPage(key);
+        }
+        // Some entries may remain in blocks_ if they were already evicted 
from the
+        // LRU cache (InvalidPage is a no-op for missing keys), so clear 
explicitly.
+        blocks_.clear();
+    }
+
+ private:
+    Result<MemorySegment> ReadFrom(int64_t offset, int32_t length) {  // Seeks in_ to `offset` and reads `length` bytes into a new segment.
+        PAIMON_RETURN_NOT_OK(in_->Seek(offset, SeekOrigin::FS_SEEK_SET));
+        auto segment = MemorySegment::AllocateHeapMemory(length, pool_.get());
+        PAIMON_RETURN_NOT_OK(in_->Read(segment.MutableData(), length));  // NOTE(review): the value returned by Read is not inspected; confirm a short read cannot pass silently.
+        return segment;
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::string file_path_;
+    std::shared_ptr<InputStream> in_;
+
+    std::shared_ptr<CacheManager> cache_manager_;
+    std::unordered_map<std::shared_ptr<CacheKey>, 
CacheManager::SegmentContainer, CacheKeyHash,
+                       CacheKeyEqual>
+        blocks_;  // Blocks obtained through this instance, keyed by CacheKey.
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_cache_test.cpp 
b/src/paimon/common/sst/block_cache_test.cpp
new file mode 100644
index 0000000..7417d91
--- /dev/null
+++ b/src/paimon/common/sst/block_cache_test.cpp
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/io/cache/lru_cache.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+class BlockCacheTest : public ::testing::Test {  // Fixture: unique temp directory, its file system, and the default pool.
+ public:
+    void SetUp() override {
+        dir_ = UniqueTestDirectory::Create();
+        fs_ = dir_->GetFileSystem();
+        pool_ = GetDefaultPool();
+    }
+
+    void TearDown() override {}
+
+    Status WriteTestFile(const std::string& path, int32_t num_blocks, int32_t 
block_size) const {  // Writes num_blocks blocks; every byte of block i is (i & 0xFF).
+        PAIMON_ASSIGN_OR_RAISE(auto out, fs_->Create(path, false));
+        for (int32_t i = 0; i < num_blocks; i++) {
+            auto segment = MemorySegment::AllocateHeapMemory(block_size, 
pool_.get());
+            std::memset(segment.MutableData(), i & 0xFF, block_size);
+            PAIMON_RETURN_NOT_OK(out->Write(segment.MutableData(), 
block_size));
+        }
+        PAIMON_RETURN_NOT_OK(out->Flush());
+        PAIMON_RETURN_NOT_OK(out->Close());
+        return Status::OK();
+    }
+
+    Result<MemorySegment> GetBlock(int32_t block_id, int32_t block_size, 
BlockCache* block_cache,
+                                   bool is_index = false) const {  // Block i is addressed at byte offset i * block_size.
+        return block_cache->GetBlock(/*position=*/block_id * block_size, 
/*length=*/block_size,
+                                     is_index,
+                                     /*decompress_func=*/nullptr);
+    }
+
+    bool ContainsBlock(int32_t block_id, int32_t block_size, BlockCache* 
block_cache,
+                       bool is_index = false) const {  // Same addressing as GetBlock, without loading anything.
+        return block_cache->ContainsBlock(/*position=*/block_id * block_size, 
/*length=*/block_size,
+                                          is_index);
+    }
+
+ protected:  // Not private: TEST_F bodies are subclasses of this fixture and use these members directly.
+    std::unique_ptr<UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+/// Verifies that the first GetBlock call reads from IO and subsequent calls 
return from the
+/// local blocks_ cache without re-reading.
+TEST_F(BlockCacheTest, TestBasicCacheHit) {
+    const int32_t block_size = 64;
+    const int32_t num_blocks = 4;
+    auto file_path = dir_->Str() + "/basic_hit.data";
+    ASSERT_OK(WriteTestFile(file_path, num_blocks, block_size));
+
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 
num_blocks * 2, 0.0);  // Capacity is twice the file size, so no eviction is expected here.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Initially blocks_ is empty
+    ASSERT_EQ(block_cache.BlocksSize(), 0);
+
+    // First access: populates both blocks_ and LRU
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg1.Size(), block_size);
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 1);
+
+    // Second access: returns from blocks_, no new LRU entry
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg2.Size(), block_size);
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 1);  // Only entry counts are asserted; IO reads are not counted directly.
+
+    // Load a different block
+    ASSERT_OK_AND_ASSIGN(auto seg3, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg3.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+}
+
+/// Verifies that when LRU evicts an entry due to capacity pressure, the 
eviction callback
+/// removes the corresponding entry from BlockCache's blocks_ map.
+TEST_F(BlockCacheTest, TestLruEvictionSyncsWithBlocks) {
+    const int32_t block_size = 100;
+    auto file_path = dir_->Str() + "/eviction.data";
+    ASSERT_OK(WriteTestFile(file_path, 5, block_size));  // 5 blocks on disk; only blocks 0-2 are read below.
+
+    // Cache can hold at most 2 blocks (200 bytes)
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 2, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load block 0 at position 0
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+
+    // Load block 1 at position block_size
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+
+    // Load block 2: evicts block 0 (LRU) from both LRU and blocks_
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(2, block_size, &block_cache));
+    ASSERT_EQ(seg2.Get(0), static_cast<char>(2));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 0 should be evicted from blocks_
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache));
+    // block 1 and block 2 should remain
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+
+    // Re-access block 0: triggers fresh IO read, evicts block 1 (now LRU)
+    ASSERT_OK_AND_ASSIGN(auto seg0_reloaded, GetBlock(0, block_size, 
&block_cache));
+    ASSERT_EQ(seg0_reloaded.Get(0), static_cast<char>(0));  // Content check only; the re-read itself is not observed directly.
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 1 should now be evicted
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    // block 0 and block 2 should remain
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+}
+
+/// Verifies that Close() invalidates all entries from both blocks_ and the 
LRU cache.
+TEST_F(BlockCacheTest, TestClose) {
+    const int32_t block_size = 64;
+    auto file_path = dir_->Str() + "/close.data";
+    ASSERT_OK(WriteTestFile(file_path, 3, block_size));
+
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 10, 0.0);  // Room for 10 blocks: all 3 stay cached until Close().
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load 3 blocks and verify blocks_ keys
+    for (int32_t i = 0; i < 3; i++) {
+        ASSERT_OK_AND_ASSIGN(auto seg, GetBlock(i, block_size, &block_cache));
+        ASSERT_EQ(seg.Get(0), static_cast<char>(i));
+    }
+    ASSERT_EQ(block_cache.BlocksSize(), 3);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    block_cache.Close();  // Close() runs again from the destructor at the end of this scope.
+
+    // After Close, both blocks_ and LRU should be empty
+    ASSERT_EQ(block_cache.BlocksSize(), 0);
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_FALSE(ContainsBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+}
+
+/// Verifies that two BlockCache instances sharing the same CacheManager have 
independent blocks_
+/// maps, and eviction in the shared LRU only affects the owning BlockCache's 
blocks_.
+TEST_F(BlockCacheTest, TestSharedCacheManagerEvictionIsolation) {
+    const int32_t block_size = 100;
+    auto file_path_a = dir_->Str() + "/file_a.data";
+    auto file_path_b = dir_->Str() + "/file_b.data";
+    ASSERT_OK(WriteTestFile(file_path_a, 3, block_size));
+    ASSERT_OK(WriteTestFile(file_path_b, 3, block_size));
+
+    // Shared cache can hold 3 blocks total
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 3, 0.0);
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in_a, 
fs_->Open(file_path_a));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in_b, 
fs_->Open(file_path_b));
+    BlockCache cache_a(file_path_a, in_a, cache_manager, pool_);
+    BlockCache cache_b(file_path_b, in_b, cache_manager, pool_);
+
+    // Load 2 blocks from file_a
+    ASSERT_OK_AND_ASSIGN(auto seg_a0, GetBlock(0, block_size, &cache_a));
+    ASSERT_OK_AND_ASSIGN(auto seg_a1, GetBlock(1, block_size, &cache_a));
+    ASSERT_EQ(cache_a.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_a));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+
+    // Load 1 block from file_b (total 3, at capacity)
+    ASSERT_OK_AND_ASSIGN(auto seg_b0, GetBlock(0, block_size, &cache_b));
+    ASSERT_EQ(cache_b.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    // Load another block from file_b: should evict file_a's block 0 (the LRU 
entry)
+    ASSERT_OK_AND_ASSIGN(auto seg_b1, GetBlock(1, block_size, &cache_b));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+
+    // cache_b should have 2 entries
+    ASSERT_EQ(cache_b.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_b));
+
+    // cache_a's block 0 was evicted by LRU callback, only block 1 remains
+    ASSERT_EQ(cache_a.BlocksSize(), 1);
+    ASSERT_FALSE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_a));
+
+    // Re-access file_a's block 0: triggers fresh IO read, evicts file_a's 
block 1 (now LRU)
+    ASSERT_OK_AND_ASSIGN(auto seg_a0_reloaded, GetBlock(0, block_size, 
&cache_a));
+    ASSERT_EQ(seg_a0_reloaded.Get(0), static_cast<char>(0));
+    ASSERT_EQ(cache_a.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_a));
+    ASSERT_FALSE(ContainsBlock(1, block_size, &cache_a));
+
+    // cache_b should be unaffected
+    ASSERT_EQ(cache_b.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &cache_b));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &cache_b));
+
+    cache_a.Close();  // Explicit close; each destructor calls Close() again afterwards.
+    cache_b.Close();
+}
+
+/// Verifies the REFRESH_COUNT mechanism interacts correctly with LRU eviction 
ordering.
+/// After refreshing a block (re-inserting into LRU front), it should not be 
the first to be
+/// evicted when capacity pressure occurs.
+TEST_F(BlockCacheTest, TestRefreshPreventsEviction) {
+    const int32_t block_size = 100;
+    auto file_path = dir_->Str() + "/refresh_eviction.data";
+    ASSERT_OK(WriteTestFile(file_path, 4, block_size));
+
+    // Cache can hold 2 blocks
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 2, 0.0);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // Load block 0 and block 1
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+
+    // Access block 0 REFRESH_COUNT times to trigger a refresh (moves it to 
LRU front)
+    for (int32_t i = 1; i < CacheManager::REFRESH_COUNT; i++) {  // REFRESH_COUNT - 1 further accesses after the initial load above.
+        ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+    }
+    // The next access (the 11th if REFRESH_COUNT is 10) triggers refresh, moving block 0 to LRU front
+    ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+
+    // blocks_ should still have both entries after refresh
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache));
+
+    // Load block 2: should evict block 1 (not block 0, since block 0 was just 
refreshed)
+    ASSERT_OK_AND_ASSIGN(auto seg2, GetBlock(2, block_size, &block_cache));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 2);
+    ASSERT_EQ(block_cache.BlocksSize(), 2);
+    // block 0 should still be in blocks_ (was refreshed to LRU front)
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache));
+    // block 1 should be evicted from blocks_
+    ASSERT_FALSE(ContainsBlock(1, block_size, &block_cache));
+    // block 2 should be in blocks_
+    ASSERT_TRUE(ContainsBlock(2, block_size, &block_cache));
+
+    // Block 0 should still be accessible from blocks_ cache
+    ASSERT_OK_AND_ASSIGN(seg0, GetBlock(0, block_size, &block_cache));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+
+    // Block 1 was evicted, re-accessing triggers IO read
+    ASSERT_OK_AND_ASSIGN(seg1, GetBlock(1, block_size, &block_cache));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+}
+
+TEST_F(BlockCacheTest, TestIndexAndDataCache) {  // Index and data blocks are counted in separate caches (IndexCache / DataCache).
+    const int32_t block_size = 100;
+    const int32_t num_blocks = 6;
+    auto file_path = dir_->Str() + "/index_and_data.data";
+    ASSERT_OK(WriteTestFile(file_path, num_blocks, block_size));
+
+    // index cache max weight = 100
+    // data cache max weight = 300
+    auto cache_manager = std::make_shared<CacheManager>(block_size * 4, 0.25);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+    BlockCache block_cache(file_path, in, cache_manager, pool_);
+
+    // First access for seg0, index
+    ASSERT_OK_AND_ASSIGN(auto seg0, GetBlock(0, block_size, &block_cache, 
/*is_index=*/true));
+    ASSERT_EQ(seg0.Get(0), static_cast<char>(0));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(0, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+
+    // Second access for seg1, index, seg0 will be evicted
+    ASSERT_OK_AND_ASSIGN(auto seg1, GetBlock(1, block_size, &block_cache, 
/*is_index=*/true));
+    ASSERT_EQ(seg1.Get(0), static_cast<char>(1));
+    ASSERT_EQ(block_cache.BlocksSize(), 1);
+    ASSERT_TRUE(ContainsBlock(1, block_size, &block_cache, /*is_index=*/true));
+    ASSERT_FALSE(ContainsBlock(0, block_size, &block_cache, 
/*is_index=*/true));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 0);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+
+    // Fills data cache
+    for (int32_t i = 2; i < 5; i++) {
+        ASSERT_OK_AND_ASSIGN(auto seg, GetBlock(i, block_size, &block_cache, 
/*is_index=*/false));
+        ASSERT_EQ(seg.Get(0), static_cast<char>(i));
+        ASSERT_EQ(block_cache.BlocksSize(), 1 + i - 1);  // One index entry plus (i - 1) data entries loaded so far.
+        ASSERT_TRUE(ContainsBlock(i, block_size, &block_cache, 
/*is_index=*/false));
+        ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+        ASSERT_EQ(cache_manager->DataCache()->Size(), i - 1);
+    }
+
+    ASSERT_OK_AND_ASSIGN(auto seg5, GetBlock(5, block_size, &block_cache, 
/*is_index=*/false));  // A fourth data block: data block 2 is expected to be evicted.
+    ASSERT_EQ(seg5.Get(0), static_cast<char>(5));
+    ASSERT_EQ(block_cache.BlocksSize(), 4);
+    ASSERT_TRUE(ContainsBlock(5, block_size, &block_cache, 
/*is_index=*/false));
+    ASSERT_TRUE(ContainsBlock(4, block_size, &block_cache, 
/*is_index=*/false));
+    ASSERT_TRUE(ContainsBlock(3, block_size, &block_cache, 
/*is_index=*/false));
+    ASSERT_FALSE(ContainsBlock(2, block_size, &block_cache, 
/*is_index=*/false));
+    ASSERT_EQ(cache_manager->DataCache()->Size(), 3);
+    ASSERT_EQ(cache_manager->IndexCache()->Size(), 1);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/sst/block_entry.h 
b/src/paimon/common/sst/block_entry.h
new file mode 100644
index 0000000..d7eb231
--- /dev/null
+++ b/src/paimon/common/sst/block_entry.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+struct BlockEntry {  // A key/value pair held as two MemorySlice members.
+    BlockEntry(const MemorySlice& _key, const MemorySlice& _value) : 
key(_key), value(_value) {}  // Copies both slices into the members.
+
+    MemorySlice key;
+    MemorySlice value;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_handle.cpp 
b/src/paimon/common/sst/block_handle.cpp
new file mode 100644
index 0000000..6b02259
--- /dev/null
+++ b/src/paimon/common/sst/block_handle.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_handle.h"
+
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+Result<BlockHandle> BlockHandle::ReadBlockHandle(MemorySliceInput* input) {  // Decodes a handle from `input`; read errors are propagated.
+    PAIMON_ASSIGN_OR_RAISE(int64_t offset, input->ReadVarLenLong());  // The offset comes first, as a var-length long.
+    PAIMON_ASSIGN_OR_RAISE(int32_t size, input->ReadVarLenInt());  // The size follows, as a var-length int.
+    return BlockHandle(offset, size);
+}
+
+BlockHandle::BlockHandle(int64_t offset, int32_t size) : offset_(offset), 
size_(size) {}  // Stores both values as given; no validation happens here.
+
+int64_t BlockHandle::Offset() const {  // Returns the stored offset.
+    return offset_;
+}
+
+int32_t BlockHandle::Size() const {  // Returns the stored size.
+    return size_;
+}
+
+int32_t BlockHandle::GetFullBlockSize() const {  // Returns size_ plus BlockHandle::MAX_ENCODED_LENGTH.
+    return size_ + MAX_ENCODED_LENGTH;  // NOTE(review): this adds the handle's own max encoded length (9 + 5); confirm that, rather than a block trailer length, is the intended addend.
+}
+
+std::string BlockHandle::ToString() const {  // Formats as "BlockHandle{offset=<offset>, size=<size>}".
+    return "BlockHandle{offset=" + std::to_string(offset_) + ", size=" + 
std::to_string(size_) +
+           "}";
+}
+
+Result<MemorySlice> BlockHandle::WriteBlockHandle(MemoryPool* pool) {  // Encodes this handle in the order ReadBlockHandle reads it.
+    MemorySliceOutput output(MAX_ENCODED_LENGTH, pool);  // Constructed with the maximum encoded length of a handle.
+    PAIMON_RETURN_NOT_OK(output.WriteVarLenLong(offset_));  // Offset first.
+    PAIMON_RETURN_NOT_OK(output.WriteVarLenInt(size_));  // Then size.
+    return output.ToSlice();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_handle.h 
b/src/paimon/common/sst/block_handle.h
new file mode 100644
index 0000000..c00a5af
--- /dev/null
+++ b/src/paimon/common/sst/block_handle.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+class PAIMON_EXPORT BlockHandle {  // An (offset, size) pair with var-length encode/decode helpers.
+ public:
+    static Result<BlockHandle> ReadBlockHandle(MemorySliceInput* input);  // Decodes a handle from `input`.
+
+ public:
+    BlockHandle(int64_t offset, int32_t size);
+    ~BlockHandle() = default;
+
+    int64_t Offset() const;
+    int32_t Size() const;
+    int32_t GetFullBlockSize() const;  // Size() plus MAX_ENCODED_LENGTH, per the definition in block_handle.cpp.
+
+    std::string ToString() const;
+    Result<MemorySlice> WriteBlockHandle(MemoryPool* pool);  // Encodes offset then size into a new slice.
+
+ public:
+    // Upper bound on an encoded handle: a varlong takes at most 9 bytes, a varint at most 5 bytes
+    static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5;
+
+ private:
+    int64_t offset_;
+    int32_t size_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_iterator.cpp 
b/src/paimon/common/sst/block_iterator.cpp
new file mode 100644
index 0000000..55d0ab9
--- /dev/null
+++ b/src/paimon/common/sst/block_iterator.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_iterator.h"
+
+#include "paimon/common/sst/block_reader.h"
+
+namespace paimon {
+/// Builds an iterator over the block held by `reader`.
+/// The iterator obtains its own input cursor from `reader->BlockInput()` and keeps a shared
+/// reference to the reader, which SeekTo later uses for record positions and key comparison.
+/// `reader` must not be null: it is dereferenced here.
+BlockIterator::BlockIterator(const std::shared_ptr<BlockReader>& reader)
+    : input_(reader->BlockInput()), reader_(reader) {}
+
+/// Tells whether Next() can produce another entry: either an entry is pending from an
+/// earlier SeekTo, or the input cursor still has readable bytes.
+bool BlockIterator::HasNext() const {
+    if (polled_position_ >= 0) {
+        return true;
+    }
+    return input_.IsReadable();
+}
+
+/// Returns the next entry of the block.
+/// An entry pending from SeekTo is served first by rewinding the cursor to it; otherwise the
+/// entry at the current cursor position is decoded.
+/// @return the decoded entry, or Status::Invalid when the iterator is exhausted.
+Result<BlockEntry> BlockIterator::Next() {
+    if (!HasNext()) {
+        return Status::Invalid("no such element");
+    }
+    const bool has_pending_entry = polled_position_ >= 0;
+    if (has_pending_entry) {
+        // Jump back to the entry located by SeekTo and consume the pending marker.
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_));
+        polled_position_ = -1;
+    }
+    return ReadEntry();
+}
+
+/// Decodes one entry at the current cursor position and advances past it.
+/// An entry is laid out as: varint key length, key bytes, varint value length, value bytes.
+/// The lengths are used as read; no range check is done here.
+Result<BlockEntry> BlockIterator::ReadEntry() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    auto key = input_.ReadSliceView(key_length);
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    auto value = input_.ReadSliceView(value_length);
+    return BlockEntry(key, value);
+}
+
+/// Returns only the value of the next entry; the key bytes are skipped without being sliced.
+/// Like Next(), an entry pending from SeekTo is served first. Unlike Next(), this function
+/// does not call HasNext() before reading.
+Result<MemorySlice> BlockIterator::SkipKeyAndReadValue() {
+    if (polled_position_ >= 0) {
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_));
+        polled_position_ = -1;
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    // Step over the key by moving the cursor forward by its length.
+    PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + key_length));
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    return input_.ReadSliceView(value_length);
+}
+
+/// Returns only the key of the entry at the current cursor position and moves the cursor
+/// past the value. It does not consult polled_position_; SeekTo positions the cursor itself
+/// before calling it.
+Result<MemorySlice> BlockIterator::ReadKeyAndSkipValue() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt());
+    auto key = input_.ReadSliceView(key_length);
+    PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt());
+    // Step over the value by moving the cursor forward by its length.
+    PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + value_length));
+    return key;
+}
+
+/// Binary-searches the block for `target_key`.
+///
+/// On return, the next call to Next() yields the first entry whose key is >= `target_key`,
+/// if such an entry exists: its position is remembered in polled_position_. When every probed
+/// key is smaller than the target, polled_position_ stays -1.
+///
+/// @return true when an entry with exactly `target_key` was found, false otherwise; an error
+///         from positioning, decoding or comparing is propagated.
+Result<bool> BlockIterator::SeekTo(const MemorySlice& target_key) {
+    int32_t low = 0;
+    int32_t high = reader_->RecordCount() - 1;
+    polled_position_ = -1;
+
+    while (low <= high) {
+        const int32_t probe = low + (high - low) / 2;
+
+        const int32_t probe_position = reader_->SeekTo(probe);
+        PAIMON_RETURN_NOT_OK(input_.SetPosition(probe_position));
+
+        PAIMON_ASSIGN_OR_RAISE(MemorySlice probe_key, ReadKeyAndSkipValue());
+        PAIMON_ASSIGN_OR_RAISE(int32_t order, reader_->Comparator()(probe_key, target_key));
+
+        if (order < 0) {
+            // probe_key < target_key: continue in the upper half and keep the candidate
+            // recorded so far, if any.
+            low = probe + 1;
+            continue;
+        }
+        // probe_key >= target_key: best candidate seen so far.
+        polled_position_ = probe_position;
+        if (order == 0) {
+            return true;
+        }
+        high = probe - 1;
+    }
+
+    // No exact match: polled_position_ is the first entry with key >= target_key, or -1.
+    return false;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_iterator.h 
b/src/paimon/common/sst/block_iterator.h
new file mode 100644
index 0000000..bfc8970
--- /dev/null
+++ b/src/paimon/common/sst/block_iterator.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_entry.h"
+
+namespace paimon {
+class BlockReader;
+
+/// Forward iterator over the key-value entries of one block, with binary-search seek support.
+class PAIMON_EXPORT BlockIterator {
+ public:
+    /// Creates an iterator that reads from `reader`'s block and shares ownership of `reader`.
+    explicit BlockIterator(const std::shared_ptr<BlockReader>& reader);
+
+    /// True when an entry is pending from SeekTo or the input still has readable bytes.
+    bool HasNext() const;
+
+    /// Returns the next entry, or an invalid status when HasNext() is false.
+    Result<BlockEntry> Next();
+
+    /// Read only the value MemorySlice from the current position, skipping the key.
+    /// Used in fast-path iteration where no key comparison is needed.
+    /// @note Public function, conceptually similar to `Next()` but optimized to skip key
+    /// parsing.
+    Result<MemorySlice> SkipKeyAndReadValue();
+
+    /// Binary-searches for `target_key`. Returns true on an exact match. In either case a
+    /// following Next() starts at the first entry whose key is >= `target_key`, if any.
+    Result<bool> SeekTo(const MemorySlice& target_key);
+
+ private:
+    /// Read only the key MemorySlice from the current position, skipping the value.
+    /// @note Inner function, only called by `SeekTo`.
+    Result<MemorySlice> ReadKeyAndSkipValue();
+
+    /// Decodes the full entry (key and value) at the current position.
+    Result<BlockEntry> ReadEntry();
+
+    /// Cursor over the block data.
+    MemorySliceInput input_;
+    /// Position of the entry that should be returned by Next() after SeekTo.
+    /// -1 means no pending entry.
+    int32_t polled_position_ = -1;
+    /// Source of record positions, record count and the key comparator.
+    std::shared_ptr<BlockReader> reader_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_reader.cpp 
b/src/paimon/common/sst/block_reader.cpp
new file mode 100644
index 0000000..c280be0
--- /dev/null
+++ b/src/paimon/common/sst/block_reader.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_reader.h"
+
+#include "paimon/common/sst/block_trailer.h"
+namespace paimon {
+
+/// Parses the footer of `block` and builds the reader matching its layout.
+///
+/// The last byte of the block is the aligned-type marker, and the 4 bytes before it hold
+/// either the fixed record size (aligned block) or the number of entries (unaligned block).
+/// For unaligned blocks, an index of 4-byte entry positions sits directly before the footer.
+///
+/// The footer comes from file content, so it is validated before use. A corrupted block is
+/// reported as an error instead of producing out-of-range slices or a division by zero.
+///
+/// @param block the whole block, including index (if any) and footer
+/// @param comparator key comparator handed to the created reader
+/// @return the reader, or an error when the footer is missing, unknown or inconsistent
+Result<std::shared_ptr<BlockReader>> BlockReader::Create(const MemorySlice& block,
+                                                         MemorySlice::SliceComparator comparator) {
+    // The footer (4-byte integer + 1-byte marker) has the same length as an encoded block
+    // trailer, whose constant is used for it here.
+    const int32_t footer_len = BlockTrailer::ENCODED_LENGTH;
+    const int32_t block_len = block.Length();
+    if (block_len < footer_len) {
+        return Status::Invalid("block is too short to contain a footer");
+    }
+    PAIMON_ASSIGN_OR_RAISE(BlockAlignedType type, From(block.ReadByte(block_len - 1)));
+    const int32_t payload_len = block_len - footer_len;
+    const int32_t size = block.ReadInt(payload_len);
+    if (type == BlockAlignedType::ALIGNED) {
+        // The record size is used as a divisor when the record count is derived.
+        if (size <= 0) {
+            return Status::Invalid("aligned block has a non-positive record size");
+        }
+        auto data = block.Slice(0, payload_len);
+        return std::make_shared<AlignedBlockReader>(data, size, std::move(comparator));
+    }
+    // 64-bit arithmetic so that a corrupted entry count cannot overflow the multiplication.
+    const int64_t index_length = static_cast<int64_t>(size) * 4;
+    if (size < 0 || index_length > payload_len) {
+        return Status::Invalid("unaligned block has an entry index that does not fit the block");
+    }
+    const int32_t index_offset = payload_len - static_cast<int32_t>(index_length);
+    auto data = block.Slice(0, index_offset);
+    auto index = block.Slice(index_offset, static_cast<int32_t>(index_length));
+    return std::make_shared<UnAlignedBlockReader>(data, index, std::move(comparator));
+}
+
+/// Creates an iterator that shares ownership of this reader.
+/// Relies on shared_from_this(), so the reader must itself be owned by a std::shared_ptr
+/// (as is the case for readers returned by Create).
+std::unique_ptr<BlockIterator> BlockReader::Iterator() {
+    return std::make_unique<BlockIterator>(shared_from_this());
+}
+
+/// Returns a new input cursor over the block data held by this reader.
+MemorySliceInput BlockReader::BlockInput() {
+    return block_.ToInput();
+}
+
+/// Number of records in the block, as supplied to the constructor by the concrete reader.
+int32_t BlockReader::RecordCount() const {
+    return record_count_;
+}
+
+/// Key comparator given at construction; the returned reference is valid as long as this
+/// reader is alive.
+const MemorySlice::SliceComparator& BlockReader::Comparator() const {
+    return comparator_;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_reader.h 
b/src/paimon/common/sst/block_reader.h
new file mode 100644
index 0000000..e8c7ffe
--- /dev/null
+++ b/src/paimon/common/sst/block_reader.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_aligned_type.h"
+#include "paimon/common/sst/block_iterator.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+namespace paimon {
+class BlockIterator;
+
+/// Reader for a block.
+/// Holds the block data, the number of records and the key comparator; concrete subclasses
+/// decide how a record index is translated into a byte position (see SeekTo).
+/// Derives from enable_shared_from_this because Iterator() hands a shared reference of the
+/// reader to the iterator it creates.
+class PAIMON_EXPORT BlockReader : public 
std::enable_shared_from_this<BlockReader> {
+ public:
+    virtual ~BlockReader() = default;
+
+    /// Builds the reader matching the footer of `block` (aligned or unaligned layout).
+    static Result<std::shared_ptr<BlockReader>> Create(const MemorySlice& 
block,
+                                                       
MemorySlice::SliceComparator comparator);
+
+    /// Translates a record index into the byte position of that record inside the block data.
+    virtual int32_t SeekTo(int32_t record_position) = 0;
+
+    int32_t RecordCount() const;
+    const MemorySlice::SliceComparator& Comparator() const;
+
+    /// Creates an iterator over this block; the reader must be owned by a std::shared_ptr.
+    std::unique_ptr<BlockIterator> Iterator();
+    /// Returns a new input cursor over the block data.
+    MemorySliceInput BlockInput();
+
+ protected:
+    /// @param block the block data (without index and footer)
+    /// @param record_count number of records contained in `block`
+    /// @param comparator key comparator used when seeking
+    BlockReader(const MemorySlice& block, int32_t record_count,
+                MemorySlice::SliceComparator comparator)
+        : block_(block), comparator_(std::move(comparator)), 
record_count_(record_count) {}
+
+ private:
+    MemorySlice block_;
+    MemorySlice::SliceComparator comparator_;
+    int32_t record_count_;
+};
+
+/// Reader for a block whose records all have the same encoded size.
+/// The record count is derived as block length / record size, and the position of record i
+/// is i * record size, so no position index is needed.
+class AlignedBlockReader : public BlockReader {
+ public:
+    /// @param block the record data
+    /// @param record_size encoded size of every record; used as a divisor below, so it
+    ///        must not be 0
+    /// @param comparator key comparator used when seeking
+    AlignedBlockReader(const MemorySlice& block, int32_t record_size,
+                       MemorySlice::SliceComparator comparator)
+        : BlockReader(block, block.Length() / record_size, 
std::move(comparator)),
+          record_size_(record_size) {}
+
+    /// Byte position of the record with index `record_position`; the index is not range-checked.
+    int32_t SeekTo(int32_t record_position) override {
+        return record_size_ * record_position;
+    }
+
+ private:
+    int32_t record_size_;
+};
+
+/// Reader for a block whose records have varying sizes.
+/// Record positions come from a separate index of 4-byte integers, one per record, so the
+/// record count is index length / 4.
+class UnAlignedBlockReader : public BlockReader {
+ public:
+    /// @param data the record data
+    /// @param index the position index, 4 bytes per record
+    /// @param comparator key comparator used when seeking
+    UnAlignedBlockReader(const MemorySlice& data, const MemorySlice& index,
+                         MemorySlice::SliceComparator comparator)
+        : BlockReader(data, index.Length() / 4, std::move(comparator)), 
index_(index) {}
+
+    /// Byte position of the record with index `record_position`, read from the index; the
+    /// index argument is not range-checked.
+    int32_t SeekTo(int32_t record_position) override {
+        return index_.ReadInt(record_position * 4);
+    }
+
+ private:
+    MemorySlice index_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_trailer.cpp 
b/src/paimon/common/sst/block_trailer.cpp
new file mode 100644
index 0000000..5649102
--- /dev/null
+++ b/src/paimon/common/sst/block_trailer.cpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_trailer.h"
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+/// Decodes a trailer from `input`: first the compression type as one unsigned byte, then
+/// the CRC32C as an integer. The two reads advance `input` in that order.
+std::unique_ptr<BlockTrailer> BlockTrailer::ReadBlockTrailer(MemorySliceInput* 
input) {
+    auto compress = input->ReadUnsignedByte();
+    auto crc32c = input->ReadInt();
+    return std::make_unique<BlockTrailer>(compress, crc32c);
+}
+
+/// CRC32C value stored in the trailer, kept as a signed 32-bit integer.
+int32_t BlockTrailer::Crc32c() const {
+    return crc32c_;
+}
+
+/// Raw compression type byte stored in the trailer.
+int8_t BlockTrailer::CompressionType() const {
+    return compression_type_;
+}
+
+/// Human-readable form of the trailer: the compression type as a decimal number and the
+/// CRC32C as an unsigned hexadecimal value.
+std::string BlockTrailer::ToString() const {
+    const std::string compression_text = std::to_string(compression_type_);
+    const auto unsigned_crc = static_cast<uint32_t>(crc32c_);
+    return fmt::format("BlockTrailer{{compression_type={}, crc32c_={:#x}}}", compression_text,
+                       unsigned_crc);
+}
+
+/// Serializes the trailer into a new slice allocated from `pool`: the compression type
+/// first, then the CRC32C, matching the order used by ReadBlockTrailer.
+MemorySlice BlockTrailer::WriteBlockTrailer(MemoryPool* pool) {
+    MemorySliceOutput output(ENCODED_LENGTH, pool);
+    output.WriteValue(compression_type_);
+    output.WriteValue(crc32c_);
+    return output.ToSlice();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_trailer.h 
b/src/paimon/common/sst/block_trailer.h
new file mode 100644
index 0000000..a4c8aa5
--- /dev/null
+++ b/src/paimon/common/sst/block_trailer.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Trailer of a block.
+/// Carries the compression type (1 byte) and the CRC32C (4 bytes) of the block.
+class PAIMON_EXPORT BlockTrailer {
+ public:
+    /// Decodes a trailer from `input`, advancing it past the trailer.
+    static std::unique_ptr<BlockTrailer> ReadBlockTrailer(MemorySliceInput* 
input);
+
+ public:
+    /// @param compression_type raw compression type byte
+    /// @param crc32c CRC32C value of the block
+    BlockTrailer(int8_t compression_type, int32_t crc32c)
+        : crc32c_(crc32c), compression_type_(compression_type) {}
+
+    ~BlockTrailer() = default;
+
+    int32_t Crc32c() const;
+    int8_t CompressionType() const;
+
+    /// Human-readable description of the trailer.
+    std::string ToString() const;
+    /// Serializes the trailer into a slice allocated from `pool`.
+    MemorySlice WriteBlockTrailer(MemoryPool* pool);
+
+ public:
+    /// Encoded size in bytes: 1 byte of compression type plus 4 bytes of CRC32C.
+    static constexpr int32_t ENCODED_LENGTH = 5;
+
+ private:
+    int32_t crc32c_;
+    int8_t compression_type_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_writer.cpp 
b/src/paimon/common/sst/block_writer.cpp
new file mode 100644
index 0000000..4be50fe
--- /dev/null
+++ b/src/paimon/common/sst/block_writer.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/block_writer.h"
+
+#include "paimon/common/sst/block_aligned_type.h"
+
+namespace paimon {
+
+/// Appends one key-value entry to the block.
+/// The entry is encoded as varint key length, key bytes, varint value length, value bytes,
+/// and its start position is recorded for the position index. While the writer is still in
+/// aligned mode, the entry size is compared with the size of the first entry; the first
+/// mismatch switches the writer to unaligned mode.
+Status BlockWriter::Write(std::shared_ptr<Bytes>& key, std::shared_ptr<Bytes>& value) {
+    const int32_t entry_start = block_->Size();
+    PAIMON_RETURN_NOT_OK(block_->WriteVarLenInt(key->size()));
+    block_->WriteBytes(key);
+    PAIMON_RETURN_NOT_OK(block_->WriteVarLenInt(value->size()));
+    block_->WriteBytes(value);
+    const int32_t entry_end = block_->Size();
+    positions_.push_back(entry_start);
+
+    if (!aligned_) {
+        return Status::OK();
+    }
+    const int32_t entry_size = entry_end - entry_start;
+    if (aligned_size_ == 0) {
+        // First entry: it defines the size every later entry must have.
+        aligned_size_ = entry_size;
+    } else if (aligned_size_ != entry_size) {
+        aligned_ = false;
+    }
+    return Status::OK();
+}
+
+/// Discards all buffered entries and starts a new, empty block of the configured size.
+/// NOTE(review): aligned_ is restored to true unconditionally, even when the writer was
+/// constructed with aligned=false - confirm whether the constructor argument should be
+/// remembered and reapplied here.
+void BlockWriter::Reset() {
+    positions_.clear();
+    block_ = std::make_shared<MemorySliceOutput>(size_, pool_.get());
+    aligned_size_ = 0;
+    aligned_ = true;
+}
+
+/// Completes the block by appending its footer and returns the encoded bytes.
+/// Aligned blocks get the common entry size; unaligned blocks get every entry position
+/// followed by the entry count. In both cases the aligned-type marker is the last byte.
+Result<MemorySlice> BlockWriter::Finish() {
+    if (positions_.empty()) {
+        // An empty block cannot be aligned: a reader could not tell how many records it
+        // holds from the entry size alone.
+        aligned_ = false;
+    }
+    if (!aligned_) {
+        for (const int32_t entry_position : positions_) {
+            block_->WriteValue(entry_position);
+        }
+        block_->WriteValue(static_cast<int32_t>(positions_.size()));
+    } else {
+        block_->WriteValue(aligned_size_);
+    }
+    const BlockAlignedType layout =
+        aligned_ ? BlockAlignedType::ALIGNED : BlockAlignedType::UNALIGNED;
+    block_->WriteValue(static_cast<char>(layout));
+    return block_->ToSlice();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/block_writer.h 
b/src/paimon/common/sst/block_writer.h
new file mode 100644
index 0000000..8e9d407
--- /dev/null
+++ b/src/paimon/common/sst/block_writer.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_slice_output.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+///
+/// Writer to build a Block. A block is designed for storing and random-accessing k-v pairs.
+/// The layout is as below:
+///
+/// <pre>
+///     +---------------+
+///     | Block Trailer |
+///     +------------------------------------------------+
+///     |       Block CRC32C      |     Compression      |
+///     +------------------------------------------------+
+///     +---------------+
+///     |  Block Data   |
+///     +---------------+--------------------------------+----+
+///     | key len | key bytes | value len | value bytes  |    |
+///     +------------------------------------------------+    |
+///     | key len | key bytes | value len | value bytes  |    +-> Key-Value pairs
+///     +------------------------------------------------+    |
+///     |                  ... ...                       |    |
+///     +------------------------------------------------+----+
+///     | entry pos | entry pos |     ...    | entry pos |    +-> optional, for unaligned block
+///     +------------------------------------------------+----+
+///     |   entry num  /  entry size   |   aligned type  |
+///     +------------------------------------------------+
+/// </pre>
+///
+/// The writer starts in aligned mode (unless told otherwise) and falls back to unaligned
+/// mode as soon as two entries of different encoded size are written.
+///
+class PAIMON_EXPORT BlockWriter {
+ public:
+    /// @param size initial capacity passed to the underlying output buffer
+    /// @param pool memory pool used for the output buffer
+    /// @param aligned whether the writer may produce an aligned block
+    BlockWriter(int32_t size, const std::shared_ptr<MemoryPool>& pool, bool 
aligned = true)
+        : size_(size), pool_(pool), aligned_(aligned) {
+        block_ = std::make_shared<MemorySliceOutput>(size, pool_.get());
+        aligned_size_ = 0;
+    }
+
+    ~BlockWriter() = default;
+
+    /// Appends one key-value entry to the block.
+    Status Write(std::shared_ptr<Bytes>& key, std::shared_ptr<Bytes>& value);
+
+    /// Drops all buffered entries and starts a new block.
+    /// NOTE(review): the implementation sets aligned_ back to true regardless of the
+    /// constructor argument - confirm this is intended.
+    void Reset();
+
+    /// Number of entries written since construction or the last Reset().
+    int32_t Size() const {
+        return positions_.size();
+    }
+
+    /// Size in bytes the finished block would have: the buffered data plus the 5-byte footer
+    /// (4-byte integer and 1-byte aligned type), plus 4 bytes per entry position when the
+    /// block is unaligned.
+    int32_t Memory() const {
+        int32_t memory = block_->Size() + 5;
+        if (!aligned_) {
+            memory += positions_.size() * 4;
+        }
+        return memory;
+    }
+
+    /// Appends the footer and returns the encoded block.
+    Result<MemorySlice> Finish();
+
+ private:
+    const int32_t size_;
+    const std::shared_ptr<MemoryPool> pool_;
+
+    /// Start position of every entry written so far.
+    std::vector<int32_t> positions_;
+    std::shared_ptr<MemorySliceOutput> block_;
+    /// True while all entries written so far have the same encoded size.
+    bool aligned_;
+    /// Encoded size of the first entry; 0 until an entry is written.
+    int32_t aligned_size_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/bloom_filter_handle.h 
b/src/paimon/common/sst/bloom_filter_handle.h
new file mode 100644
index 0000000..9796bcb
--- /dev/null
+++ b/src/paimon/common/sst/bloom_filter_handle.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Describes a bloom filter stored in a file: its byte offset, its size in bytes and the
+/// number of entries the filter was sized for.
+class PAIMON_EXPORT BloomFilterHandle {
+ public:
+    /// @param offset value reported by Offset()
+    /// @param size value reported by Size()
+    /// @param expected_entries value reported by ExpectedEntries()
+    BloomFilterHandle(int64_t offset, int32_t size, int64_t expected_entries)
+        : offset_(offset), size_(size), expected_entries_(expected_entries) {}
+
+    /// Creates a handle with every field set to zero.
+    BloomFilterHandle() = default;
+    ~BloomFilterHandle() = default;
+
+    int64_t Offset() const {
+        return offset_;
+    }
+
+    int32_t Size() const {
+        return size_;
+    }
+
+    int64_t ExpectedEntries() const {
+        return expected_entries_;
+    }
+
+    /// Human-readable description listing all three fields.
+    std::string ToString() const {
+        return "BloomFilterHandle{offset=" + std::to_string(offset_) +
+               ", size=" + std::to_string(size_) +
+               ", expected_entries=" + std::to_string(expected_entries_) + "}";
+    }
+
+ public:
+    // NOTE(review): 9 + 5 covers a varlong and a varint, but this handle has three fields -
+    // confirm against the code that encodes the handle.
+    static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5;
+
+ private:
+    // In-class initializers: without them, a default-constructed handle would hold
+    // indeterminate values and reading any accessor would be undefined behavior.
+    int64_t offset_ = 0;
+    int32_t size_ = 0;
+    int64_t expected_entries_ = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_io_test.cpp 
b/src/paimon/common/sst/sst_file_io_test.cpp
new file mode 100644
index 0000000..25053e1
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_io_test.cpp
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/sst/sst_file_reader.h"
+#include "paimon/common/sst/sst_file_writer.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/io_exception_helper.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::test {
+/// Parameter of the value-parameterized SST tests.
+struct SstFileParam {
+    /// File name of a pre-built SST file, resolved under the test data "sst" directory.
+    std::string file_path;
+    /// Block compression used when writing a new SST file.
+    BlockCompressionType type;
+};
+
+/// Fixture for SST file reader/writer tests.
+/// Each test gets its own temporary directory, the file system of that directory, the
+/// default memory pool and a comparator ordering keys by their string content.
+class SstFileIOTest : public ::testing::TestWithParam<SstFileParam> {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        fs_ = dir_->GetFileSystem();
+        pool_ = GetDefaultPool();
+        // Three-way comparison of the slices as strings: 0 when equal, 1 when a > b, else -1.
+        comparator_ = [](const MemorySlice& a, const MemorySlice& b) -> 
Result<int32_t> {
+            std::string_view va = a.ReadStringView();
+            std::string_view vb = b.ReadStringView();
+            if (va == vb) {
+                return 0;
+            }
+            return va > vb ? 1 : -1;
+        };
+    }
+
+    void TearDown() override {
+        // Remove the temporary directory and everything written into it.
+        ASSERT_OK(fs_->Delete(dir_->Str()));
+    }
+
+ protected:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<paimon::FileSystem> fs_;
+    std::shared_ptr<paimon::MemoryPool> pool_;
+
+    MemorySlice::SliceComparator comparator_;
+    std::shared_ptr<CacheManager> cache_manager_ = 
std::make_shared<CacheManager>(1024 * 1024, 0.0);
+};
+
+/// Writes a small SST file (keys k1-k5 and k910-k920), then checks that the stored bloom
+/// filter contains every key hash and that lookups return the expected values.
+TEST_P(SstFileIOTest, TestSimple) {
+    auto param = GetParam();
+    auto index_path = dir_->Str() + "/sst_file_test.data";
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlockCompressionFactory> factory,
+                         BlockCompressionFactory::Create(param.type));
+
+    // create the output file
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(index_path, /*overwrite=*/false));
+
+    // write data
+    auto bf = BloomFilter::Create(30, 0.01);
+    auto seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get());
+    ASSERT_OK(bf->SetMemorySegment(seg_for_bf));
+    auto writer = std::make_shared<SstFileWriter>(out, bf, 50, factory, pool_);
+    // hashes of all written keys, checked against the bloom filter below
+    std::set<int32_t> key_hashes;
+    // k1-k5
+    for (size_t i = 1; i <= 5; i++) {
+        std::string key = "k" + std::to_string(i);
+        std::string value = std::to_string(i);
+        ASSERT_OK(writer->Write(std::make_shared<Bytes>(key, pool_.get()),
+                                std::make_shared<Bytes>(value, pool_.get())));
+        auto bytes = std::make_shared<Bytes>(key, pool_.get());
+        key_hashes.insert(MurmurHashUtils::HashBytes(bytes));
+    }
+    // k910-k920
+    for (size_t i = 10; i <= 20; i++) {
+        std::string key = "k9" + std::to_string(i);
+        std::string value = "looooooooooong-值-" + std::to_string(i);
+        ASSERT_OK(writer->Write(std::make_shared<Bytes>(key, pool_.get()),
+                                std::make_shared<Bytes>(value, pool_.get())));
+        auto bytes = std::make_shared<Bytes>(key, pool_.get());
+        key_hashes.insert(MurmurHashUtils::HashBytes(bytes));
+    }
+    ASSERT_OK(writer->Flush());
+
+    ASSERT_EQ(6, writer->IndexWriter()->Size());
+
+    ASSERT_OK_AND_ASSIGN(auto bloom_filter_handle, writer->WriteBloomFilter());
+    ASSERT_OK_AND_ASSIGN(auto index_block_handle, writer->WriteIndexBlock());
+    SortLookupStoreFooter footer(index_block_handle, bloom_filter_handle);
+    auto slice = footer.WriteSortLookupStoreFooter(pool_.get());
+    ASSERT_OK(writer->WriteSlice(slice));
+
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+
+    // bloom filter test
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(index_path));
+    auto entries = bloom_filter_handle->ExpectedEntries();
+    auto offset = bloom_filter_handle->Offset();
+    auto size = bloom_filter_handle->Size();
+    ASSERT_OK(in->Seek(offset, SeekOrigin::FS_SEEK_SET));
+    auto bloom_filter_bytes = Bytes::AllocateBytes(size, pool_.get());
+    ASSERT_OK(in->Read(bloom_filter_bytes->data(), bloom_filter_bytes->size()));
+    auto seg = MemorySegment::Wrap(std::move(bloom_filter_bytes));
+    auto bloom_filter = std::make_shared<BloomFilter>(entries, size);
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg));
+    for (const auto& hash : key_hashes) {
+        ASSERT_TRUE(bloom_filter->TestHash(hash));
+    }
+
+    // test read
+    ASSERT_OK_AND_ASSIGN(in, fs_->Open(index_path));
+    auto block_cache = std::make_shared<BlockCache>(index_path, in, cache_manager_, pool_);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader, SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, pool_));
+
+    // not exist key: the lookup itself must succeed and yield no value
+    std::string k0 = "k0";
+    ASSERT_OK_AND_ASSIGN(auto v0, reader->Lookup(std::make_shared<Bytes>(k0, pool_.get())));
+    ASSERT_FALSE(v0);
+
+    // k4
+    std::string k4 = "k4";
+    ASSERT_OK_AND_ASSIGN(auto v4, reader->Lookup(std::make_shared<Bytes>(k4, pool_.get())));
+    ASSERT_TRUE(v4);
+    std::string string4{v4->data(), v4->size()};
+    ASSERT_EQ("4", string4);
+
+    // not exist key: the lookup itself must succeed and yield no value
+    std::string k55 = "k55";
+    ASSERT_OK_AND_ASSIGN(auto v55, reader->Lookup(std::make_shared<Bytes>(k55, pool_.get())));
+    ASSERT_FALSE(v55);
+
+    // k915
+    std::string k915 = "k915";
+    ASSERT_OK_AND_ASSIGN(auto v15, reader->Lookup(std::make_shared<Bytes>(k915, pool_.get())));
+    ASSERT_TRUE(v15);
+    std::string string15{v15->data(), v15->size()};
+    ASSERT_EQ("looooooooooong-值-15", string15);
+}
+
+/// Reads a pre-built SST file from the test data directory (per the note below, written by
+/// the Java implementation with keys in [1_000_000, 2_000_000]) and checks present and
+/// absent keys.
+TEST_P(SstFileIOTest, TestJavaCompatibility) {
+    auto param = GetParam();
+
+    // key range [1_000_000, 2_000_000], value is equal to the key
+    std::string file = GetDataDir() + "/sst/" + param.file_path;
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file));
+
+    // test read
+    auto block_cache = std::make_shared<BlockCache>(file, in, cache_manager_, pool_);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader, SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache, pool_));
+
+    // not exist key: the lookup itself must succeed and yield no value
+    std::string k0 = "10000";
+    ASSERT_OK_AND_ASSIGN(auto v0, reader->Lookup(std::make_shared<Bytes>(k0, pool_.get())));
+    ASSERT_FALSE(v0);
+
+    // k1314520
+    std::string k1314520 = "1314520";
+    ASSERT_OK_AND_ASSIGN(auto v1314520,
+                         reader->Lookup(std::make_shared<Bytes>(k1314520, pool_.get())));
+    ASSERT_TRUE(v1314520);
+    std::string string1314520{v1314520->data(), v1314520->size()};
+    ASSERT_EQ("1314520", string1314520);
+
+    // not exist key: the lookup itself must succeed and yield no value
+    std::string k13145200 = "13145200";
+    ASSERT_OK_AND_ASSIGN(auto v13145200,
+                         reader->Lookup(std::make_shared<Bytes>(k13145200, pool_.get())));
+    ASSERT_FALSE(v13145200);
+
+    std::string k1314521 = "1314521";
+    ASSERT_OK_AND_ASSIGN(auto v1314521,
+                         reader->Lookup(std::make_shared<Bytes>(k1314521, pool_.get())));
+    ASSERT_TRUE(v1314521);
+    std::string string1314521{v1314521->data(), v1314521->size()};
+    ASSERT_EQ("1314521", string1314521);
+
+    std::string k1999999 = "1999999";
+    ASSERT_OK_AND_ASSIGN(auto v1999999,
+                         reader->Lookup(std::make_shared<Bytes>(k1999999, pool_.get())));
+    ASSERT_TRUE(v1999999);
+    std::string string1999999{v1999999->data(), v1999999->size()};
+    ASSERT_EQ("1999999", string1999999);
+}
+
+// Runs a complete write-then-read round trip up to 200 times while the IOHook
+// is armed with an increasing index `i` in RETURN_ERROR mode. The test passes
+// once one iteration gets through the whole round trip (run_complete == true).
+// NOTE(review): CHECK_HOOK_STATUS is defined outside this chunk; it presumably
+// checks that a failure is the injected one and then moves on to the next
+// iteration -- confirm against its definition.
+TEST_F(SstFileIOTest, TestIOException) {
+    bool run_complete = false;
+    auto io_hook = paimon::IOHook::GetInstance();
+    for (size_t i = 0; i < 200; i++) {
+        auto test_dir = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(test_dir);
+        // Always clear the hook when this iteration ends, however it ends.
+        paimon::ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
+        io_hook->Reset(i, paimon::IOHook::Mode::RETURN_ERROR);
+
+        auto index_path = test_dir->Str() + "/sst_io_exception_test.data";
+
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlockCompressionFactory> factory,
+                             
BlockCompressionFactory::Create(BlockCompressionType::ZSTD));
+
+        // write
+        auto out_result = fs_->Create(index_path, /*overwrite=*/false);
+        CHECK_HOOK_STATUS(out_result.status(), i);
+        std::shared_ptr<OutputStream> out = std::move(out_result).value();
+
+        auto bf = BloomFilter::Create(30, 0.01);
+        MemorySegment seg_for_bf = 
MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get());
+        ASSERT_OK(bf->SetMemorySegment(seg_for_bf));
+        auto writer = std::make_shared<SstFileWriter>(out, bf, 50, factory, 
pool_);
+
+        // Write k1..k5 -> "1".."5"; stop at the first failing write.
+        bool write_failed = false;
+        for (size_t j = 1; j <= 5; j++) {
+            std::string key = "k" + std::to_string(j);
+            std::string value = std::to_string(j);
+            auto write_status = writer->Write(std::make_shared<Bytes>(key, 
pool_.get()),
+                                              std::make_shared<Bytes>(value, 
pool_.get()));
+            if (!write_status.ok()) {
+                CHECK_HOOK_STATUS(write_status, i);
+                write_failed = true;
+                break;
+            }
+        }
+        if (write_failed) {
+            continue;
+        }
+
+        CHECK_HOOK_STATUS(writer->Flush(), i);
+
+        // Same finishing sequence as a regular writer: bloom filter, index
+        // block, then the footer that references both.
+        auto bloom_filter_handle_result = writer->WriteBloomFilter();
+        CHECK_HOOK_STATUS(bloom_filter_handle_result.status(), i);
+        auto index_block_handle_result = writer->WriteIndexBlock();
+        CHECK_HOOK_STATUS(index_block_handle_result.status(), i);
+        SortLookupStoreFooter test_footer(index_block_handle_result.value(),
+                                          bloom_filter_handle_result.value());
+        auto test_slice = test_footer.WriteSortLookupStoreFooter(pool_.get());
+        CHECK_HOOK_STATUS(writer->WriteSlice(test_slice), i);
+
+        CHECK_HOOK_STATUS(out->Flush(), i);
+        CHECK_HOOK_STATUS(out->Close(), i);
+
+        // read
+        auto in_result = fs_->Open(index_path);
+        CHECK_HOOK_STATUS(in_result.status(), i);
+        std::shared_ptr<InputStream> in = std::move(in_result).value();
+
+        auto block_cache = std::make_shared<BlockCache>(index_path, in, 
cache_manager_, pool_);
+        auto reader_result =
+            SstFileReader::CreateForSortLookupStore(in, comparator_, 
block_cache, pool_);
+        CHECK_HOOK_STATUS(reader_result.status(), i);
+        std::shared_ptr<SstFileReader> reader = 
std::move(reader_result).value();
+
+        std::string k4 = "k4";
+        auto v4_result = reader->Lookup(std::make_shared<Bytes>(k4, 
pool_.get()));
+        CHECK_HOOK_STATUS(v4_result.status(), i);
+        ASSERT_TRUE(v4_result.value());
+        std::string string4{v4_result.value()->data(), 
v4_result.value()->size()};
+        ASSERT_EQ("4", string4);
+
+        ASSERT_OK(reader->Close());
+        // The whole round trip finished in this iteration: stop here.
+        run_complete = true;
+        break;
+    }
+    ASSERT_TRUE(run_complete);
+}
+
+// Runs every TEST_P of SstFileIOTest once per pre-built SST file: one stored
+// without compression, one with ZSTD and one with LZ4. The paths are relative
+// to the "/sst/" folder of the test data directory.
+INSTANTIATE_TEST_SUITE_P(Group, SstFileIOTest,
+                         
::testing::Values(SstFileParam{"none/79d01717-8380-4504-86e1-387e6c058d0a",
+                                                        
BlockCompressionType::NONE},
+                                           
SstFileParam{"zstd/83d05c53-2353-4160-b756-d50dd851b474",
+                                                        
BlockCompressionType::ZSTD},
+                                           
SstFileParam{"lz4/10540951-41d3-4216-aa2c-b15dfd25eb75",
+                                                        
BlockCompressionType::LZ4}));
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/sst/sst_file_reader.cpp 
b/src/paimon/common/sst/sst_file_reader.cpp
new file mode 100644
index 0000000..9639bcd
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_reader.cpp
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/sst/sst_file_reader.h"
+
+#include "fmt/format.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/sst/sst_file_utils.h"
+#include "paimon/common/utils/crc32c.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+namespace paimon {
+
+/// Builds a reader from an already decoded index block handle and an optional
+/// bloom filter handle.
+///
+/// The bloom filter bytes (if any) and the index block are both fetched
+/// through `block_cache`. The index block goes through DecompressBlock(),
+/// driven by the trailer stored directly behind the block.
+///
+/// @param index_block_handle location of the index block
+/// @param bloom_filter_handle location of the bloom filter, if the file has one
+/// @param comparator key comparator handed to the block readers
+/// @param block_cache cache used for every block read
+/// @param pool memory pool used for decompression output
+/// @return the reader, or the first error raised while loading the blocks
+Result<std::shared_ptr<SstFileReader>> SstFileReader::Create(
+    const BlockHandle& index_block_handle,
+    const std::optional<BloomFilterHandle>& bloom_filter_handle,
+    MemorySlice::SliceComparator comparator,
+    const std::shared_ptr<BlockCache>& block_cache,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // A handle whose fields are all zero is treated like "no bloom filter".
+    const bool has_bloom_filter =
+        bloom_filter_handle.has_value() &&
+        (bloom_filter_handle->ExpectedEntries() ||
+         bloom_filter_handle->Size() || bloom_filter_handle->Offset());
+
+    // The bloom filter is read directly (eagerly) for now.
+    std::shared_ptr<BloomFilter> filter;
+    if (has_bloom_filter) {
+        filter = std::make_shared<BloomFilter>(
+            bloom_filter_handle->ExpectedEntries(),
+            bloom_filter_handle->Size());
+        PAIMON_ASSIGN_OR_RAISE(
+            MemorySegment filter_segment,
+            block_cache->GetBlock(bloom_filter_handle->Offset(),
+                                  bloom_filter_handle->Size(),
+                                  /*is_index=*/true,
+                                  /*decompress_func=*/nullptr));
+        PAIMON_RETURN_NOT_OK(filter->SetMemorySegment(filter_segment));
+    }
+
+    // The trailer is located right after the index block payload.
+    const auto trailer_offset =
+        index_block_handle.Offset() + index_block_handle.Size();
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment trailer_segment,
+        block_cache->GetBlock(trailer_offset, BlockTrailer::ENCODED_LENGTH,
+                              /*is_index=*/true,
+                              /*decompress_func=*/nullptr));
+    auto trailer_slice = MemorySlice::Wrap(trailer_segment);
+    auto trailer_input = trailer_slice.ToInput();
+    std::shared_ptr<BlockTrailer> trailer =
+        BlockTrailer::ReadBlockTrailer(&trailer_input);
+
+    // Load the index block itself and create its reader.
+    auto decompress =
+        [pool, trailer](const MemorySegment& seg) -> Result<MemorySegment> {
+        return DecompressBlock(seg, trailer, pool);
+    };
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment index_segment,
+        block_cache->GetBlock(index_block_handle.Offset(),
+                              index_block_handle.Size(), true, decompress));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<BlockReader> index_reader,
+        BlockReader::Create(MemorySlice::Wrap(index_segment), comparator));
+
+    return std::shared_ptr<SstFileReader>(new SstFileReader(
+        pool, block_cache, filter, index_reader, comparator));
+}
+
+/// Creates a reader for a sort-lookup-store file by decoding the footer at the
+/// end of `in` and delegating to Create().
+///
+/// @param in stream positioned anywhere; it is re-positioned to the footer
+/// @param comparator key comparator handed to the block readers
+/// @param block_cache cache used for every block read
+/// @param pool memory pool used for the footer buffer and later reads
+/// @return the reader; Status::Invalid when the file is shorter than the
+///         footer; otherwise the first error raised while reading
+Result<std::shared_ptr<SstFileReader>> SstFileReader::CreateForSortLookupStore(
+    const std::shared_ptr<InputStream>& in,
+    MemorySlice::SliceComparator comparator,
+    const std::shared_ptr<BlockCache>& block_cache,
+    const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t file_len, in->Length());
+    const auto footer_len =
+        static_cast<uint64_t>(SortLookupStoreFooter::ENCODED_LENGTH);
+    // file_len is unsigned: without this check a truncated or empty file
+    // would wrap around and produce a huge, meaningless seek offset.
+    if (file_len < footer_len) {
+        return Status::Invalid(fmt::format(
+            "Invalid sst file: file length {} is smaller than footer length {}",
+            file_len, footer_len));
+    }
+    PAIMON_RETURN_NOT_OK(
+        in->Seek(file_len - footer_len, SeekOrigin::FS_SEEK_SET));
+    auto footer_bytes = Bytes::AllocateBytes(
+        SortLookupStoreFooter::ENCODED_LENGTH, pool.get());
+    PAIMON_RETURN_NOT_OK(in->Read(footer_bytes->data(), footer_bytes->size()));
+    auto footer_segment = MemorySegment::Wrap(std::move(footer_bytes));
+    auto footer_slice = MemorySlice::Wrap(footer_segment);
+    auto footer_input = footer_slice.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<SortLookupStoreFooter> read_footer,
+        SortLookupStoreFooter::ReadSortLookupStoreFooter(&footer_input));
+    return SstFileReader::Create(read_footer->GetIndexBlockHandle(),
+                                 read_footer->GetBloomFilterHandle(),
+                                 std::move(comparator), block_cache, pool);
+}
+
+/// Stores the collaborators created by Create(); performs no I/O itself.
+/// `bloom_filter` may be null, in which case Lookup() skips the filter check.
+SstFileReader::SstFileReader(const std::shared_ptr<MemoryPool>& pool,
+                             const std::shared_ptr<BlockCache>& block_cache,
+                             const std::shared_ptr<BloomFilter>& bloom_filter,
+                             const std::shared_ptr<BlockReader>& 
index_block_reader,
+                             MemorySlice::SliceComparator comparator)
+    : pool_(pool),
+      block_cache_(block_cache),
+      bloom_filter_(bloom_filter),
+      index_block_reader_(index_block_reader),
+      comparator_(std::move(comparator)) {}
+
+/// Returns a fresh iterator over the index block held by this reader.
+std::unique_ptr<BlockIterator> SstFileReader::CreateIndexIterator() {
+    return index_block_reader_->Iterator();
+}
+
+/// Point lookup of `key`.
+///
+/// @param key serialized key
+/// @return a copy of the stored value allocated from `pool_`, a null pointer
+///         when the key is not present, or the first error raised while
+///         seeking or reading blocks
+Result<std::shared_ptr<Bytes>> SstFileReader::Lookup(
+    const std::shared_ptr<Bytes>& key) {
+    // If a bloom filter exists and rejects the hash, skip all block reads.
+    if (bloom_filter_ &&
+        !bloom_filter_->TestHash(MurmurHashUtils::HashBytes(key))) {
+        return std::shared_ptr<Bytes>();
+    }
+
+    auto target = MemorySlice::Wrap(key);
+
+    // Position the index iterator at the block that may contain the key.
+    auto index_it = index_block_reader_->Iterator();
+    PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool _, index_it->SeekTo(target));
+    if (!index_it->HasNext()) {
+        // No index entry left after the seek: the key is not in this file.
+        return std::shared_ptr<Bytes>();
+    }
+
+    // Load that data block and seek inside it.
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlockIterator> data_it,
+                           GetNextBlock(index_it));
+    PAIMON_ASSIGN_OR_RAISE(bool found, data_it->SeekTo(target));
+    if (!found) {
+        return std::shared_ptr<Bytes>();
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_it->Next());
+    return entry.value.CopyBytes(pool_.get());
+}
+
+/// Consumes the next entry of `index_iterator`, decodes the block handle held
+/// in the entry's value and returns an iterator over that (non-index) block.
+///
+/// @param index_iterator iterator over the index block; advanced by one entry
+/// @return iterator over the referenced block, or the first error raised
+Result<std::unique_ptr<BlockIterator>> SstFileReader::GetNextBlock(
+    std::unique_ptr<BlockIterator>& index_iterator) {
+    PAIMON_ASSIGN_OR_RAISE(BlockEntry index_entry, index_iterator->Next());
+
+    // The value of an index entry is an encoded BlockHandle.
+    auto handle_input = index_entry.value.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(BlockHandle data_handle,
+                           BlockHandle::ReadBlockHandle(&handle_input));
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockReader> data_reader,
+                           ReadBlock(data_handle, /*index=*/false));
+    return data_reader->Iterator();
+}
+
+/// Reads the block described by `handle` through the block cache and wraps it
+/// in a BlockReader.
+///
+/// The block's trailer is fetched first; it supplies the checksum and the
+/// compression type that DecompressBlock() needs.
+///
+/// @param handle offset and size of the block payload
+/// @param index forwarded to the cache as the `is_index` flag of the payload
+/// @return reader for the block, or the first error raised
+Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(
+    const BlockHandle& handle, bool index) {
+    // The trailer is located right after the block payload.
+    const auto trailer_offset = handle.Offset() + handle.Size();
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment trailer_segment,
+        block_cache_->GetBlock(trailer_offset, BlockTrailer::ENCODED_LENGTH,
+                               /*is_index=*/true,
+                               /*decompress_func=*/nullptr));
+    auto trailer_slice = MemorySlice::Wrap(trailer_segment);
+    auto trailer_input = trailer_slice.ToInput();
+    std::shared_ptr<paimon::BlockTrailer> trailer =
+        BlockTrailer::ReadBlockTrailer(&trailer_input);
+
+    auto decompress =
+        [this, trailer](const MemorySegment& seg) -> Result<MemorySegment> {
+        return DecompressBlock(seg, trailer, pool_);
+    };
+    PAIMON_ASSIGN_OR_RAISE(
+        MemorySegment payload,
+        block_cache_->GetBlock(handle.Offset(), handle.Size(), index,
+                               decompress));
+    return BlockReader::Create(MemorySlice::Wrap(payload), comparator_);
+}
+
+/// Verifies the checksum of a stored block and returns its uncompressed bytes.
+///
+/// The CRC32C is computed over the stored bytes followed by the one-byte
+/// compression type, and compared with the value in `trailer`. Blocks stored
+/// with compression type NONE are returned unchanged. Otherwise the payload
+/// starts with a var-len encoded uncompressed size followed by the compressed
+/// bytes.
+///
+/// @param compressed_data block bytes exactly as stored in the file
+/// @param trailer trailer that belongs to the block
+/// @param pool memory pool used for the decompression output
+/// @return the uncompressed block, or Status::Invalid on a checksum or size
+///         mismatch, or any error raised by the helpers
+Result<MemorySegment> SstFileReader::DecompressBlock(
+    const MemorySegment& compressed_data,
+    const std::shared_ptr<BlockTrailer>& trailer,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // check crc32c
+    const auto type_byte = static_cast<char>(
+        static_cast<int32_t>(trailer->CompressionType()) & 0xFF);
+    auto actual_crc =
+        CRC32C::calculate(compressed_data.Data(), compressed_data.Size());
+    actual_crc = CRC32C::calculate(&type_byte, 1, actual_crc);
+    if (trailer->Crc32c() != static_cast<int32_t>(actual_crc)) {
+        return Status::Invalid(
+            fmt::format("Expected crc32c({:#x}) but found "
+                        "crc32c({:#x})",
+                        static_cast<uint32_t>(trailer->Crc32c()), actual_crc));
+    }
+
+    // Resolve the compression factory named by the trailer.
+    PAIMON_ASSIGN_OR_RAISE(BlockCompressionType type,
+                           SstFileUtils::From(trailer->CompressionType()));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockCompressionFactory> factory,
+                           BlockCompressionFactory::Create(type));
+    const bool stored_raw =
+        !factory || factory->GetCompressionType() == BlockCompressionType::NONE;
+    if (stored_raw) {
+        return compressed_data;
+    }
+
+    // decompress data
+    auto decompressor = factory->GetDecompressor();
+    auto slice = MemorySlice::Wrap(compressed_data);
+    auto input = slice.ToInput();
+    PAIMON_ASSIGN_OR_RAISE(int32_t uncompressed_size, input.ReadVarLenInt());
+    auto output =
+        MemorySegment::AllocateHeapMemory(uncompressed_size, pool.get());
+
+    PAIMON_ASSIGN_OR_RAISE(
+        int32_t actual_uncompressed_size,
+        decompressor->Decompress(compressed_data.Data() + input.Position(),
+                                 input.Available(), output.MutableData(),
+                                 output.Size()));
+    if (actual_uncompressed_size != output.Size()) {
+        return Status::Invalid(
+            fmt::format("Invalid data: expect uncompressed size {}, "
+                        "actual uncompressed size {}",
+                        output.Size(), actual_uncompressed_size));
+    }
+    return output;
+}
+
+/// Closes the block cache used by this reader. Always returns Status::OK().
+/// Also invoked from the destructor, so it may run more than once.
+Status SstFileReader::Close() {
+    // TODO(xinyu.lxy): support close FileBasedBloomFilter
+    block_cache_->Close();
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_reader.h 
b/src/paimon/common/sst/sst_file_reader.h
new file mode 100644
index 0000000..c977468
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_reader.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/sst/block_cache.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/block_iterator.h"
+#include "paimon/common/sst/block_reader.h"
+#include "paimon/common/sst/block_trailer.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class SstFileIterator;
+
+/// An SST File Reader which serves point queries and range queries. Users can
+/// call CreateIterator() to create a file iterator and then use seek and read
+/// methods to do range queries. Note that this class is NOT thread-safe.
+/// NOTE(review): no CreateIterator() is declared in this class; only
+/// CreateIndexIterator() is -- confirm which API the sentence above refers to.
+class PAIMON_EXPORT SstFileReader {
+ public:
+    /// Create an SstFileReader from an already decoded index block handle and
+    /// an optional bloom filter handle. All block reads go through
+    /// `block_cache`.
+    static Result<std::shared_ptr<SstFileReader>> Create(
+        const BlockHandle& index_block_handle,
+        const std::optional<BloomFilterHandle>& bloom_filter_handle,
+        MemorySlice::SliceComparator comparator, const 
std::shared_ptr<BlockCache>& block_cache,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Create an SstFileReader by reading the SortLookupStoreFooter from the
+    /// given InputStream. This method encapsulates the common pattern of
+    /// reading the footer, parsing it, and creating the reader, which avoids
+    /// code duplication across callers.
+    static Result<std::shared_ptr<SstFileReader>> CreateForSortLookupStore(
+        const std::shared_ptr<InputStream>& input, 
MemorySlice::SliceComparator comparator,
+        const std::shared_ptr<BlockCache>& block_cache, const 
std::shared_ptr<MemoryPool>& pool);
+
+    /// Closes the reader; the status returned by Close() is ignored here.
+    ~SstFileReader() {
+        [[maybe_unused]] Status _ = Close();
+    }
+
+    /// Create an iterator for the index block.
+    std::unique_ptr<BlockIterator> CreateIndexIterator();
+
+    /// Lookup the specified key in the file.
+    ///
+    /// @param key serialized key
+    /// @return corresponding serialized value, nullptr if not found.
+    Result<std::shared_ptr<Bytes>> Lookup(const std::shared_ptr<Bytes>& key);
+
+    /// Advance `index_iterator` by one entry and open the block it refers to.
+    ///
+    /// @param index_iterator iterator over the index block.
+    /// @return An iterator over the referenced block.
+    Result<std::unique_ptr<BlockIterator>> GetNextBlock(
+        std::unique_ptr<BlockIterator>& index_iterator);
+
+    /// @param handle The block handle.
+    /// @param index Whether read the block as an index.
+    /// @return The reader of the target block.
+    Result<std::shared_ptr<BlockReader>> ReadBlock(const BlockHandle& handle, 
bool index);
+
+    /// Close the underlying block cache.
+    Status Close();
+
+ private:
+    /// Check the CRC32C recorded in `trailer` and, unless the block is stored
+    /// with compression type NONE, decompress `compressed_data`.
+    static Result<MemorySegment> DecompressBlock(const MemorySegment& 
compressed_data,
+                                                 const 
std::shared_ptr<BlockTrailer>& trailer,
+                                                 const 
std::shared_ptr<MemoryPool>& pool);
+
+    /// Private: instances are created through the static factory methods.
+    SstFileReader(const std::shared_ptr<MemoryPool>& pool,
+                  const std::shared_ptr<BlockCache>& block_cache,
+                  const std::shared_ptr<BloomFilter>& bloom_filter,
+                  const std::shared_ptr<BlockReader>& index_block_reader,
+                  MemorySlice::SliceComparator comparator);
+
+ private:
+    // Pool used for value copies and decompression output.
+    std::shared_ptr<MemoryPool> pool_;
+    // Cache through which every block of the file is read.
+    std::shared_ptr<BlockCache> block_cache_;
+    // May be null; Lookup() then skips the bloom filter check.
+    std::shared_ptr<BloomFilter> bloom_filter_;
+    // Reader over the index block, loaded once at creation time.
+    std::shared_ptr<BlockReader> index_block_reader_;
+    // Key comparator passed to every BlockReader.
+    MemorySlice::SliceComparator comparator_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_utils.h 
b/src/paimon/common/sst/sst_file_utils.h
new file mode 100644
index 0000000..24c6178
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_utils.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <sstream>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compression_type.h"
+#include "paimon/common/memory/memory_slice.h"
+
+namespace paimon {
+
+/// Utilities for SST files.
+class SstFileUtils {
+ public:
+    /// Translates a persisted compression type code into the enum value.
+    ///
+    /// @param v code as stored on disk: 0 = NONE, 1 = ZSTD, 2 = LZ4
+    /// @return the matching BlockCompressionType, or Status::Invalid for any
+    ///         other code
+    static Result<BlockCompressionType> From(int8_t v) {
+        switch (v) {
+            case 0:
+                return BlockCompressionType::NONE;
+            case 1:
+                return BlockCompressionType::ZSTD;
+            case 2:
+                return BlockCompressionType::LZ4;
+            default:
+                break;
+        }
+        return Status::Invalid(fmt::format(
+            "not support compression type code {}", static_cast<int32_t>(v)));
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_writer.cpp 
b/src/paimon/common/sst/sst_file_writer.cpp
new file mode 100644
index 0000000..003a386
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_writer.cpp
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/sst/sst_file_writer.h"
+
+#include "paimon/common/sst/sst_file_utils.h"
+#include "paimon/common/utils/crc32c.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+
+namespace paimon {
+/// Sets up the data and index block writers and takes the compression type and
+/// compressor from `factory`.
+///
+/// `bloom_filter` may be null, in which case no hashes are recorded and
+/// WriteBloomFilter() returns an empty optional. `factory` is dereferenced
+/// unconditionally and therefore must not be null.
+SstFileWriter::SstFileWriter(const std::shared_ptr<OutputStream>& out,
+                             const std::shared_ptr<BloomFilter>& bloom_filter, 
int32_t block_size,
+                             const std::shared_ptr<BlockCompressionFactory>& 
factory,
+                             const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool), out_(out), bloom_filter_(bloom_filter), 
block_size_(block_size) {
+    // The data block writer gets 110% of block_size, since Write() flushes
+    // only after the block has grown past block_size.
+    // NOTE(review): presumably this argument is an initial capacity -- confirm
+    // against BlockWriter.
+    data_block_writer_ =
+        std::make_unique<BlockWriter>(static_cast<int32_t>(block_size * 1.1), 
pool);
+    index_block_writer_ =
+        std::make_unique<BlockWriter>(BlockHandle::MAX_ENCODED_LENGTH * 1024, 
pool);
+    compression_type_ = factory->GetCompressionType();
+    compressor_ = factory->GetCompressor();
+}
+
+/// Appends one key/value pair to the current data block.
+///
+/// The data block is flushed once its memory usage exceeds `block_size_`.
+/// When a bloom filter is configured, the hash of `key` is added to it.
+///
+/// @param key serialized key
+/// @param value serialized value
+/// @return Status::OK() on success; Status::Invalid when the bloom filter has
+///         no bit set; otherwise the first error raised by a helper
+Status SstFileWriter::Write(std::shared_ptr<Bytes>&& key,
+                            std::shared_ptr<Bytes>&& value) {
+    PAIMON_RETURN_NOT_OK(data_block_writer_->Write(key, value));
+    // Remember the key: Flush() uses it as the index entry of the block.
+    last_key_ = key;
+
+    const bool block_is_full = data_block_writer_->Memory() > block_size_;
+    if (block_is_full) {
+        PAIMON_RETURN_NOT_OK(Flush());
+    }
+
+    if (!bloom_filter_) {
+        return Status::OK();
+    }
+    // Double-check that bloom_filter_ is valid
+    if (!bloom_filter_->GetBitSet()) {
+        return Status::Invalid("Bloom filter bit set is null");
+    }
+    PAIMON_RETURN_NOT_OK(
+        bloom_filter_->AddHash(MurmurHashUtils::HashBytes(key)));
+    return Status::OK();
+}
+
+/// Writes the buffered data block to the output and records it in the index.
+///
+/// Does nothing when the data block writer holds no entries. Otherwise the
+/// block is written through FlushBlockWriter() and an index entry
+/// (last written key -> encoded block handle) is appended to the index block.
+///
+/// @return Status::OK() on success, otherwise the first error raised
+Status SstFileWriter::Flush() {
+    const bool nothing_buffered = data_block_writer_->Size() == 0;
+    if (nothing_buffered) {
+        return Status::OK();
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(BlockHandle data_handle,
+                           FlushBlockWriter(data_block_writer_.get()));
+
+    PAIMON_ASSIGN_OR_RAISE(MemorySlice encoded_handle,
+                           data_handle.WriteBlockHandle(pool_.get()));
+    auto index_value = encoded_handle.CopyBytes(pool_.get());
+    PAIMON_RETURN_NOT_OK(index_block_writer_->Write(last_key_, index_value));
+    return Status::OK();
+}
+
+/// Writes the accumulated index block (plus its trailer) to the output and
+/// returns the handle describing where it was written.
+Result<BlockHandle> SstFileWriter::WriteIndexBlock() {
+    return FlushBlockWriter(index_block_writer_.get());
+}
+
+/// Writes the bloom filter bits to the output at the current position.
+///
+/// @return an empty optional when no bloom filter is configured; otherwise a
+///         handle holding the start position, the byte size and the expected
+///         entry count. Returns Status::Invalid when the bloom filter has no
+///         bit set (the same condition Write() reports), or the first error
+///         raised by the output stream.
+Result<std::optional<BloomFilterHandle>> SstFileWriter::WriteBloomFilter() {
+    if (!bloom_filter_) {
+        return std::optional<BloomFilterHandle>();
+    }
+    // Guard against a filter without backing memory instead of dereferencing
+    // a null bit set; mirrors the check performed in Write().
+    const auto& bit_set = bloom_filter_->GetBitSet();
+    if (!bit_set) {
+        return Status::Invalid("Bloom filter bit set is null");
+    }
+    auto bf_slice = bit_set->ToSlice();
+    auto data = bf_slice.ReadStringView();
+    // Capture the position before writing: it is the filter's file offset.
+    PAIMON_ASSIGN_OR_RAISE(int64_t bloom_filter_pos, out_->GetPos());
+    BloomFilterHandle handle(bloom_filter_pos, data.size(),
+                             bloom_filter_->ExpectedEntries());
+
+    PAIMON_RETURN_NOT_OK(WriteBytes(data.data(), data.size()));
+
+    return std::optional<BloomFilterHandle>(handle);
+}
+
+/// Writes the raw bytes of `slice` to the output (e.g. an encoded footer).
+///
+/// @return the status of the underlying write
+Status SstFileWriter::WriteSlice(const MemorySlice& slice) {
+    const auto bytes = slice.ReadStringView();
+    return WriteBytes(bytes.data(), bytes.size());
+}
+
+/// Finishes `writer`, optionally compresses the block, and writes the block
+/// followed by its trailer to the output. `writer` is reset afterwards.
+///
+/// @param writer block writer holding the entries to persist
+/// @return handle with the block's start position and stored size (trailer
+///         not included), or the first error raised
+Result<BlockHandle> SstFileWriter::FlushBlockWriter(BlockWriter* writer) {
+    PAIMON_ASSIGN_OR_RAISE(MemorySlice memory_slice, writer->Finish());
+
+    // `view` is what finally gets written: the raw block by default, or the
+    // compressed buffer if compression pays off (see below).
+    auto view = memory_slice.ReadStringView();
+
+    std::shared_ptr<Bytes> buffer;
+    BlockCompressionType compression_type = BlockCompressionType::NONE;
+    if (compressor_.get()) {
+        auto new_size = compressor_->GetMaxCompressedSize(view.size());
+        // 5 bytes for original length
+        buffer = std::make_shared<Bytes>(new_size + 5, pool_.get());
+        // Layout of the compressed form: var-len original size, then payload.
+        PAIMON_ASSIGN_OR_RAISE(int32_t offset, WriteVarLenInt(buffer->data(), 
view.size()));
+        PAIMON_ASSIGN_OR_RAISE(int32_t actual_size, 
compressor_->Compress(view.data(), view.size(),
+                                                                          
buffer->data() + offset,
+                                                                          
buffer->size() - offset));
+        actual_size += offset;
+        // Don't use the compressed data if it saves less than 12.5%.
+        if (static_cast<size_t>(actual_size) < view.size() - (view.size() / 
8)) {
+            compression_type = compression_type_;
+            view = std::string_view{buffer->data(), 
static_cast<size_t>(actual_size)};
+        }
+    }
+
+    // The checksum covers the stored bytes followed by the one-byte
+    // compression type; SstFileReader::DecompressBlock verifies it the same way.
+    auto crc32c = CRC32C::calculate(view.data(), view.size());
+    auto compression_val = 
static_cast<char>(static_cast<int32_t>(compression_type) & 0xFF);
+    crc32c = CRC32C::calculate(&compression_val, 1, crc32c);
+    auto trailer = BlockTrailer(static_cast<int8_t>(compression_type), crc32c);
+    auto trailer_memory_slice = trailer.WriteBlockTrailer(pool_.get());
+    // Position must be taken before writing: it becomes the handle's offset.
+    PAIMON_ASSIGN_OR_RAISE(int64_t block_pos, out_->GetPos());
+    BlockHandle block_handle(block_pos, view.size());
+
+    // 1. write data
+    PAIMON_RETURN_NOT_OK(WriteBytes(view.data(), view.size()));
+
+    // 2. write trailer
+    auto trailer_data = trailer_memory_slice.ReadStringView();
+    PAIMON_RETURN_NOT_OK(WriteBytes(trailer_data.data(), trailer_data.size()));
+
+    writer->Reset();
+
+    return block_handle;
+}
+
+/// Writes `size` bytes starting at `data` to the output stream.
+///
+/// @return Status::OK() on success, otherwise the error reported by the stream
+Status SstFileWriter::WriteBytes(const char* data, size_t size) {
+    PAIMON_RETURN_NOT_OK(out_->Write(data, size));
+    return Status::OK();
+}
+
+/// Encodes a non-negative integer in variable-length form: 7 payload bits per
+/// byte, least significant group first, with the high bit set on every byte
+/// except the last.
+///
+/// @param bytes destination buffer; the caller must provide enough room (an
+///        int32_t needs at most 5 bytes)
+/// @param value value to encode; must not be negative
+/// @return number of bytes written, or Status::Invalid for a negative value
+Result<int32_t> SstFileWriter::WriteVarLenInt(char* bytes, int32_t value) {
+    if (value < 0) {
+        return Status::Invalid("negative value: v=" + std::to_string(value));
+    }
+    int32_t written = 0;
+    // Emit continuation bytes while more than 7 significant bits remain.
+    for (; (value & ~0x7F) != 0; value >>= 7) {
+        bytes[written++] = static_cast<char>((value & 0x7F) | 0x80);
+    }
+    // The final byte has the high bit clear.
+    bytes[written++] = static_cast<char>(value);
+    return written;
+}
+}  // namespace paimon
diff --git a/src/paimon/common/sst/sst_file_writer.h 
b/src/paimon/common/sst/sst_file_writer.h
new file mode 100644
index 0000000..15ffd1a
--- /dev/null
+++ b/src/paimon/common/sst/sst_file_writer.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/block_trailer.h"
+#include "paimon/common/sst/block_writer.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/common/utils/murmurhash_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+/// The writer for writing SST Files. SST Files are row-oriented and designed to serve frequent
+/// point queries and range queries by key.
+/// Exposes the key/value write path plus the calls that emit the index block and bloom filter.
+class PAIMON_EXPORT SstFileWriter {
+ public:
+    // Builds a writer over the output stream `out`.
+    // NOTE(review): `factory` presumably supplies `compressor_` / `compression_type_`; the
+    // constructor body is not part of this header — confirm in sst_file_writer.cpp.
+    SstFileWriter(const std::shared_ptr<OutputStream>& out,
+                  const std::shared_ptr<BloomFilter>& bloom_filter, int32_t 
block_size,
+                  const std::shared_ptr<BlockCompressionFactory>& factory,
+                  const std::shared_ptr<MemoryPool>& pool);
+
+    // Defaulted: every data member is a smart pointer or a value type.
+    ~SstFileWriter() = default;
+
+    // Accepts one key/value pair, passed as rvalue shared pointers.
+    // NOTE(review): presumably keys must arrive in sorted order — confirm against the caller.
+    Status Write(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& value);
+
+    // NOTE(review): presumably flushes the pending data block — confirm in the .cpp.
+    Status Flush();
+
+    // Returns the handle of the index block on success.
+    Result<BlockHandle> WriteIndexBlock();
+
+    // When bloom-filter is disabled, the returned optional is expected to be empty
+    // (std::nullopt, not a null pointer) — confirm against the implementation.
+    Result<std::optional<BloomFilterHandle>> WriteBloomFilter();
+
+    // Writes the given memory slice; the status reports success or failure.
+    Status WriteSlice(const MemorySlice& slice);
+
+ private:
+    // Writes the writer's block (compressed only when that pays off) followed by its trailer
+    // to `out_`, resets `writer`, and returns the handle of the block just written.
+    Result<BlockHandle> FlushBlockWriter(BlockWriter* writer);
+
+    // Writes `size` bytes starting at `data` to `out_`.
+    Status WriteBytes(const char* data, size_t size);
+
+    // Encodes non-negative `value` as a variable-length integer into `bytes` and returns the
+    // number of bytes written; negative values yield Status::Invalid.
+    Result<int32_t> WriteVarLenInt(char* bytes, int32_t value);
+
+    // api for testing
+    // Returns a non-owning pointer to the index block writer.
+    // NOTE(review): declared private, so tests need friend (or equivalent) access — confirm.
+    BlockWriter* IndexWriter() const {
+        return index_block_writer_.get();
+    }
+
+ private:
+    // Memory pool; used e.g. when serializing block trailers.
+    std::shared_ptr<MemoryPool> pool_;
+
+    // Destination stream for block data and trailers.
+    std::shared_ptr<OutputStream> out_;
+    // NOTE(review): may be null when the bloom filter is disabled — confirm.
+    std::shared_ptr<BloomFilter> bloom_filter_;
+
+    // Compression type recorded in the block trailer when the compressed form is kept.
+    BlockCompressionType compression_type_;
+    std::shared_ptr<BlockCompressor> compressor_;
+
+    // NOTE(review): presumably the most recently written key — confirm in the .cpp.
+    std::shared_ptr<Bytes> last_key_;
+
+    // Block size passed to the constructor.
+    int32_t block_size_;
+    std::unique_ptr<BlockWriter> data_block_writer_;
+    std::unique_ptr<BlockWriter> index_block_writer_;
+};
+}  // namespace paimon


Reply via email to