This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 348e89d  feat: add BloomFilter, CRC32C, MurmurHash, varint utilities, 
and DeltaVarintCompressor (#37)
348e89d is described below

commit 348e89d7a4b155e6f8c8d12c42c497c7bfcaf325
Author: lxy <[email protected]>
AuthorDate: Wed Jun 3 13:50:08 2026 +0800

    feat: add BloomFilter, CRC32C, MurmurHash, varint utilities, and 
DeltaVarintCompressor (#37)
---
 LICENSE                                            |  10 +
 NOTICE                                             |   3 +
 src/paimon/common/data/data_define_test.cpp        | 320 ++++++++++++++
 src/paimon/common/utils/bloom_filter.cpp           | 106 +++++
 src/paimon/common/utils/bloom_filter.h             |  72 ++++
 src/paimon/common/utils/bloom_filter64.cpp         | 107 +++++
 src/paimon/common/utils/bloom_filter64.h           |  75 ++++
 src/paimon/common/utils/bloom_filter64_test.cpp    |  83 ++++
 src/paimon/common/utils/bloom_filter_test.cpp      | 157 +++++++
 src/paimon/common/utils/crc32c.cpp                 |  66 +++
 src/paimon/common/utils/crc32c.h                   |  46 +++
 src/paimon/common/utils/crc32c_test.cpp            |  43 ++
 .../common/utils/delta_varint_compressor.cpp       | 114 +++++
 src/paimon/common/utils/delta_varint_compressor.h  |  52 +++
 .../common/utils/delta_varint_compressor_test.cpp  | 459 +++++++++++++++++++++
 src/paimon/common/utils/murmurhash_utils.h         |  13 +-
 src/paimon/common/utils/murmurhash_utils_test.cpp  |  13 +-
 src/paimon/common/utils/var_length_int_utils.h     | 139 +++++++
 .../common/utils/var_length_int_utils_test.cpp     | 144 +++++++
 src/paimon/common/utils/xxhash_test.cpp            |  49 +++
 src/paimon/testing/utils/testharness.cpp           | 170 +++++++-
 src/paimon/testing/utils/testharness.h             |  76 +++-
 22 files changed, 2289 insertions(+), 28 deletions(-)

diff --git a/LICENSE b/LICENSE
index 3321b54..103c326 100644
--- a/LICENSE
+++ b/LICENSE
@@ -256,6 +256,16 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 
SUCH DAMAGE.
 
 
--------------------------------------------------------------------------------
 
+This product includes code from LiteRT.
+
+* UniqueTestDirectory utility in src/paimon/testing/utils/testharness.cpp and 
src/paimon/testing/utils/testharness.h
+
+Copyright: 2024 Google LLC.
+Home page: https://ai.google.dev/edge/litert
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
 This product includes code from Apache Arrow.
 
 * Core utilities:
diff --git a/NOTICE b/NOTICE
index 303de2a..3be42fb 100644
--- a/NOTICE
+++ b/NOTICE
@@ -17,6 +17,9 @@ This product includes software from RocksDB project (Apache 
2.0 and BSD 3-clause
 Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
 Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 
+This product includes software from LiteRT project (Apache 2.0)
+Copyright 2024 Google LLC.
+
 This product includes software from xxHash project (BSD 2-clause)
 Copyright (C) 2012-2023 Yann Collet
 
diff --git a/src/paimon/common/data/data_define_test.cpp 
b/src/paimon/common/data/data_define_test.cpp
new file mode 100644
index 0000000..2a76837
--- /dev/null
+++ b/src/paimon/common/data/data_define_test.cpp
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/data/data_define.h"
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Test case: IsVariantNull should return true for NullType
+TEST(DataDefineTest, IsVariantNullReturnsTrueForNull) {
+    VariantType null_variant = NullType{};
+    ASSERT_TRUE(DataDefine::IsVariantNull(null_variant));
+}
+
+// Test case: IsVariantNull should return false for non-NullType variants
+TEST(DataDefineTest, IsVariantNullReturnsFalseForNonNullTypes) {
+    VariantType non_null_variant = 42;  // Example with int
+    ASSERT_FALSE(DataDefine::IsVariantNull(non_null_variant));
+
+    non_null_variant = true;  // Example with bool
+    ASSERT_FALSE(DataDefine::IsVariantNull(non_null_variant));
+}
+
+// Test case: GetVariantValue should return valid value for matched types
+TEST(DataDefineTest, GetVariantValue) {
+    {
+        VariantType int_variant = 42;  // Variant holding an int
+        const auto int_value = 
DataDefine::GetVariantValue<int32_t>(int_variant);
+        ASSERT_EQ(int_value, 42);
+    }
+    {
+        VariantType bool_variant = true;  // Variant holding a bool
+        const bool bool_value = 
DataDefine::GetVariantValue<bool>(bool_variant);
+        ASSERT_EQ(bool_value, true);
+    }
+    {
+        auto array = std::dynamic_pointer_cast<arrow::StringArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), 
R"(["abc", "def", "hello"])")
+                .ValueOrDie());
+        VariantType view_variant = array->GetView(2);  // Variant holding a 
StringView
+        const auto view_value = 
DataDefine::GetVariantValue<std::string_view>(view_variant);
+        ASSERT_EQ(std::string(view_value), "hello");
+        ASSERT_EQ(DataDefine::VariantValueToString(view_variant), "hello");
+    }
+}
+
+// Test case: GetVariantValue should return valid value for matching types
+TEST(DataDefineTest, GetVariantValueReturnsPointerForMatchingType) {}
+
+// Test case: VariantValueToString should handle NullType
+TEST(DataDefineTest, VariantValueToStringReturnsStringForNullType) {
+    VariantType null_variant = NullType{};
+    ASSERT_EQ(DataDefine::VariantValueToString(null_variant), "null");
+}
+
+// Test case: VariantValueToString should handle bool
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBool) {
+    VariantType bool_variant = true;
+    ASSERT_EQ(DataDefine::VariantValueToString(bool_variant), "true");
+
+    bool_variant = false;
+    ASSERT_EQ(DataDefine::VariantValueToString(bool_variant), "false");
+}
+
+// Test case: VariantValueToString should handle integer types
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInt) {
+    VariantType int_variant = 42;
+    ASSERT_EQ(DataDefine::VariantValueToString(int_variant), "42");
+}
+
+// Test case: VariantValueToString should handle string data (BinaryString)
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBinaryString) {
+    auto pool = GetDefaultPool();
+    auto binary_str = BinaryString::FromString("Hello, world!", pool.get());
+    VariantType binary_variant = binary_str;
+    ASSERT_EQ(DataDefine::VariantValueToString(binary_variant), "Hello, 
world!");
+}
+
+// Test case: VariantValueToString should handle shared_ptr<Bytes>
+TEST(DataDefineTest, VariantValueToStringReturnsStringForSharedPtrBytes) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<Bytes> bytes_ptr = Bytes::AllocateBytes("abc", pool.get());
+    VariantType bytes_variant = bytes_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(bytes_variant), "abc");
+}
+
+// Test case: VariantValueToString should handle Timestamp
+TEST(DataDefineTest, VariantValueToStringReturnsStringForTimestamp) {
+    auto timestamp =
+        Timestamp(/*millisecond=*/1622520000000l, /*nano_of_millisecond=*/0);  
// A timestamp value
+    VariantType timestamp_variant = timestamp;
+    ASSERT_EQ(DataDefine::VariantValueToString(timestamp_variant), 
timestamp.ToString());
+}
+
+// Test case: VariantValueToString should handle Decimal
+TEST(DataDefineTest, VariantValueToStringReturnsStringForDecimal) {
+    Decimal decimal(38, 38, 
DecimalUtils::StrToInt128("12345678998765432145678").value());
+    VariantType decimal_variant = decimal;
+    ASSERT_EQ(DataDefine::VariantValueToString(decimal_variant), 
decimal.ToString());
+}
+
+// Test case: VariantValueToString should handle shared_ptr<InternalRow> 
(mocking with a string)
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalRow) {
+    std::shared_ptr<InternalRow> row_ptr = std::make_shared<BinaryRow>(0);
+    VariantType row_variant = row_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(row_variant), "row");
+}
+
+// Test case: VariantValueToString should handle shared_ptr<InternalArray> 
(mocking with a string)
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalArray) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<InternalArray> array_ptr = std::make_shared<BinaryArray>();
+    VariantType array_variant = array_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(array_variant), "array");
+}
+
+// Test case: GetStringView should handle all variant types and edge cases
+TEST(DataDefineTest, GetStringView) {
+    auto pool = GetDefaultPool();
+
+    {
+        // from string_view
+        std::string original = "hello world";
+        VariantType view_variant = std::string_view(original.data(), 
original.size());
+        auto result = DataDefine::GetStringView(view_variant);
+        ASSERT_EQ(result, "hello world");
+        ASSERT_EQ(result.data(), original.data());
+    }
+    {
+        // from shared_ptr<Bytes>
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("test bytes", 
pool.get());
+        VariantType bytes_variant = bytes;
+        auto result = DataDefine::GetStringView(bytes_variant);
+        ASSERT_EQ(std::string(result), "test bytes");
+        ASSERT_EQ(result.size(), 10);
+    }
+    {
+        // from BinaryString
+        auto binary_str = BinaryString::FromString("binary string content", 
pool.get());
+        VariantType binary_variant = binary_str;
+        auto result = DataDefine::GetStringView(binary_variant);
+        ASSERT_EQ(std::string(result), "binary string content");
+    }
+    {
+        // empty string_view
+        VariantType view_variant = std::string_view();
+        auto result = DataDefine::GetStringView(view_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+    {
+        // empty Bytes
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("", pool.get());
+        VariantType bytes_variant = bytes;
+        auto result = DataDefine::GetStringView(bytes_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+    {
+        // empty BinaryString
+        VariantType binary_variant = BinaryString::EmptyUtf8();
+        auto result = DataDefine::GetStringView(binary_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+}
+
+TEST(DataDefineTest, VariantValueToLiteral) {
+    auto pool = GetDefaultPool();
+
+    {
+        // BOOL
+        VariantType value = true;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::BOOL));
+        ASSERT_EQ(literal.GetValue<bool>(), true);
+    }
+    {
+        // INT8
+        VariantType value = static_cast<char>(42);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::INT8));
+        ASSERT_EQ(literal.GetValue<int8_t>(), 42);
+    }
+    {
+        // INT16
+        VariantType value = static_cast<int16_t>(1000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::INT16));
+        ASSERT_EQ(literal.GetValue<int16_t>(), 1000);
+    }
+    {
+        // INT32
+        VariantType value = static_cast<int32_t>(100000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::INT32));
+        ASSERT_EQ(literal.GetValue<int32_t>(), 100000);
+    }
+    {
+        // INT64
+        VariantType value = static_cast<int64_t>(123456789L);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::INT64));
+        ASSERT_EQ(literal.GetValue<int64_t>(), 123456789L);
+    }
+    {
+        // FLOAT
+        VariantType value = 3.14f;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::FLOAT));
+        ASSERT_FLOAT_EQ(literal.GetValue<float>(), 3.14f);
+    }
+    {
+        // DOUBLE
+        VariantType value = 2.718;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::DOUBLE));
+        ASSERT_DOUBLE_EQ(literal.GetValue<double>(), 2.718);
+    }
+    {
+        // STRING from BinaryString
+        auto binary_str = BinaryString::FromString("hello", pool.get());
+        VariantType value = binary_str;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue<std::string>(), "hello");
+    }
+    {
+        // STRING from shared_ptr<Bytes>
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("world", 
pool.get());
+        VariantType value = bytes;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue<std::string>(), "world");
+    }
+    {
+        // STRING from string_view
+        std::string original = "view_str";
+        VariantType value = std::string_view(original.data(), original.size());
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue<std::string>(), "view_str");
+    }
+    {
+        // BINARY from shared_ptr<Bytes>
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("binary_data", 
pool.get());
+        VariantType value = bytes;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::BINARY));
+        ASSERT_EQ(literal.GetValue<std::string>(), "binary_data");
+    }
+    {
+        // BINARY from BinaryString
+        auto binary_str = BinaryString::FromString("bin_str", pool.get());
+        VariantType value = binary_str;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::BINARY));
+        ASSERT_EQ(literal.GetValue<std::string>(), "bin_str");
+    }
+    {
+        // TIMESTAMP
+        Timestamp ts(12345, 1);
+        VariantType value = ts;
+        ASSERT_OK_AND_ASSIGN(
+            auto literal, DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::TIMESTAMP));
+        ASSERT_EQ(literal.GetValue<Timestamp>(), ts);
+    }
+    {
+        // DECIMAL128
+        Decimal decimal(20, 3, 123456);
+        VariantType value = decimal;
+        ASSERT_OK_AND_ASSIGN(
+            auto literal, DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::DECIMAL128));
+        ASSERT_EQ(literal.GetValue<Decimal>(), decimal);
+    }
+    {
+        // DATE32
+        VariantType value = static_cast<int32_t>(19000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::DATE32));
+        ASSERT_EQ(literal.GetValue<int32_t>(), 19000);
+    }
+    {
+        // unsupported type
+        VariantType value = static_cast<int32_t>(0);
+        ASSERT_NOK_WITH_MSG(DataDefine::VariantValueToLiteral(value, 
arrow::Type::type::LIST),
+                            "Not support arrow type");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter.cpp 
b/src/paimon/common/utils/bloom_filter.cpp
new file mode 100644
index 0000000..d66420f
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <utility>
+
+namespace paimon {
+
+int32_t BloomFilter::OptimalNumOfBits(int64_t expect_entries, double fpp) {
+    if (expect_entries <= 0 || fpp <= 0.0 || fpp >= 1.0) {
+        return 0;
+    }
+    double result = -static_cast<double>(expect_entries) * log(fpp) / (log(2) 
* log(2));
+    if (result > INT32_MAX) return INT32_MAX;
+    if (result < 0) return 0;
+    return static_cast<int32_t>(result);
+}
+
+int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t 
bit_size) {
+    if (expect_entries <= 0) {
+        return 1;
+    }
+    double ratio = static_cast<double>(bit_size) / 
static_cast<double>(expect_entries);
+    double result = ratio * std::log(2.0);
+    return std::max(1, static_cast<int32_t>(std::round(result)));
+}
+
+std::shared_ptr<BloomFilter> BloomFilter::Create(int64_t expect_entries, 
double fpp) {
+    auto bytes =
+        
static_cast<int32_t>(ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 
8.0));
+    return std::make_shared<BloomFilter>(expect_entries, bytes);
+}
+
+BloomFilter::BloomFilter(int64_t expected_entries, int32_t byte_length)
+    : expected_entries_(expected_entries) {
+    num_hash_functions_ = OptimalNumOfHashFunctions(
+        expected_entries, 
static_cast<int64_t>(static_cast<uint64_t>(byte_length) << 3));
+    bit_set_ = std::make_shared<BitSet>(byte_length);
+}
+
+Status BloomFilter::AddHash(int32_t hash1) {
+    auto hash2 = static_cast<int32_t>(static_cast<uint32_t>(hash1) >> 16);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java 
int wrap semantics)
+        auto combined_hash =
+            static_cast<int32_t>(static_cast<uint32_t>(hash1) +
+                                 (static_cast<uint32_t>(i) * 
static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % bit_set_->BitSize();
+        PAIMON_RETURN_NOT_OK(bit_set_->Set(pos));
+    }
+    return Status::OK();
+}
+
+bool BloomFilter::TestHash(int32_t hash1) const {
+    auto hash2 = static_cast<int32_t>(static_cast<uint32_t>(hash1) >> 16);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java 
int wrap semantics)
+        auto combined_hash =
+            static_cast<int32_t>(static_cast<uint32_t>(hash1) +
+                                 (static_cast<uint32_t>(i) * 
static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % bit_set_->BitSize();
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+Status BloomFilter::SetMemorySegment(MemorySegment segment, int32_t offset) {
+    return bit_set_->SetMemorySegment(segment, offset);
+}
+
+void BloomFilter::Reset() {
+    bit_set_->Clear();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter.h 
b/src/paimon/common/utils/bloom_filter.h
new file mode 100644
index 0000000..8f1b327
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Bloom filter based on MemorySegment.
+class PAIMON_EXPORT BloomFilter {
+ public:
+    static int32_t OptimalNumOfBits(int64_t expect_entries, double fpp);
+    static int32_t OptimalNumOfHashFunctions(int64_t expect_entries, int64_t 
bit_size);
+    static std::shared_ptr<BloomFilter> Create(int64_t expect_entries, double 
fpp);
+
+ public:
+    BloomFilter(int64_t expected_entries, int32_t byte_length);
+
+    int32_t GetNumHashFunctions() const {
+        return num_hash_functions_;
+    }
+
+    int64_t ExpectedEntries() const {
+        return expected_entries_;
+    }
+
+    int64_t ByteLength() const {
+        return bit_set_->ByteLength();
+    }
+
+    std::shared_ptr<BitSet> GetBitSet() const {
+        return bit_set_;
+    }
+
+    Status SetMemorySegment(MemorySegment segment, int32_t offset = 0);
+
+    Status AddHash(int32_t hash1);
+
+    bool TestHash(int32_t hash1) const;
+
+    void Reset();
+
+ private:
+    static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+    int64_t expected_entries_;
+    int32_t num_hash_functions_ = -1;
+    std::shared_ptr<BitSet> bit_set_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.cpp 
b/src/paimon/common/utils/bloom_filter64.cpp
new file mode 100644
index 0000000..f2685a1
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <utility>
+
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+BloomFilter64::BitSet::BitSet(const std::shared_ptr<Bytes>& bytes, int32_t 
offset)
+    : offset_(offset), bytes_(bytes) {
+    assert(bytes_->size() > 0);
+    assert(offset_ >= 0);
+}
+
+void BloomFilter64::BitSet::Set(int32_t index) {
+    char* data = bytes_->data();
+    data[(static_cast<uint32_t>(index) >> 3) + offset_] |=
+        static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK));
+}
+
+bool BloomFilter64::BitSet::Get(int32_t index) const {
+    const char* data = bytes_->data();
+    return (data[(static_cast<uint32_t>(index) >> 3) + offset_] &
+            static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK))) != 
0;
+}
+
+int32_t BloomFilter64::BitSet::BitSize() const {
+    return (bytes_->size() - offset_) * BloomFilter64::BYTE_SIZE;
+}
+
+BloomFilter64::BloomFilter64(int64_t items, double fpp, const 
std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool) {
+    auto nb = static_cast<int32_t>(-items * std::log(fpp) / (std::log(2) * 
std::log(2)));
+    num_bits_ = nb + (BloomFilter64::BYTE_SIZE - (nb % 
BloomFilter64::BYTE_SIZE));
+    num_hash_functions_ = std::max(
+        1, static_cast<int32_t>(std::round(static_cast<double>(num_bits_) / 
items * std::log(2))));
+    auto bytes = std::make_shared<Bytes>(num_bits_ / BloomFilter64::BYTE_SIZE, 
pool_.get());
+    bit_set_ = std::make_unique<BitSet>(bytes, /*offset=*/0);
+}
+
+BloomFilter64::BloomFilter64(int32_t num_hash_functions, 
std::unique_ptr<BitSet>&& bit_set)
+    : num_bits_(bit_set->BitSize()),
+      num_hash_functions_(num_hash_functions),
+      bit_set_(std::move(bit_set)) {}
+
+void BloomFilter64::AddHash(int64_t hash64) {
+    auto hash1 = static_cast<int32_t>(hash64);
+    auto hash2 = static_cast<int32_t>(static_cast<uint64_t>(hash64) >> 32);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java 
int wrap semantics)
+        auto combined_hash =
+            static_cast<int32_t>(static_cast<uint32_t>(hash1) +
+                                 (static_cast<uint32_t>(i) * 
static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        bit_set_->Set(pos);
+    }
+}
+
+bool BloomFilter64::TestHash(int64_t hash64) const {
+    auto hash1 = static_cast<int32_t>(hash64);
+    auto hash2 = static_cast<int32_t>(static_cast<uint64_t>(hash64) >> 32);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java 
int wrap semantics)
+        auto combined_hash =
+            static_cast<int32_t>(static_cast<uint32_t>(hash1) +
+                                 (static_cast<uint32_t>(i) * 
static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.h 
b/src/paimon/common/utils/bloom_filter64.h
new file mode 100644
index 0000000..3ba4e86
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Bloom filter 64 handle 64 bits hash.
+class PAIMON_EXPORT BloomFilter64 {
+ public:
+    BloomFilter64(int64_t items, double fpp, const 
std::shared_ptr<MemoryPool>& pool);
+    class BitSet;
+
+    BloomFilter64(int32_t num_hash_functions, std::unique_ptr<BitSet>&& 
bit_set);
+
+    void AddHash(int64_t hash64);
+
+    bool TestHash(int64_t hash64) const;
+
+    int32_t GetNumHashFunctions() const {
+        return num_hash_functions_;
+    }
+
+    const BitSet& GetBitSet() const {
+        return *bit_set_;
+    }
+
+    class BitSet {
+     public:
+        BitSet(const std::shared_ptr<Bytes>& bytes, int32_t offset);
+        void Set(int32_t index);
+        bool Get(int32_t index) const;
+        int32_t BitSize() const;
+
+     private:
+        static constexpr int8_t MASK = 0x07;
+
+     private:
+        int32_t offset_;
+        std::shared_ptr<Bytes> bytes_;
+    };
+
+ private:
+    static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+    int32_t num_bits_ = -1;
+    int32_t num_hash_functions_ = -1;
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<BitSet> bit_set_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64_test.cpp 
b/src/paimon/common/utils/bloom_filter64_test.cpp
new file mode 100644
index 0000000..aa548fb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64_test.cpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+
+TEST(BloomFilter64Test, TestSimple) {
+    int32_t items = 10000;
+    auto pool = GetDefaultPool();
+    BloomFilter64 bloom_filter(items, 0.02, pool);
+    std::mt19937_64 engine(std::random_device{}());  // 
NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int64_t> 
distribution(std::numeric_limits<int64_t>::min(),
+                                                        
std::numeric_limits<int64_t>::max());
+    std::set<int64_t> test_data;
+    for (int32_t i = 0; i < items; i++) {
+        int64_t random = distribution(engine);
+        test_data.insert(random);
+        bloom_filter.AddHash(random);
+    }
+
+    for (const auto& value : test_data) {
+        ASSERT_TRUE(bloom_filter.TestHash(value));
+    }
+
+    // test false positive
+    int32_t false_positives = 0;
+    int32_t num = 1000000;
+    for (int32_t i = 0; i < num; i++) {
+        int64_t random = distribution(engine);
+        if (bloom_filter.TestHash(random) && test_data.find(random) == 
test_data.end()) {
+            false_positives++;
+        }
+    }
+    ASSERT_TRUE(static_cast<double>(false_positives) / num < 0.03);
+}
+
+TEST(BloomFilter64Test, TestCompatibleWithJava) {
+    // data: -10, -5, 0, 13, 100, 200, 500
+    std::vector<uint8_t> se_bytes = {241, 255, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+    auto pool = GetDefaultPool();
+    auto bytes = std::make_shared<Bytes>(se_bytes.size(), pool.get());
+    memcpy(bytes->data(), reinterpret_cast<char*>(se_bytes.data()), 
bytes->size());
+    auto bit_set = std::make_unique<BloomFilter64::BitSet>(bytes, 
/*offset=*/0);
+    BloomFilter64 bloom_filter(/*num_hash_functions=*/7, std::move(bit_set));
+    std::vector<int64_t> expected_data = {-10, -5, 0, 13, 100, 200, 500};
+    for (const auto& value : expected_data) {
+        ASSERT_TRUE(bloom_filter.TestHash(value));
+    }
+
+    BloomFilter64 bloom_filter2(10, 0.01, pool);
+    ASSERT_EQ(7, bloom_filter2.GetNumHashFunctions());
+    ASSERT_EQ(se_bytes.size() * BloomFilter64::BYTE_SIZE, 
bloom_filter2.num_bits_);
+    ASSERT_EQ(se_bytes.size(), bloom_filter2.GetBitSet().bytes_->size());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter_test.cpp 
b/src/paimon/common/utils/bloom_filter_test.cpp
new file mode 100644
index 0000000..fdd6abb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter_test.cpp
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(BloomFilterTest, TestOneSegmentBuilder) {
+    int32_t items = 100;
+    auto pool = GetDefaultPool();
+    auto bloom_filter = BloomFilter::Create(items, 0.01);
+    auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg));
+
+    std::mt19937_64 engine(std::random_device{}());  // 
NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int32_t> distribution(0, items);
+    std::set<int32_t> test_data;
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+
+    for (const auto& value : test_data) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+}
+
+TEST(BloomFilterTest, TestEstimatedHashFunctions) {
+    ASSERT_EQ(7, BloomFilter::Create(1000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(10000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(100000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(4, BloomFilter::Create(100000, 0.05)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(1000000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(4, BloomFilter::Create(1000000, 0.05)->GetNumHashFunctions());
+}
+
+TEST(BloomFilterTest, TestBloomNumBits) {
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 0));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 1));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(1, 1));
+    ASSERT_EQ(7, BloomFilter::OptimalNumOfBits(1, 0.03));
+    ASSERT_EQ(72, BloomFilter::OptimalNumOfBits(10, 0.03));
+    ASSERT_EQ(729, BloomFilter::OptimalNumOfBits(100, 0.03));
+    ASSERT_EQ(7298, BloomFilter::OptimalNumOfBits(1000, 0.03));
+    ASSERT_EQ(72984, BloomFilter::OptimalNumOfBits(10000, 0.03));
+    ASSERT_EQ(729844, BloomFilter::OptimalNumOfBits(100000, 0.03));
+    ASSERT_EQ(7298440, BloomFilter::OptimalNumOfBits(1000000, 0.03));
+    ASSERT_EQ(6235224, BloomFilter::OptimalNumOfBits(1000000, 0.05));
+    ASSERT_EQ(1870567268, BloomFilter::OptimalNumOfBits(300000000, 0.05));
+    ASSERT_EQ(1437758756, BloomFilter::OptimalNumOfBits(300000000, 0.1));
+    ASSERT_EQ(432808512, BloomFilter::OptimalNumOfBits(300000000, 0.5));
+    ASSERT_EQ(1393332198, BloomFilter::OptimalNumOfBits(3000000000, 0.8));
+    ASSERT_EQ(657882327, BloomFilter::OptimalNumOfBits(3000000000, 0.9));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(3000000000, 1));
+}
+
+TEST(BloomFilterTest, TestBloomNumHashFunctions) {
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(-1, -1));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(0, 0));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 0));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 10));
+    ASSERT_EQ(7, BloomFilter::OptimalNumOfHashFunctions(10, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 100));
+    ASSERT_EQ(3634, BloomFilter::OptimalNumOfHashFunctions(100, 64 * 1024 * 
8));
+    ASSERT_EQ(363, BloomFilter::OptimalNumOfHashFunctions(1000, 64 * 1024 * 
8));
+    ASSERT_EQ(36, BloomFilter::OptimalNumOfHashFunctions(10000, 64 * 1024 * 
8));
+    ASSERT_EQ(4, BloomFilter::OptimalNumOfHashFunctions(100000, 64 * 1024 * 
8));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 64 * 1024 * 
8));
+}
+
+TEST(BloomFilterTest, TestBloomFilter) {
+    int32_t items = 100;
+    auto pool = GetDefaultPool();
+    auto bloom_filter = std::make_shared<BloomFilter>(100, 1024);
+
+    std::mt19937_64 engine(std::random_device{}());  // 
NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int32_t> distribution(0, items);
+
+    // segments 1
+    auto seg1 = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg1));
+
+    std::set<int32_t> test_data1;
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data1.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+    for (const auto& value : test_data1) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+
+    // segments 2
+    std::set<int32_t> test_data2;
+    auto seg2 = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg2));
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data2.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+    for (const auto& value : test_data2) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+    // switch to segment1
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg1));
+    for (const auto& value : test_data1) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+
+    // clear segment1
+    bloom_filter->Reset();
+    for (const auto& value : test_data1) {
+        ASSERT_FALSE(bloom_filter->TestHash(value));
+    }
+
+    // switch to segment2 and clear
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg2));
+    bloom_filter->Reset();
+    for (const auto& value : test_data2) {
+        ASSERT_FALSE(bloom_filter->TestHash(value));
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/crc32c.cpp 
b/src/paimon/common/utils/crc32c.cpp
new file mode 100644
index 0000000..1cba744
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.cpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/crc32c.h"
+
+#include "arrow/util/crc32.h"
+
+namespace paimon {
+uint32_t CRC32C::calculate(const char* data, size_t length, uint32_t crc) {
+#if defined(PAIMON_HAVE_SSE4_2)
+    return crc32c_hw(data, length, crc);
+#else
+    return arrow::internal::crc32(crc, data, length);
+#endif
+}
+
+#if defined(PAIMON_HAVE_SSE4_2)
+uint32_t CRC32C::crc32c_hw(const char* data, size_t length, uint32_t crc) {
+    crc = ~crc;
+
+    while (length && (reinterpret_cast<uintptr_t>(data) & 7)) {
+        crc = _mm_crc32_u8(crc, *data++);
+        length--;
+    }
+
+    while (length >= 8) {
+        crc = _mm_crc32_u64(crc, *reinterpret_cast<const uint64_t*>(data));
+        data += 8;
+        length -= 8;
+    }
+
+    while (length >= 4) {
+        crc = _mm_crc32_u32(crc, *reinterpret_cast<const uint32_t*>(data));
+        data += 4;
+        length -= 4;
+    }
+
+    while (length >= 2) {
+        crc = _mm_crc32_u16(crc, *reinterpret_cast<const uint16_t*>(data));
+        data += 2;
+        length -= 2;
+    }
+
+    while (length--) {
+        crc = _mm_crc32_u8(crc, *data++);
+    }
+
+    return ~crc;
+}
+#endif
+}  // namespace paimon
diff --git a/src/paimon/common/utils/crc32c.h b/src/paimon/common/utils/crc32c.h
new file mode 100644
index 0000000..7c4db75
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#if defined(PAIMON_HAVE_SSE4_2)
+#include <nmmintrin.h>
+#endif
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// CRC32C
+class PAIMON_EXPORT CRC32C {
+ public:
+    static uint32_t calculate(const char* data, size_t length, uint32_t crc = 
0);
+
+ private:
+#if defined(PAIMON_HAVE_SSE4_2)
+    /// Simd implementation for crc32c.
+    ///
+    /// @param data data to be calculated
+    /// @param length length of data
+    /// @param crc initial crc value
+    /// @return crc32c value
+    static uint32_t crc32c_hw(const char* data, size_t length, uint32_t crc);
+#endif
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/crc32c_test.cpp 
b/src/paimon/common/utils/crc32c_test.cpp
new file mode 100644
index 0000000..10dbbbf
--- /dev/null
+++ b/src/paimon/common/utils/crc32c_test.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/crc32c.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/crc32.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+TEST(CRC32CTest, TestSimple) {
+    char a = 'a';
+    ASSERT_EQ(CRC32C::calculate(&a, 1), 3904355907);
+
+    std::string data = "hello paimon c++";
+    ASSERT_EQ(CRC32C::calculate(data.c_str(), data.size()), 1311805437);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/delta_varint_compressor.cpp 
b/src/paimon/common/utils/delta_varint_compressor.cpp
new file mode 100644
index 0000000..89f908d
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor.cpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/delta_varint_compressor.h"
+
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+std::vector<char> DeltaVarintCompressor::Compress(const std::vector<int64_t>& 
data) {
+    if (data.empty()) {
+        return {};
+    }
+
+    // 1. Delta encoding (use unsigned subtraction to avoid signed overflow UB)
+    std::vector<int64_t> deltas;
+    deltas.reserve(data.size());
+    deltas.push_back(data[0]);
+    for (size_t i = 1; i < data.size(); i++) {
+        uint64_t unsigned_delta =
+            static_cast<uint64_t>(data[i]) - static_cast<uint64_t>(data[i - 
1]);
+        deltas.push_back(static_cast<int64_t>(unsigned_delta));
+    }
+
+    // 2. ZigZag + Varint
+    std::vector<char> out;
+    out.reserve(data.size() * 10);
+    for (const auto& delta : deltas) {
+        EncodeVarint(delta, &out);
+    }
+    return out;
+}
+
+Result<std::vector<int64_t>> DeltaVarintCompressor::Decompress(const 
std::vector<char>& bytes) {
+    if (bytes.empty()) {
+        return std::vector<int64_t>();
+    }
+
+    // 1. Decode ZigZag + Varint → delta
+    std::vector<int64_t> deltas;
+    deltas.reserve(bytes.size());
+    size_t pos = 0;
+    while (pos < bytes.size()) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t delta, DecodeVarint(bytes, &pos));
+        deltas.push_back(delta);
+    }
+
+    // 2. Delta decoding (use unsigned addition to avoid signed overflow UB)
+    std::vector<int64_t> result(deltas.size());
+    result[0] = deltas[0];
+    for (size_t i = 1; i < result.size(); i++) {
+        uint64_t reconstructed =
+            static_cast<uint64_t>(result[i - 1]) + 
static_cast<uint64_t>(deltas[i]);
+        result[i] = static_cast<int64_t>(reconstructed);
+    }
+    return result;
+}
+
+void DeltaVarintCompressor::EncodeVarint(int64_t value, std::vector<char>* 
out) {
+    uint64_t tmp = ZigZag(value);
+    // Check if multiple bytes are needed
+    while (tmp & ~0x7F) {
+        // Set MSB to 1 (continuation)
+        out->push_back(static_cast<char>((tmp & 0x7F) | 0x80));
+        // Unsigned right shift
+        tmp >>= 7;
+    }
+    // Final byte with MSB set to 0
+    out->push_back(static_cast<char>(tmp));
+}
+
+Result<int64_t> DeltaVarintCompressor::DecodeVarint(const std::vector<char>& 
in, size_t* pos) {
+    uint64_t result = 0;
+    int32_t shift = 0;
+    while (true) {
+        if (*pos >= in.size()) {
+            return Status::Invalid("Unexpected end of input");
+        }
+        char byte = in[(*pos)++];
+        // Extract 7 bits
+        result |= (static_cast<uint64_t>(byte & 0x7F) << shift);
+        // MSB is 0, end of encoding
+        if ((byte & 0x80) == 0) {
+            break;
+        }
+        shift += 7;
+        if (shift > 63) {
+            return Status::Invalid("Varint overflow");
+        }
+    }
+    return UnZigZag(result);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor.h 
b/src/paimon/common/utils/delta_varint_compressor.h
new file mode 100644
index 0000000..a66cbcf
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Combining Delta Encoding and Varints Encoding, suitable for integer 
sequences that are
+/// increasing or not significantly different.
+class PAIMON_EXPORT DeltaVarintCompressor {
+ public:
+    // Compresses an int64_t array using delta encoding, ZigZag 
transformation, and Varints encoding
+    static std::vector<char> Compress(const std::vector<int64_t>& data);
+    // Decompresses a byte array back to the original long array
+    static Result<std::vector<int64_t>> Decompress(const std::vector<char>& 
bytes);
+
+ private:
+    // Encodes a long value using ZigZag and Varints
+    static void EncodeVarint(int64_t value, std::vector<char>* out);
+    // Decodes a Varints-encoded value and reverses ZigZag transformation
+    static Result<int64_t> DecodeVarint(const std::vector<char>& in, size_t* 
pos);
+
+    inline static uint64_t ZigZag(int64_t value) {
+        return ((static_cast<uint64_t>(value) << 1) ^ (value >> 63));
+    }
+    inline static int64_t UnZigZag(uint64_t value) {
+        return (value >> 1) ^ -(value & 1);
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor_test.cpp 
b/src/paimon/common/utils/delta_varint_compressor_test.cpp
new file mode 100644
index 0000000..ac0017b
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor_test.cpp
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/delta_varint_compressor.h"
+
+#include <cstdio>
+#include <fstream>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(DeltaVarintCompressorTest, TestCompatibleWithJava) {
+    std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+
+    for (int32_t i = 0; i < 5; ++i) {
+        std::string file_prefix = paimon::test::GetDataDir() +
+                                  "/delta_varint_compressor.data/case-000" + 
std::to_string(i);
+
+        // Read original data from text file
+        std::string original_file = file_prefix + ".txt";
+        std::ifstream in(original_file);
+        std::vector<int64_t> original;
+        int64_t value;
+        while (in >> value) {
+            original.push_back(value);
+        }
+        auto compressed = DeltaVarintCompressor::Compress(original);
+
+        // Read expected compressed bytes from binary file
+        std::string binary_file = file_prefix + ".bin";
+        std::string expected_bytes;
+        ASSERT_OK(fs->ReadFile(binary_file, &expected_bytes));
+        ASSERT_EQ(expected_bytes, std::string(compressed.data(), 
compressed.size()));
+
+        // Decompress and verify
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(original, decompressed);
+    }
+}
+
+/// Test cases from Java Paimon
+// Test case for normal compression and decompression
+TEST(DeltaVarintCompressorTest, TestNormalCase1) {
+    std::vector<int64_t> original = {80, 50, 90, 80, 70};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    ASSERT_EQ(6u, compressed.size());
+}
+
+TEST(DeltaVarintCompressorTest, TestNormalCase2) {
+    std::vector<int64_t> original = {100, 50, 150, 100, 200};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    ASSERT_EQ(8u, compressed.size());
+}
+
+TEST(DeltaVarintCompressorTest, TestRandomRoundTrip) {
+    std::mt19937 gen(123456789);
+    std::uniform_int_distribution<uint64_t> dist;
+
+    for (int32_t iter = 0; iter < 10000; ++iter) {
+        std::vector<int64_t> original;
+        for (int32_t i = 0; i < 100; ++i) {
+            original.push_back(static_cast<int64_t>(dist(gen)));
+        }
+
+        auto compressed = DeltaVarintCompressor::Compress(original);
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+        ASSERT_EQ(original, decompressed);
+    }
+}
+
+// Test case for empty array
+TEST(DeltaVarintCompressorTest, TestEmptyArray) {
+    std::vector<int64_t> original;
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    ASSERT_EQ(0u, compressed.size());
+}
+
+// Test case for single-element array
+TEST(DeltaVarintCompressorTest, TestSingleElement) {
+    std::vector<int64_t> original = {42};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Calculate expected size: Varint encoding for 42 (0x2A -> 1 byte)
+    ASSERT_EQ(1u, compressed.size());
+}
+
+// Test case for extreme values (INT64.MIN_VALUE and MAX_VALUE)
+TEST(DeltaVarintCompressorTest, TestExtremeValues) {
+    std::vector<int64_t> original(
+        {std::numeric_limits<int64_t>::min(), 
std::numeric_limits<int64_t>::max()});
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Expected size: 10 bytes (MIN_VALUE) + 1 bytes (delta overflow) = 11 
bytes
+    ASSERT_EQ(11u, compressed.size());
+}
+
+// Test case for negative deltas with ZigZag optimization
+TEST(DeltaVarintCompressorTest, TestNegativeDeltas) {
+    std::vector<int64_t> original = {100, -50, -150, -100};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Verify ZigZag optimization: -1 → 1 (1 byte)
+    // Delta sequence: [100, -150, -100, 50] → ZigZag → Each encoded in 1-2 
bytes
+    ASSERT_LE(compressed.size(), 8u);
+}
+
+// Test case for unsorted data (worse compression ratio)
+TEST(DeltaVarintCompressorTest, TestUnsortedData) {
+    std::vector<int64_t> original = {1000, 5, 9999, 12345, 6789};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Larger deltas → more bytes (e.g., 9994 → 3 bytes)
+    ASSERT_GT(compressed.size(), 5u);  // Worse than sorted case
+}
+
+// Test case for corrupted input (invalid Varint)
+TEST(DeltaVarintCompressorTest, TestCorruptedInput) {
+    std::vector<char> corrupted = {static_cast<char>(0x80), 
static_cast<char>(0x80),
+                                   static_cast<char>(0x80)};
+    ASSERT_NOK(DeltaVarintCompressor::Decompress(corrupted));
+}
+
+/// Test cases from Python Paimon
+// Test case for arrays with zero values
+TEST(DeltaVarintCompressorTest, TestZeroValues) {
+    std::vector<int64_t> original = {0, 0, 0, 0, 0};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // All deltas are 0, so should compress very well
+    ASSERT_LE(compressed.size(), 5u);
+}
+
+// Test case for ascending sequence (optimal for delta compression)
+TEST(DeltaVarintCompressorTest, TestAscendingSequence) {
+    std::vector<int64_t> original = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // All deltas are 1, so should compress very well
+    ASSERT_LE(compressed.size(), 15u);  // Much smaller than original
+}
+
+// Test case for descending sequence
+TEST(DeltaVarintCompressorTest, TestDescendingSequence) {
+    std::vector<int64_t> original = {10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // All deltas are -1, should still compress well with ZigZag
+    ASSERT_LE(compressed.size(), 15u);
+}
+
+// Test case for large positive values
+TEST(DeltaVarintCompressorTest, TestLargePositiveValues) {
+    std::vector<int64_t> original = {1000000, 2000000, 3000000, 4000000};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Large values but consistent deltas should still compress reasonably
+    ASSERT_GT(compressed.size(), 4u);  // Will be larger due to big numbers
+}
+
+// Test case for mixed positive and negative values
+TEST(DeltaVarintCompressorTest, TestMixedPositiveNegative) {
+    std::vector<int64_t> original = {100, -200, 300, -400, 500};
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // Mixed signs create larger deltas
+    ASSERT_GT(compressed.size(), 5u);
+}
+
+// Test that compression actually reduces size for suitable data
+TEST(DeltaVarintCompressorTest, TestCompressionEfficiency) {
+    // Create a sequence with small deltas
+    std::vector<int64_t> original;
+    std::mt19937 gen(42);  // Fixed seed for reproducibility
+    std::uniform_int_distribution<int32_t> delta_dist(-10, 10);
+
+    int64_t base = 1000;
+    for (int32_t i = 0; i < 100; ++i) {
+        base += delta_dist(gen);  // Small deltas
+        original.push_back(base);
+    }
+
+    auto compressed = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+    ASSERT_EQ(original, decompressed);
+    // For small deltas, compression should be effective
+    // Original would need 8 bytes per int64_t (800 bytes), compressed should 
be much smaller
+    ASSERT_LT(compressed.size(), original.size() * 4);  // At least 50% 
compression
+}
+
+// Test that multiple compress/decompress cycles are consistent
+TEST(DeltaVarintCompressorTest, TestRoundTripConsistency) {
+    std::vector<int64_t> original = {1, 10, 100, 1000, 10000};
+
+    // First round trip
+    auto compressed1 = DeltaVarintCompressor::Compress(original);
+    ASSERT_OK_AND_ASSIGN(auto decompressed1, 
DeltaVarintCompressor::Decompress(compressed1));
+
+    // Second round trip
+    auto compressed2 = DeltaVarintCompressor::Compress(decompressed1);
+    ASSERT_OK_AND_ASSIGN(auto decompressed2, 
DeltaVarintCompressor::Decompress(compressed2));
+
+    // All should be identical
+    ASSERT_EQ(original, decompressed1);
+    ASSERT_EQ(original, decompressed2);
+    ASSERT_EQ(compressed1, compressed2);
+}
+
+// Test boundary values for varint encoding
+TEST(DeltaVarintCompressorTest, TestBoundaryValues) {
+    // Test values around varint boundaries (127, 16383, etc.)
+    std::vector<int64_t> boundary_values = {0,     1,      127,    128,    
255,   256,  16383,
+                                            16384, 32767,  32768,  -1,     
-127,  -128, -255,
+                                            -256,  -16383, -16384, -32767, 
-32768};
+
+    auto compressed = DeltaVarintCompressor::Compress(boundary_values);
+    ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+    ASSERT_EQ(boundary_values, decompressed);
+}
+
+// Test ZigZag encoding compatibility with Java implementation
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityZigZagEncoding) {
+    // Test cases that verify ZigZag encoding matches Java's implementation
+    // ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc.
+    std::vector<std::pair<int64_t, uint64_t>> zigzag_test_cases = {
+        {0, 0},      // 0 -> 0
+        {-1, 1},     // -1 -> 1
+        {1, 2},      // 1 -> 2
+        {-2, 3},     // -2 -> 3
+        {2, 4},      // 2 -> 4
+        {-3, 5},     // -3 -> 5
+        {3, 6},      // 3 -> 6
+        {-64, 127},  // -64 -> 127
+        {64, 128},   // 64 -> 128
+        {-65, 129},  // -65 -> 129
+    };
+
+    for (const auto& [original_value, expected_zigzag] : zigzag_test_cases) {
+        // Test single value compression to verify ZigZag encoding
+        std::vector<int64_t> single_value = {original_value};
+        auto compressed = DeltaVarintCompressor::Compress(single_value);
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+
+        ASSERT_EQ(single_value, decompressed)
+            << "ZigZag encoding failed for value " << original_value;
+        ASSERT_EQ(expected_zigzag, 
DeltaVarintCompressor::ZigZag(original_value))
+            << "ZigZag encoding failed for value " << original_value;
+    }
+}
+
+// Test with known test vectors that should match Java implementation
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityKnownVectors) {
+    // Test vectors with expected compressed output (hexadecimal)
+    std::vector<std::pair<std::vector<int64_t>, std::string>> test_vectors = {
+        // Simple cases
+        {{0}, "00"},   // 0 -> ZigZag(0) = 0 -> Varint(0) = 0x00
+        {{1}, "02"},   // 1 -> ZigZag(1) = 2 -> Varint(2) = 0x02
+        {{-1}, "01"},  // -1 -> ZigZag(-1) = 1 -> Varint(1) = 0x01
+        {{2}, "04"},   // 2 -> ZigZag(2) = 4 -> Varint(4) = 0x04
+        {{-2}, "03"},  // -2 -> ZigZag(-2) = 3 -> Varint(3) = 0x03
+
+        // Delta encoding cases
+        {{0, 1}, "0002"},   // [0, 1] -> [0, delta=1] -> [0x00, 0x02]
+        {{1, 2}, "0202"},   // [1, 2] -> [1, delta=1] -> [0x02, 0x02]
+        {{0, -1}, "0001"},  // [0, -1] -> [0, delta=-1] -> [0x00, 0x01]
+        {{1, 0}, "0201"},   // [1, 0] -> [1, delta=-1] -> [0x02, 0x01]
+
+        // Larger values
+        {{127}, "fe01"},   // 127 -> ZigZag(127) = 254 -> Varint(254) = 0xfe01
+        {{-127}, "fd01"},  // -127 -> ZigZag(-127) = 253 -> Varint(253) = 
0xfd01
+        {{128}, "8002"},   // 128 -> ZigZag(128) = 256 -> Varint(256) = 0x8002
+        {{-128}, "ff01"},  // -128 -> ZigZag(-128) = 255 -> Varint(255) = 
0xff01
+    };
+
+    auto bytes_to_hex = [](const std::vector<char>& bytes) -> std::string {
+        std::string hex;
+        for (char byte : bytes) {
+            char buf[3];
+            snprintf(buf, sizeof(buf), "%02x", static_cast<unsigned 
char>(byte));
+            hex += buf;
+        }
+        return hex;
+    };
+
+    for (const auto& [original, expected_hex] : test_vectors) {
+        auto compressed = DeltaVarintCompressor::Compress(original);
+        std::string actual_hex = bytes_to_hex(compressed);
+
+        ASSERT_EQ(expected_hex, actual_hex)
+            << "Binary compatibility failed for original data. "
+            << "Expected: " << expected_hex << ", Got: " << actual_hex;
+
+        // Also verify round-trip
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(original, decompressed) << "Round-trip failed for original 
data";
+    }
+}
+
+// Test compatibility with Java for large numbers (64-bit range)
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityLargeNumbers) {
+    // Test cases covering the full 64-bit signed integer range
+    std::vector<int64_t> large_number_cases = {
+        2147483647LL,            // Integer.MAX_VALUE
+        -2147483648LL,           // Integer.MIN_VALUE
+        9223372036854775807LL,   // Long.MAX_VALUE
+        -9223372036854775807LL,  // Long.MIN_VALUE + 1 (avoid overflow)
+        4294967295LL,            // 2^32 - 1
+        -4294967296LL,           // -2^32
+    };
+
+    for (int64_t value : large_number_cases) {
+        // Test individual values
+        std::vector<int64_t> single_value = {value};
+        auto compressed = DeltaVarintCompressor::Compress(single_value);
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(single_value, decompressed) << "Large number compatibility 
failed for " << value;
+    }
+
+    // Test as a sequence to verify delta encoding with large numbers
+    auto compressed_seq = DeltaVarintCompressor::Compress(large_number_cases);
+    ASSERT_OK_AND_ASSIGN(auto decompressed_seq, 
DeltaVarintCompressor::Decompress(compressed_seq));
+    ASSERT_EQ(large_number_cases, decompressed_seq) << "Large number sequence 
compatibility failed";
+}
+
+// Test Varint encoding boundaries that match Java implementation
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityVarintBoundaries) {
+    // Test values at Varint encoding boundaries
+    std::vector<int64_t> varint_boundary_cases = {
+        // 1-byte Varint boundary
+        63,   // ZigZag(63) = 126, fits in 1 byte
+        64,   // ZigZag(64) = 128, needs 2 bytes
+        -64,  // ZigZag(-64) = 127, fits in 1 byte
+        -65,  // ZigZag(-65) = 129, needs 2 bytes
+
+        // 2-byte Varint boundary
+        8191,   // ZigZag(8191) = 16382, fits in 2 bytes
+        8192,   // ZigZag(8192) = 16384, needs 3 bytes
+        -8192,  // ZigZag(-8192) = 16383, fits in 2 bytes
+        -8193,  // ZigZag(-8193) = 16385, needs 3 bytes
+
+        // 3-byte Varint boundary
+        1048575,  // ZigZag(1048575) = 2097150, fits in 3 bytes
+        1048576,  // ZigZag(1048576) = 2097152, needs 4 bytes
+    };
+
+    for (int64_t value : varint_boundary_cases) {
+        std::vector<int64_t> single_value = {value};
+        auto compressed = DeltaVarintCompressor::Compress(single_value);
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(single_value, decompressed)
+            << "Varint boundary compatibility failed for " << value;
+    }
+}
+
+// Test delta encoding edge cases for Java compatibility
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityDeltaEdgeCases) {
+    // Edge cases that test delta encoding behavior
+    std::vector<std::vector<int64_t>> delta_edge_cases = {
+        // Maximum positive delta
+        {0, std::numeric_limits<int64_t>::max()},
+        // Maximum negative delta
+        {std::numeric_limits<int64_t>::max(), 0},
+        // Alternating large deltas
+        {0, 1000000, -1000000, 2000000, -2000000},
+        // Sequence with zero deltas
+        {42, 42, 42, 42},
+        // Mixed small and large deltas
+        {0, 1, 1000000, 1000001, 0},
+    };
+
+    for (const auto& test_case : delta_edge_cases) {
+        auto compressed = DeltaVarintCompressor::Compress(test_case);
+        ASSERT_OK_AND_ASSIGN(auto decompressed, 
DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(test_case, decompressed) << "Delta edge case compatibility 
failed";
+    }
+}
+
+// Test error conditions that should match Java behavior
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityErrorConditions) {
+    // Test cases for error handling - our implementation gracefully handles
+    // truncated data by returning errors, which is acceptable behavior
+
+    // Test with various truncated/invalid byte sequences
+    std::vector<std::vector<char>> invalid_cases = {
+        {static_cast<char>(0x80)},                           // Single 
incomplete byte
+        {static_cast<char>(0x80), static_cast<char>(0x80)},  // Incomplete 
3-byte varint
+        {static_cast<char>(0xFF), static_cast<char>(0xFF), 
static_cast<char>(0xFF),
+         static_cast<char>(0xFF), static_cast<char>(0xFF), 
static_cast<char>(0xFF),
+         static_cast<char>(0xFF), static_cast<char>(0xFF), 
static_cast<char>(0xFF),
+         static_cast<char>(0x80)},  // Long sequence
+    };
+
+    for (const auto& invalid_data : invalid_cases) {
+        // Our implementation handles invalid data by returning an error
+        // This is acceptable behavior for robustness
+        auto result = DeltaVarintCompressor::Decompress(invalid_data);
+        ASSERT_NOK(result) << "Should return error for invalid data";
+    }
+
+    // Test that valid empty input returns empty list
+    std::vector<char> empty_input;
+    ASSERT_OK_AND_ASSIGN(auto empty_result, 
DeltaVarintCompressor::Decompress(empty_input));
+    ASSERT_TRUE(empty_result.empty()) << "Empty input should return empty 
list";
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/murmurhash_utils.h 
b/src/paimon/common/utils/murmurhash_utils.h
index cebf41b..52831c9 100644
--- a/src/paimon/common/utils/murmurhash_utils.h
+++ b/src/paimon/common/utils/murmurhash_utils.h
@@ -7,14 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /*
diff --git a/src/paimon/common/utils/murmurhash_utils_test.cpp 
b/src/paimon/common/utils/murmurhash_utils_test.cpp
index b4a475f..d0c09ea 100644
--- a/src/paimon/common/utils/murmurhash_utils_test.cpp
+++ b/src/paimon/common/utils/murmurhash_utils_test.cpp
@@ -7,14 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #include "paimon/common/utils/murmurhash_utils.h"
diff --git a/src/paimon/common/utils/var_length_int_utils.h 
b/src/paimon/common/utils/var_length_int_utils.h
new file mode 100644
index 0000000..ec76460
--- /dev/null
+++ b/src/paimon/common/utils/var_length_int_utils.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+
+#include "fmt/format.h"
+#include "paimon/macros.h"
+#include "paimon/result.h"
+namespace paimon {
+
+/// Variable-length integer encoding/decoding utilities.
+///
+/// Encoding format (same as protobuf unsigned varint):
+///   - Each byte stores 7 payload bits in bits [6:0].
+///   - Bit 7 (0x80) is the continuation flag: 1 = more bytes follow, 0 = last 
byte.
+///   - A varint32 uses at most 5 bytes; a varint64 uses at most 9 bytes.
+///
+/// Based on the LongPacker from PalDB (https://github.com/linkedin/PalDB),
+/// licensed under Apache 2.0.
+class VarLengthIntUtils {
+ public:
+    VarLengthIntUtils() = delete;
+    ~VarLengthIntUtils() = delete;
+
+    static constexpr int32_t kMaxVarIntSize = 5;
+    static constexpr int32_t kMaxVarLongSize = 9;
+
+    // ==================== Encoding (writes to char*) ====================
+
+    /// Encodes a non-negative int32 as varint into `dest`.
+    /// Returns the number of bytes written.
+    static Result<int32_t> EncodeInt(int32_t value, char* dest) {
+        if (PAIMON_UNLIKELY(value < 0)) {
+            return Status::Invalid(
+                fmt::format("negative value: v={} for VarLengthInt Encoding", 
value));
+        }
+        int32_t num_bytes = 0;
+        while ((value & ~0x7F) != 0) {
+            dest[num_bytes] = static_cast<char>((value & 0x7F) | 0x80);
+            value >>= 7;
+            ++num_bytes;
+        }
+        dest[num_bytes] = static_cast<char>(value);
+        return num_bytes + 1;
+    }
+
+    /// Encodes a non-negative int64 as varint into `dest`.
+    /// Returns the number of bytes written.
+    static Result<int32_t> EncodeLong(int64_t value, char* dest) {
+        if (PAIMON_UNLIKELY(value < 0)) {
+            return Status::Invalid(
+                fmt::format("negative value: v={} for VarLengthInt Encoding", 
value));
+        }
+        int32_t num_bytes = 0;
+        while ((value & ~0x7FLL) != 0) {
+            dest[num_bytes] = static_cast<char>(static_cast<int32_t>(value & 
0x7F) | 0x80);
+            value >>= 7;
+            ++num_bytes;
+        }
+        dest[num_bytes] = static_cast<char>(value);
+        return num_bytes + 1;
+    }
+
+    // ==================== Decoding (reads from const char*) 
====================
+
+    /// Decodes a varint32 from `data` at `*offset`, advancing `*offset` past 
the consumed bytes.
+    /// Inlines a 1-byte fast path (values 0-127), which is the most common 
case.
+    static inline Result<int32_t> DecodeInt(const char* data, int32_t* offset) 
{
+        auto first_byte = static_cast<uint8_t>(data[*offset]);
+        if (PAIMON_LIKELY((first_byte & 0x80) == 0)) {
+            ++(*offset);
+            return static_cast<int32_t>(first_byte);
+        }
+        // Multi-byte: fall through to generic loop.
+        // NOTE: EncodeInt only encodes non-negative values, so a decoded 
negative result
+        // indicates malformed data.
+        uint32_t result = 0;
+        for (int32_t shift = 0; shift < 32; shift += 7) {
+            auto byte_val = static_cast<uint8_t>(data[*offset]);
+            ++(*offset);
+            result |= static_cast<uint32_t>(byte_val & 0x7F) << shift;
+            if ((byte_val & 0x80) == 0) {
+                auto signed_result = static_cast<int32_t>(result);
+                if (PAIMON_UNLIKELY(signed_result < 0)) {
+                    return Status::Invalid("Malformed varint32: decoded 
negative value");
+                }
+                return signed_result;
+            }
+        }
+        return Status::Invalid("Malformed varint32: too many continuation 
bytes");
+    }
+
+    /// Decodes a varint64 from `data` at `*offset`, advancing `*offset` past 
the consumed bytes.
+    /// Inlines a 1-byte fast path (values 0-127), which is the most common 
case.
+    static inline Result<int64_t> DecodeLong(const char* data, int32_t* 
offset) {
+        auto first_byte = static_cast<uint8_t>(data[*offset]);
+        if (PAIMON_LIKELY((first_byte & 0x80) == 0)) {
+            ++(*offset);
+            return static_cast<int64_t>(first_byte);
+        }
+        // Multi-byte: fall through to generic loop.
+        // NOTE: EncodeLong only encodes non-negative values, so a decoded 
negative result
+        // indicates malformed data.
+        uint64_t result = 0;
+        for (int32_t shift = 0; shift < 64; shift += 7) {
+            auto byte_val = static_cast<uint8_t>(data[*offset]);
+            ++(*offset);
+            result |= static_cast<uint64_t>(byte_val & 0x7F) << shift;
+            if ((byte_val & 0x80) == 0) {
+                auto signed_result = static_cast<int64_t>(result);
+                if (PAIMON_UNLIKELY(signed_result < 0)) {
+                    return Status::Invalid("Malformed varint64: decoded 
negative value");
+                }
+                return signed_result;
+            }
+        }
+        return Status::Invalid("Malformed varint64: too many continuation 
bytes");
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/var_length_int_utils_test.cpp 
b/src/paimon/common/utils/var_length_int_utils_test.cpp
new file mode 100644
index 0000000..c2404e2
--- /dev/null
+++ b/src/paimon/common/utils/var_length_int_utils_test.cpp
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/var_length_int_utils.h"
+
+#include <cstring>
+#include <limits>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeInt) {
+    std::vector<int32_t> test_values = {0,    1,     127,    128,     255,     
 256,
+                                        1000, 10000, 100000, 1000000, 
10000000, 2147483647};
+
+    for (int32_t value : test_values) {
+        char buffer[VarLengthIntUtils::kMaxVarIntSize];
+        std::memset(buffer, 0, sizeof(buffer));
+
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length, 
VarLengthIntUtils::EncodeInt(value, buffer));
+
+        int32_t offset = 0;
+        ASSERT_OK_AND_ASSIGN(int32_t decoded_value, 
VarLengthIntUtils::DecodeInt(buffer, &offset));
+
+        ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't 
match for: " << value;
+        ASSERT_EQ(offset, encoded_length) << "Offset doesn't match encoded 
length for: " << value;
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeLong) {
+    std::vector<int64_t> test_values = {0ll,
+                                        127ll,
+                                        128ll,
+                                        16383ll,
+                                        16384ll,
+                                        2097151ll,
+                                        2097152ll,
+                                        268435455ll,
+                                        268435456ll,
+                                        1234567890123456789ll,
+                                        202405170000000000ll,
+                                        999999999999999999ll,
+                                        std::numeric_limits<int64_t>::max()};
+
+    for (int64_t value : test_values) {
+        char buffer[VarLengthIntUtils::kMaxVarLongSize + 1];
+        std::memset(buffer, 0, sizeof(buffer));
+
+        ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t encoded_length,
+                             VarLengthIntUtils::EncodeLong(value, buffer));
+
+        int32_t offset = 0;
+        ASSERT_OK_AND_ASSIGN(int64_t decoded_value, 
VarLengthIntUtils::DecodeLong(buffer, &offset));
+
+        ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't 
match for: " << value;
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeNegativeValue) {
+    char buffer[VarLengthIntUtils::kMaxVarIntSize];
+    ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeInt(-1, buffer),
+                        "negative value: v=-1 for VarLengthInt Encoding");
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeNegativeLongValue) {
+    char buffer[VarLengthIntUtils::kMaxVarLongSize + 1];
+    ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeLong(-1, buffer),
+                        "negative value: v=-1 for VarLengthInt Encoding");
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeIntSequential) {
+    char buffer[VarLengthIntUtils::kMaxVarIntSize * 2];
+    std::memset(buffer, 0, sizeof(buffer));
+    int32_t value1 = 100;
+    int32_t value2 = 200;
+
+    ASSERT_OK_AND_ASSIGN(auto length1, VarLengthIntUtils::EncodeInt(value1, 
buffer));
+    ASSERT_OK_AND_ASSIGN([[maybe_unused]] auto length2,
+                         VarLengthIntUtils::EncodeInt(value2, buffer + 
length1));
+
+    int32_t offset = 0;
+    ASSERT_OK_AND_ASSIGN(int32_t decoded_result1, 
VarLengthIntUtils::DecodeInt(buffer, &offset));
+    ASSERT_EQ(value1, decoded_result1);
+    ASSERT_OK_AND_ASSIGN(int32_t decoded_result2, 
VarLengthIntUtils::DecodeInt(buffer, &offset));
+    ASSERT_EQ(value2, decoded_result2);
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeBytesNumber) {
+    std::vector<int32_t> values = {
+        0x7F,       // 127 - fits in 1 byte
+        0x80,       // 128 - needs 2 bytes
+        0x4000,     // 16384 - needs 3 bytes
+        0x200000,   // 2097152 - needs 4 bytes
+        2147483647  // 2147483647 - needs 5 bytes
+    };
+
+    for (int32_t i = 0; i < static_cast<int32_t>(values.size()); ++i) {
+        char buffer[VarLengthIntUtils::kMaxVarIntSize];
+        std::memset(buffer, 0, sizeof(buffer));
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length,
+                             VarLengthIntUtils::EncodeInt(values[i], buffer));
+        ASSERT_EQ(encoded_length, i + 1);
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeLongBytesNumber) {
+    std::vector<int64_t> values = {
+        0x7F,                                // 127 - fits in 1 byte
+        0x80,                                // 128 - needs 2 bytes
+        0x4000,                              // 16384 - needs 3 bytes
+        0x200000,                            // 2097152 - needs 4 bytes
+        2147483647,                          // 2147483647 - needs 5 bytes
+        34359738368ll,                       // 0x800000000 - needs 6 bytes
+        562949953421311ll,                   // 0x1FFFFFFFFFFFFF - needs 7 
bytes
+        72057594037927935ll,                 // 0xFFFFFFFFFFFFFF - needs 8 
bytes
+        std::numeric_limits<int64_t>::max()  // needs 9 bytes
+    };
+
+    for (int32_t i = 0; i < static_cast<int32_t>(values.size()); ++i) {
+        char buffer[VarLengthIntUtils::kMaxVarLongSize + 1];
+        std::memset(buffer, 0, sizeof(buffer));
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length,
+                             VarLengthIntUtils::EncodeLong(values[i], buffer));
+        ASSERT_EQ(encoded_length, i + 1) << values[i];
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/xxhash_test.cpp 
b/src/paimon/common/utils/xxhash_test.cpp
new file mode 100644
index 0000000..5d2daca
--- /dev/null
+++ b/src/paimon/common/utils/xxhash_test.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "xxhash.h"  // NOLINT(build/include_subdir)
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(XXHashTest, TestCompatibleWithJava) {
+    auto file_system = std::make_unique<LocalFileSystem>();
+    auto file_name = paimon::test::GetDataDir() + "/xxhash.data";
+    std::string bytes;
+    ASSERT_OK(file_system->ReadFile(file_name, &bytes));
+    auto lines = StringUtils::Split(bytes, "\n");
+    // 1000 random str and empty str
+    ASSERT_EQ(1001, lines.size());
+    for (const auto& line : lines) {
+        auto data_and_hash = StringUtils::Split(line, ",", 
/*ignore_empty=*/false);
+        ASSERT_EQ(2, data_and_hash.size());
+        const auto& str = data_and_hash[0];
+        int64_t expected_hash = std::stoull(data_and_hash[1], nullptr, 16);
+        ASSERT_EQ(expected_hash, XXH64(str.data(), str.size(), /*seed=*/0));
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/testing/utils/testharness.cpp 
b/src/paimon/testing/utils/testharness.cpp
index fa62690..9c61ac0 100644
--- a/src/paimon/testing/utils/testharness.cpp
+++ b/src/paimon/testing/utils/testharness.cpp
@@ -7,14 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
@@ -29,12 +28,104 @@
 // Assert utilities is adapted from RocksDB
 // https://github.com/facebook/rocksdb/blob/main/test_util/testharness.cc
 
+// Copyright 2024 Google LLC.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// UniqueTestDirectory utility is adapted from LiteRT
+// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.cc
+
 #include "paimon/testing/utils/testharness.h"
 
+#include <unistd.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cstdlib>
+#include <exception>
+#include <fstream>
+#include <iostream>
+#include <random>
+#include <vector>
+
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
 #include "paimon/status.h"
 
 namespace paimon::test {
 
+std::string GetDataDir() {
+    const auto result = std::getenv("PAIMON_TEST_DATA");
+    if (!result || !result[0]) {
+        return "test/test_data/";
+    }
+    return std::string(result);
+}
+
+std::map<std::string, std::string> GetJindoTestOptions() {
+    const char* home = std::getenv("HOME");
+    std::string home_str = "";
+    if (home) {
+        home_str = std::string(home);
+    }
+    std::string config_file_path = home_str + "/.osscredentials";
+    std::ifstream config_file(config_file_path);
+    std::string access_key_id = "";
+    std::string access_key_secret = "";
+    if (config_file.is_open()) {
+        std::string line;
+        while (std::getline(config_file, line)) {
+            std::vector<std::string> key_value = StringUtils::Split(line, "=");
+            if (key_value.size() != 2) {
+                continue;
+            }
+            if (key_value[0].find("accessid") != std::string::npos) {
+                StringUtils::Trim(&key_value[1]);
+                access_key_id = key_value[1];
+            }
+            if (key_value[0].find("accesskey") != std::string::npos) {
+                StringUtils::Trim(&key_value[1]);
+                access_key_secret = key_value[1];
+            }
+        }
+        config_file.close();
+    }
+    std::map<std::string, std::string> options = {
+        {"fs.oss.bucket.paimon-unittest.endpoint", 
"oss-cn-hangzhou-zmf.aliyuncs.com"},
+        {"fs.oss.bucket.paimon-unittest.accessKeyId", access_key_id},
+        {"fs.oss.bucket.paimon-unittest.accessKeySecret", access_key_secret},
+        {"fs.oss.user", "paimon"},
+    };
+    return options;
+}
+std::string GetJindoTestDir() {
+    static const std::string dir = "oss://paimon-unittest/temp/";
+    return dir;
+}
+
+int64_t RandomNumber(int64_t min, int64_t max) {
+    static thread_local std::mt19937 generator(
+        std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int64_t> distribution(min, max);
+    return distribution(generator);
+}
+
+std::string GetPidStr() {
+    return std::to_string(getpid());
+}
+
 ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) {
     if (s.ok()) {
         return ::testing::AssertionSuccess();
@@ -43,4 +134,69 @@ namespace paimon::test {
     }
 }
 
+std::unique_ptr<UniqueTestDirectory> UniqueTestDirectory::Create(const 
std::string& fs_identifier) {
+    static const size_t kMaxTries = 1000;
+    std::string tmp_dir = fs_identifier == "jindo"
+                              ? GetJindoTestDir()
+                              : 
std::filesystem::temp_directory_path().string() + "/";
+    std::map<std::string, std::string> fs_options =
+        fs_identifier == "jindo" ? GetJindoTestOptions() : 
std::map<std::string, std::string>();
+    auto fs = FileSystemFactory::Get(fs_identifier, tmp_dir, fs_options);
+    if (!fs.ok()) {
+        return nullptr;
+    }
+    for (size_t i = 0; i < kMaxTries; ++i) {
+        std::string uuid;
+        if (!UUID::Generate(&uuid)) {
+            continue;
+        }
+        std::string test_dir = tmp_dir + "paimon_test_" + uuid;
+        auto is_exist = fs.value()->Exists(test_dir);
+        if (!is_exist.ok() || is_exist.value()) {
+            continue;
+        }
+        auto status = fs.value()->Mkdirs(test_dir);
+        if (status.ok()) {
+            return std::unique_ptr<UniqueTestDirectory>(
+                new UniqueTestDirectory(test_dir, std::move(fs).value()));
+        }
+    }
+    return nullptr;
+}
+
+UniqueTestDirectory::~UniqueTestDirectory() {
+    auto is_exist = fs_->Exists(tmpdir_);
+    assert(is_exist.ok());
+    if (is_exist.value()) {
+        [[maybe_unused]] auto status = fs_->Delete(tmpdir_, 
/*recursive=*/true);
+        assert(status.ok());
+    }
+    fs_.reset();
+}
+
+bool TestUtil::CopyDirectory(const std::filesystem::path& source,
+                             const std::filesystem::path& destination) {
+    namespace fs = std::filesystem;
+    try {
+        if (!fs::exists(destination)) {
+            fs::create_directories(destination);
+        }
+
+        for (const auto& entry : fs::directory_iterator(source)) {
+            const auto& source_path = entry.path();
+            auto destination_path = destination / source_path.filename();
+
+            if (fs::is_directory(source_path)) {
+                CopyDirectory(source_path, destination_path);
+            } else if (fs::is_regular_file(source_path)) {
+                fs::copy(source_path, destination_path, 
fs::copy_options::overwrite_existing);
+            }
+        }
+    } catch (const std::exception& e) {
+        std::cerr << "Error while copying directory: " << e.what() << 
std::endl;
+        return false;
+    }
+    return true;
+}
+
 }  // namespace paimon::test
diff --git a/src/paimon/testing/utils/testharness.h 
b/src/paimon/testing/utils/testharness.h
index e945b75..5c92fa1 100644
--- a/src/paimon/testing/utils/testharness.h
+++ b/src/paimon/testing/utils/testharness.h
@@ -7,14 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
@@ -29,17 +28,49 @@
 // Assert utilities is adapted from RocksDB
 // https://github.com/facebook/rocksdb/blob/main/test_util/testharness.h
 
+// Copyright 2024 Google LLC.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// UniqueTestDirectory utility is adapted from LiteRT
+// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.h
+
 #pragma once
 
+#include <filesystem>
+#include <map>
+#include <memory>
 #include <string>
+#include <utility>
 
 #include "gtest/gtest.h"
 #include "paimon/macros.h"
 #include "paimon/result.h"
 #include "paimon/status.h"
 
+namespace paimon {
+class FileSystem;
+class Status;
+}  // namespace paimon
+
 namespace paimon::test {
 
+std::string GetDataDir();
+std::map<std::string, std::string> GetJindoTestOptions();
+std::string GetJindoTestDir();
+
+int64_t RandomNumber(int64_t min, int64_t max);
+
 ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s);
 
 #define ASSERT_OK(expr)                                                        
          \
@@ -75,4 +106,35 @@ namespace paimon::test {
 #define EXPECT_OK(s) EXPECT_PRED_FORMAT1(paimon::test::AssertStatus, s)
 #define EXPECT_NOK(s) EXPECT_FALSE((s).ok())
 
+class UniqueTestDirectory {
+ public:
+    static std::unique_ptr<UniqueTestDirectory> Create(const std::string& 
fs_identifier = "local");
+    ~UniqueTestDirectory();
+
+    UniqueTestDirectory(const UniqueTestDirectory&) = delete;
+    UniqueTestDirectory(UniqueTestDirectory&&) = default;
+    UniqueTestDirectory& operator=(const UniqueTestDirectory&) = delete;
+    UniqueTestDirectory& operator=(UniqueTestDirectory&&) = default;
+
+    const std::string& Str() const {
+        return tmpdir_;
+    }
+
+    std::shared_ptr<FileSystem> GetFileSystem() const {
+        return fs_;
+    }
+
+ private:
+    UniqueTestDirectory(const std::string& tmpdir, 
std::shared_ptr<FileSystem>&& fs)
+        : tmpdir_(tmpdir), fs_(std::move(fs)) {}
+    std::string tmpdir_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+class TestUtil {
+ public:
+    static bool CopyDirectory(const std::filesystem::path& source,
+                              const std::filesystem::path& destination);
+};
+
 }  // namespace paimon::test

Reply via email to