This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 348e89d feat: add BloomFilter, CRC32C, MurmurHash, varint utilities,
and DeltaVarintCompressor (#37)
348e89d is described below
commit 348e89d7a4b155e6f8c8d12c42c497c7bfcaf325
Author: lxy <[email protected]>
AuthorDate: Wed Jun 3 13:50:08 2026 +0800
feat: add BloomFilter, CRC32C, MurmurHash, varint utilities, and
DeltaVarintCompressor (#37)
---
LICENSE | 10 +
NOTICE | 3 +
src/paimon/common/data/data_define_test.cpp | 320 ++++++++++++++
src/paimon/common/utils/bloom_filter.cpp | 106 +++++
src/paimon/common/utils/bloom_filter.h | 72 ++++
src/paimon/common/utils/bloom_filter64.cpp | 107 +++++
src/paimon/common/utils/bloom_filter64.h | 75 ++++
src/paimon/common/utils/bloom_filter64_test.cpp | 83 ++++
src/paimon/common/utils/bloom_filter_test.cpp | 157 +++++++
src/paimon/common/utils/crc32c.cpp | 66 +++
src/paimon/common/utils/crc32c.h | 46 +++
src/paimon/common/utils/crc32c_test.cpp | 43 ++
.../common/utils/delta_varint_compressor.cpp | 114 +++++
src/paimon/common/utils/delta_varint_compressor.h | 52 +++
.../common/utils/delta_varint_compressor_test.cpp | 459 +++++++++++++++++++++
src/paimon/common/utils/murmurhash_utils.h | 13 +-
src/paimon/common/utils/murmurhash_utils_test.cpp | 13 +-
src/paimon/common/utils/var_length_int_utils.h | 139 +++++++
.../common/utils/var_length_int_utils_test.cpp | 144 +++++++
src/paimon/common/utils/xxhash_test.cpp | 49 +++
src/paimon/testing/utils/testharness.cpp | 170 +++++++-
src/paimon/testing/utils/testharness.h | 76 +++-
22 files changed, 2289 insertions(+), 28 deletions(-)
diff --git a/LICENSE b/LICENSE
index 3321b54..103c326 100644
--- a/LICENSE
+++ b/LICENSE
@@ -256,6 +256,16 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
--------------------------------------------------------------------------------
+This product includes code from LiteRT.
+
+* UniqueTestDirectory utility in src/paimon/testing/utils/testharness.cpp and src/paimon/testing/utils/testharness.h
+
+Copyright: 2024 Google LLC.
+Home page: https://ai.google.dev/edge/litert
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This product includes code from Apache Arrow.
* Core utilities:
diff --git a/NOTICE b/NOTICE
index 303de2a..3be42fb 100644
--- a/NOTICE
+++ b/NOTICE
@@ -17,6 +17,9 @@ This product includes software from RocksDB project (Apache 2.0 and BSD 3-clause
Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+This product includes software from LiteRT project (Apache 2.0)
+Copyright 2024 Google LLC.
+
This product includes software from xxHash project (BSD 2-clause)
Copyright (C) 2012-2023 Yann Collet
diff --git a/src/paimon/common/data/data_define_test.cpp b/src/paimon/common/data/data_define_test.cpp
new file mode 100644
index 0000000..2a76837
--- /dev/null
+++ b/src/paimon/common/data/data_define_test.cpp
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/data/data_define.h"
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// IsVariantNull must report true when the variant holds NullType.
+TEST(DataDefineTest, IsVariantNullReturnsTrueForNull) {
+    VariantType holder = NullType{};
+    const bool is_null = DataDefine::IsVariantNull(holder);
+    ASSERT_TRUE(is_null);
+}
+
+// IsVariantNull must report false for any alternative other than NullType.
+TEST(DataDefineTest, IsVariantNullReturnsFalseForNonNullTypes) {
+    {
+        // int alternative
+        VariantType holds_int = 42;
+        ASSERT_FALSE(DataDefine::IsVariantNull(holds_int));
+    }
+    {
+        // bool alternative
+        VariantType holds_bool = true;
+        ASSERT_FALSE(DataDefine::IsVariantNull(holds_bool));
+    }
+}
+
+// Test case: GetVariantValue should return valid value for matched types
+// Covers three alternatives: int32_t, bool and std::string_view. Each case
+// stores a value in a VariantType and reads it back with the matching type.
+TEST(DataDefineTest, GetVariantValue) {
+ {
+ VariantType int_variant = 42; // Variant holding an int
+ const auto int_value =
DataDefine::GetVariantValue<int32_t>(int_variant);
+ ASSERT_EQ(int_value, 42);
+ }
+ {
+ VariantType bool_variant = true; // Variant holding a bool
+ const bool bool_value =
DataDefine::GetVariantValue<bool>(bool_variant);
+ ASSERT_EQ(bool_value, true);
+ }
+ {
+ // The view is taken from `array`, which stays in scope until the end of
+ // this block, so the variant never outlives the data it points at.
+ auto array = std::dynamic_pointer_cast<arrow::StringArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(),
R"(["abc", "def", "hello"])")
+ .ValueOrDie());
+ VariantType view_variant = array->GetView(2); // Variant holding a
StringView
+ const auto view_value =
DataDefine::GetVariantValue<std::string_view>(view_variant);
+ ASSERT_EQ(std::string(view_value), "hello");
+ // The same variant must also render as its text.
+ ASSERT_EQ(DataDefine::VariantValueToString(view_variant), "hello");
+ }
+}
+
+// Test case: GetVariantValue should return the stored value when the requested
+// type matches the active alternative. The body used to be empty, so the test
+// passed without checking anything; it now covers the int64_t alternative.
+TEST(DataDefineTest, GetVariantValueReturnsPointerForMatchingType) {
+    VariantType long_variant = static_cast<int64_t>(123456789L);
+    const auto long_value = DataDefine::GetVariantValue<int64_t>(long_variant);
+    ASSERT_EQ(long_value, 123456789L);
+}
+
+// VariantValueToString renders a NullType variant as the text "null".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForNullType) {
+    VariantType holder = NullType{};
+    const auto rendered = DataDefine::VariantValueToString(holder);
+    ASSERT_EQ(rendered, "null");
+}
+
+// VariantValueToString renders bool variants as "true" and "false".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBool) {
+    VariantType holds_true = true;
+    ASSERT_EQ(DataDefine::VariantValueToString(holds_true), "true");
+
+    VariantType holds_false = false;
+    ASSERT_EQ(DataDefine::VariantValueToString(holds_false), "false");
+}
+
+// VariantValueToString renders an int variant as its decimal text.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInt) {
+    VariantType holds_int = 42;
+    const auto rendered = DataDefine::VariantValueToString(holds_int);
+    ASSERT_EQ(rendered, "42");
+}
+
+// VariantValueToString returns the text held by a BinaryString variant.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBinaryString) {
+    auto memory_pool = GetDefaultPool();
+    auto text = BinaryString::FromString("Hello, world!", memory_pool.get());
+    VariantType text_variant = text;
+    const auto rendered = DataDefine::VariantValueToString(text_variant);
+    ASSERT_EQ(rendered, "Hello, world!");
+}
+
+// VariantValueToString returns the content of a shared_ptr<Bytes> variant.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForSharedPtrBytes) {
+    auto memory_pool = GetDefaultPool();
+    std::shared_ptr<Bytes> payload = Bytes::AllocateBytes("abc", memory_pool.get());
+    VariantType payload_variant = payload;
+    const auto rendered = DataDefine::VariantValueToString(payload_variant);
+    ASSERT_EQ(rendered, "abc");
+}
+
+// Test case: VariantValueToString should render a Timestamp variant exactly as
+// Timestamp::ToString() does.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForTimestamp) {
+    // Upper-case `L` suffix: a lower-case `l` is easily misread as the digit 1.
+    auto timestamp = Timestamp(/*millisecond=*/1622520000000L, /*nano_of_millisecond=*/0);
+    VariantType timestamp_variant = timestamp;
+    ASSERT_EQ(DataDefine::VariantValueToString(timestamp_variant), timestamp.ToString());
+}
+
+// VariantValueToString renders a Decimal variant exactly as Decimal::ToString() does.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForDecimal) {
+    const auto unscaled = DecimalUtils::StrToInt128("12345678998765432145678").value();
+    Decimal expected(38, 38, unscaled);
+    VariantType decimal_variant = expected;
+    const auto rendered = DataDefine::VariantValueToString(decimal_variant);
+    ASSERT_EQ(rendered, expected.ToString());
+}
+
+// VariantValueToString renders a shared_ptr<InternalRow> variant as the
+// placeholder text "row". A BinaryRow constructed with argument 0 stands in
+// for a real row.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalRow) {
+    std::shared_ptr<InternalRow> placeholder_row = std::make_shared<BinaryRow>(0);
+    VariantType holder = placeholder_row;
+    const auto rendered = DataDefine::VariantValueToString(holder);
+    ASSERT_EQ(rendered, "row");
+}
+
+// Test case: VariantValueToString should render a shared_ptr<InternalArray>
+// variant as the placeholder text "array". A default-constructed BinaryArray
+// stands in for a real array. The memory pool the test used to fetch was never
+// used and has been removed.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalArray) {
+    std::shared_ptr<InternalArray> array_ptr = std::make_shared<BinaryArray>();
+    VariantType array_variant = array_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(array_variant), "array");
+}
+
+// Test case: GetStringView should handle all variant types and edge cases
+// Exercises the three text-carrying alternatives (std::string_view,
+// shared_ptr<Bytes>, BinaryString), first with content and then empty.
+TEST(DataDefineTest, GetStringView) {
+ auto pool = GetDefaultPool();
+
+ {
+ // from string_view
+ std::string original = "hello world";
+ VariantType view_variant = std::string_view(original.data(),
original.size());
+ auto result = DataDefine::GetStringView(view_variant);
+ ASSERT_EQ(result, "hello world");
+ // The returned view must alias the original buffer, not a copy of it.
+ ASSERT_EQ(result.data(), original.data());
+ }
+ {
+ // from shared_ptr<Bytes>
+ std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("test bytes",
pool.get());
+ VariantType bytes_variant = bytes;
+ auto result = DataDefine::GetStringView(bytes_variant);
+ ASSERT_EQ(std::string(result), "test bytes");
+ ASSERT_EQ(result.size(), 10);
+ }
+ {
+ // from BinaryString
+ auto binary_str = BinaryString::FromString("binary string content",
pool.get());
+ VariantType binary_variant = binary_str;
+ auto result = DataDefine::GetStringView(binary_variant);
+ ASSERT_EQ(std::string(result), "binary string content");
+ }
+ {
+ // empty string_view
+ VariantType view_variant = std::string_view();
+ auto result = DataDefine::GetStringView(view_variant);
+ ASSERT_TRUE(result.empty());
+ ASSERT_EQ(result.size(), 0);
+ }
+ {
+ // empty Bytes
+ std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("", pool.get());
+ VariantType bytes_variant = bytes;
+ auto result = DataDefine::GetStringView(bytes_variant);
+ ASSERT_TRUE(result.empty());
+ ASSERT_EQ(result.size(), 0);
+ }
+ {
+ // empty BinaryString
+ VariantType binary_variant = BinaryString::EmptyUtf8();
+ auto result = DataDefine::GetStringView(binary_variant);
+ ASSERT_TRUE(result.empty());
+ ASSERT_EQ(result.size(), 0);
+ }
+}
+
+// Test case: VariantValueToLiteral should convert a variant to a literal for
+// each supported arrow type id and fail for an unsupported one.
+// Each scope stores one value, converts it with the arrow type named in the
+// leading comment, and reads the literal back with GetValue<T>().
+TEST(DataDefineTest, VariantValueToLiteral) {
+ auto pool = GetDefaultPool();
+
+ {
+ // BOOL
+ VariantType value = true;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::BOOL));
+ ASSERT_EQ(literal.GetValue<bool>(), true);
+ }
+ {
+ // INT8
+ VariantType value = static_cast<char>(42);
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::INT8));
+ ASSERT_EQ(literal.GetValue<int8_t>(), 42);
+ }
+ {
+ // INT16
+ VariantType value = static_cast<int16_t>(1000);
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::INT16));
+ ASSERT_EQ(literal.GetValue<int16_t>(), 1000);
+ }
+ {
+ // INT32
+ VariantType value = static_cast<int32_t>(100000);
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::INT32));
+ ASSERT_EQ(literal.GetValue<int32_t>(), 100000);
+ }
+ {
+ // INT64
+ VariantType value = static_cast<int64_t>(123456789L);
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::INT64));
+ ASSERT_EQ(literal.GetValue<int64_t>(), 123456789L);
+ }
+ {
+ // FLOAT
+ VariantType value = 3.14f;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::FLOAT));
+ ASSERT_FLOAT_EQ(literal.GetValue<float>(), 3.14f);
+ }
+ {
+ // DOUBLE
+ VariantType value = 2.718;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::DOUBLE));
+ ASSERT_DOUBLE_EQ(literal.GetValue<double>(), 2.718);
+ }
+ {
+ // STRING from BinaryString
+ auto binary_str = BinaryString::FromString("hello", pool.get());
+ VariantType value = binary_str;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::STRING));
+ ASSERT_EQ(literal.GetValue<std::string>(), "hello");
+ }
+ {
+ // STRING from shared_ptr<Bytes>
+ std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("world",
pool.get());
+ VariantType value = bytes;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::STRING));
+ ASSERT_EQ(literal.GetValue<std::string>(), "world");
+ }
+ {
+ // STRING from string_view
+ std::string original = "view_str";
+ VariantType value = std::string_view(original.data(), original.size());
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::STRING));
+ ASSERT_EQ(literal.GetValue<std::string>(), "view_str");
+ }
+ {
+ // BINARY from shared_ptr<Bytes>
+ std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("binary_data",
pool.get());
+ VariantType value = bytes;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::BINARY));
+ ASSERT_EQ(literal.GetValue<std::string>(), "binary_data");
+ }
+ {
+ // BINARY from BinaryString
+ auto binary_str = BinaryString::FromString("bin_str", pool.get());
+ VariantType value = binary_str;
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::BINARY));
+ ASSERT_EQ(literal.GetValue<std::string>(), "bin_str");
+ }
+ {
+ // TIMESTAMP
+ Timestamp ts(12345, 1);
+ VariantType value = ts;
+ ASSERT_OK_AND_ASSIGN(
+ auto literal, DataDefine::VariantValueToLiteral(value,
arrow::Type::type::TIMESTAMP));
+ ASSERT_EQ(literal.GetValue<Timestamp>(), ts);
+ }
+ {
+ // DECIMAL128
+ Decimal decimal(20, 3, 123456);
+ VariantType value = decimal;
+ ASSERT_OK_AND_ASSIGN(
+ auto literal, DataDefine::VariantValueToLiteral(value,
arrow::Type::type::DECIMAL128));
+ ASSERT_EQ(literal.GetValue<Decimal>(), decimal);
+ }
+ {
+ // DATE32
+ // The date is stored as an int32_t and read back as an int32_t.
+ VariantType value = static_cast<int32_t>(19000);
+ ASSERT_OK_AND_ASSIGN(auto literal,
+ DataDefine::VariantValueToLiteral(value,
arrow::Type::type::DATE32));
+ ASSERT_EQ(literal.GetValue<int32_t>(), 19000);
+ }
+ {
+ // unsupported type
+ // LIST must be rejected with the "Not support arrow type" message.
+ VariantType value = static_cast<int32_t>(0);
+ ASSERT_NOK_WITH_MSG(DataDefine::VariantValueToLiteral(value,
arrow::Type::type::LIST),
+ "Not support arrow type");
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp
new file mode 100644
index 0000000..d66420f
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <utility>
+
+namespace paimon {
+
+/// Returns the number of bits a Bloom filter needs to hold `expect_entries`
+/// items at false-positive probability `fpp`: -n * ln(fpp) / (ln 2)^2,
+/// truncated toward zero.
+///
+/// Returns 0 when `expect_entries <= 0` or `fpp` is not strictly inside
+/// (0, 1), NaN included. Results above INT32_MAX are clamped to INT32_MAX.
+int32_t BloomFilter::OptimalNumOfBits(int64_t expect_entries, double fpp) {
+    // Written as a negated "in range" test so that a NaN fpp is rejected too.
+    // With the previous `fpp <= 0.0 || fpp >= 1.0` form NaN slipped through and
+    // reached the double -> int32_t cast below, which is undefined for NaN.
+    if (expect_entries <= 0 || !(fpp > 0.0 && fpp < 1.0)) {
+        return 0;
+    }
+    // Use the std:: overloads: <cmath> only guarantees the names in namespace
+    // std, and the other helpers in this file already call std::log.
+    const double ln2 = std::log(2.0);
+    const double result = -static_cast<double>(expect_entries) * std::log(fpp) / (ln2 * ln2);
+    if (result > INT32_MAX) {
+        return INT32_MAX;
+    }
+    if (result < 0) {
+        return 0;
+    }
+    return static_cast<int32_t>(result);
+}
+
+/// Picks the hash-function count for `bit_size` bits and `expect_entries`
+/// items: round((bit_size / expect_entries) * ln 2), never less than 1.
+/// A non-positive `expect_entries` yields 1 without any computation.
+int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t bit_size) {
+    if (expect_entries > 0) {
+        const double bits_per_entry =
+            static_cast<double>(bit_size) / static_cast<double>(expect_entries);
+        const double scaled = bits_per_entry * std::log(2.0);
+        const auto rounded = static_cast<int32_t>(std::round(scaled));
+        return std::max(1, rounded);
+    }
+    return 1;
+}
+
+/// Builds a filter sized for `expect_entries` items at false-positive
+/// probability `fpp`. The byte length is OptimalNumOfBits(expect_entries, fpp)
+/// rounded up to whole bytes.
+///
+/// When OptimalNumOfBits() returns 0 (out-of-range arguments) the filter is
+/// constructed with a byte length of 0.
+/// NOTE(review): callers should presumably validate their inputs first — confirm.
+std::shared_ptr<BloomFilter> BloomFilter::Create(int64_t expect_entries, double fpp) {
+    // std::ceil rather than the unqualified C name: <cmath> only guarantees the
+    // std:: overloads, and the rest of this file already uses std::log/std::round.
+    const auto bytes =
+        static_cast<int32_t>(std::ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0));
+    return std::make_shared<BloomFilter>(expect_entries, bytes);
+}
+
+/// Creates a filter backed by a BitSet of `byte_length` bytes. The number of
+/// hash functions is derived from `expected_entries` and the bit count
+/// (byte_length * 8).
+BloomFilter::BloomFilter(int64_t expected_entries, int32_t byte_length)
+    : expected_entries_(expected_entries) {
+    // The shift is done on a uint64_t so that byte_length * 8 cannot overflow int32_t.
+    const auto total_bits = static_cast<int64_t>(static_cast<uint64_t>(byte_length) << 3);
+    num_hash_functions_ = OptimalNumOfHashFunctions(expected_entries, total_bits);
+    bit_set_ = std::make_shared<BitSet>(byte_length);
+}
+
+/// Records `hash1` in the filter. A second hash is taken from the upper 16
+/// bits of `hash1`, and for i = 1..num_hash_functions_ the bit at
+/// (hash1 + i * hash2) mod BitSize() is set (negative sums are bit-flipped
+/// first so the position is never negative).
+///
+/// @return Status::OK(), or the first non-OK status returned by BitSet::Set().
+Status BloomFilter::AddHash(int32_t hash1) {
+    // The bit count does not change inside the loop, so read it once. A bit set
+    // with zero bits would make the modulo below undefined; fail fast in debug
+    // builds instead.
+    const auto bit_size = bit_set_->BitSize();
+    assert(bit_size > 0);
+    const auto hash2 = static_cast<int32_t>(static_cast<uint32_t>(hash1) >> 16);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics)
+        auto combined_hash = static_cast<int32_t>(
+            static_cast<uint32_t>(hash1) + (static_cast<uint32_t>(i) * static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % bit_size;
+        PAIMON_RETURN_NOT_OK(bit_set_->Set(pos));
+    }
+    return Status::OK();
+}
+
+/// Reports whether `hash1` may have been added. It probes the same bit
+/// positions AddHash() sets: (hash1 + i * hash2) mod BitSize() for
+/// i = 1..num_hash_functions_, with hash2 taken from the upper 16 bits of
+/// `hash1`.
+///
+/// @return false as soon as one probed bit is clear, true when all are set.
+bool BloomFilter::TestHash(int32_t hash1) const {
+    // The bit count does not change inside the loop, so read it once. A bit set
+    // with zero bits would make the modulo below undefined; fail fast in debug
+    // builds instead.
+    const auto bit_size = bit_set_->BitSize();
+    assert(bit_size > 0);
+    const auto hash2 = static_cast<int32_t>(static_cast<uint32_t>(hash1) >> 16);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics)
+        auto combined_hash = static_cast<int32_t>(
+            static_cast<uint32_t>(hash1) + (static_cast<uint32_t>(i) * static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % bit_size;
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Forwards `segment` and `offset` unchanged to the underlying BitSet and
+// returns whatever status BitSet::SetMemorySegment() reports.
+Status BloomFilter::SetMemorySegment(MemorySegment segment, int32_t offset) {
+ return bit_set_->SetMemorySegment(segment, offset);
+}
+
+// Resets the filter by calling Clear() on the underlying BitSet.
+void BloomFilter::Reset() {
+ bit_set_->Clear();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter.h b/src/paimon/common/utils/bloom_filter.h
new file mode 100644
index 0000000..8f1b327
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Bloom filter based on MemorySegment.
+///
+/// Bits are kept in a BitSet. Each 32-bit hash sets or tests
+/// GetNumHashFunctions() bit positions derived from it.
+class PAIMON_EXPORT BloomFilter {
+ public:
+ /// Bit count for `expect_entries` items at false-positive probability `fpp`.
+ static int32_t OptimalNumOfBits(int64_t expect_entries, double fpp);
+ /// Hash-function count for `bit_size` bits and `expect_entries` items (at least 1).
+ static int32_t OptimalNumOfHashFunctions(int64_t expect_entries, int64_t
bit_size);
+ /// Creates a filter whose byte length is derived from OptimalNumOfBits().
+ static std::shared_ptr<BloomFilter> Create(int64_t expect_entries, double
fpp);
+
+ public:
+ /// Creates a filter with a BitSet of `byte_length` bytes.
+ BloomFilter(int64_t expected_entries, int32_t byte_length);
+
+ /// Number of bit positions set or tested per hash.
+ int32_t GetNumHashFunctions() const {
+ return num_hash_functions_;
+ }
+
+ /// The entry count the filter was constructed with.
+ int64_t ExpectedEntries() const {
+ return expected_entries_;
+ }
+
+ /// Forwards to BitSet::ByteLength().
+ int64_t ByteLength() const {
+ return bit_set_->ByteLength();
+ }
+
+ /// Returns the shared BitSet that holds the filter's bits.
+ std::shared_ptr<BitSet> GetBitSet() const {
+ return bit_set_;
+ }
+
+ /// Forwards `segment` and `offset` to the BitSet.
+ Status SetMemorySegment(MemorySegment segment, int32_t offset = 0);
+
+ /// Sets the bit positions derived from `hash1`.
+ Status AddHash(int32_t hash1);
+
+ /// Returns false if any bit position derived from `hash1` is clear.
+ bool TestHash(int32_t hash1) const;
+
+ /// Clears the BitSet.
+ void Reset();
+
+ private:
+ // Bits per byte.
+ // NOTE(review): not referenced by the implementation visible in bloom_filter.cpp.
+ static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+ int64_t expected_entries_;
+ // -1 until the constructor computes the real value.
+ int32_t num_hash_functions_ = -1;
+ std::shared_ptr<BitSet> bit_set_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.cpp b/src/paimon/common/utils/bloom_filter64.cpp
new file mode 100644
index 0000000..f2685a1
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cmath>
+#include <utility>
+
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Wraps `bytes` as bit storage; bit 0 lives in the byte at position `offset`.
+// Debug builds assert that the buffer is non-empty and the offset is
+// non-negative. `bytes` must not be null (it is dereferenced by the assert).
+BloomFilter64::BitSet::BitSet(const std::shared_ptr<Bytes>& bytes, int32_t
offset)
+ : offset_(offset), bytes_(bytes) {
+ assert(bytes_->size() > 0);
+ assert(offset_ >= 0);
+}
+
+/// Sets the bit at `index`, counted from the byte at `offset_` inside
+/// `bytes_`. No bounds check is performed.
+void BloomFilter64::BitSet::Set(int32_t index) {
+    // Byte slot: index / 8 (unsigned shift) past offset_. Bit in that byte: index & MASK.
+    const auto byte_slot = (static_cast<uint32_t>(index) >> 3) + offset_;
+    const auto bit_mask = static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK));
+    char* storage = bytes_->data();
+    storage[byte_slot] |= bit_mask;
+}
+
+/// Returns whether the bit at `index` is set, counted from the byte at
+/// `offset_` inside `bytes_`. No bounds check is performed.
+bool BloomFilter64::BitSet::Get(int32_t index) const {
+    // Byte slot: index / 8 (unsigned shift) past offset_. Bit in that byte: index & MASK.
+    const auto byte_slot = (static_cast<uint32_t>(index) >> 3) + offset_;
+    const auto bit_mask = static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK));
+    const char* storage = bytes_->data();
+    return (storage[byte_slot] & bit_mask) != 0;
+}
+
+// Number of addressable bits: the bytes from `offset_` to the end of the
+// buffer, times 8.
+int32_t BloomFilter64::BitSet::BitSize() const {
+ return (bytes_->size() - offset_) * BloomFilter64::BYTE_SIZE;
+}
+
+/// Sizes a filter for `items` entries at false-positive probability `fpp` and
+/// allocates its bit storage from `pool`.
+///
+/// The bit count is -items * ln(fpp) / (ln 2)^2 truncated to int32_t, then
+/// padded to a multiple of 8; a whole extra byte is added when it already is
+/// one. The hash-function count is round(num_bits / items * ln 2), at least 1.
+/// The sizing arithmetic is kept exactly as written because
+/// BloomFilter64Test.TestCompatibleWithJava pins its results.
+///
+/// Preconditions, checked with assert: items > 0, 0 < fpp <= 1, and the padded
+/// bit count fits in int32_t.
+BloomFilter64::BloomFilter64(int64_t items, double fpp, const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool) {
+    // Without these checks bad arguments reach a double -> int32_t cast whose
+    // result is undefined (NaN or out-of-range value), or divide by zero items.
+    assert(items > 0);
+    assert(fpp > 0.0 && fpp <= 1.0);
+    const double raw_bits = -items * std::log(fpp) / (std::log(2) * std::log(2));
+    assert(raw_bits < static_cast<double>(INT32_MAX - BloomFilter64::BYTE_SIZE));
+    auto nb = static_cast<int32_t>(raw_bits);
+    num_bits_ = nb + (BloomFilter64::BYTE_SIZE - (nb % BloomFilter64::BYTE_SIZE));
+    num_hash_functions_ = std::max(
+        1, static_cast<int32_t>(std::round(static_cast<double>(num_bits_) / items * std::log(2))));
+    auto bytes = std::make_shared<Bytes>(num_bits_ / BloomFilter64::BYTE_SIZE, pool_.get());
+    bit_set_ = std::make_unique<BitSet>(bytes, /*offset=*/0);
+}
+
+// Adopts an existing bit set, taking ownership of `bit_set`.
+// `num_bits_` is declared before `bit_set_` in the class, so BitSize() is read
+// from the parameter before it is moved from. `pool_` is not initialised here.
+BloomFilter64::BloomFilter64(int32_t num_hash_functions,
std::unique_ptr<BitSet>&& bit_set)
+ : num_bits_(bit_set->BitSize()),
+ num_hash_functions_(num_hash_functions),
+ bit_set_(std::move(bit_set)) {}
+
+/// Records a 64-bit hash. The low 32 bits act as hash1 and the high 32 bits as
+/// hash2; for i = 1..num_hash_functions_ the bit at
+/// (hash1 + i * hash2) mod num_bits_ is set. Negative sums are bit-flipped
+/// first so the position is never negative.
+void BloomFilter64::AddHash(int64_t hash64) {
+    // A filter with zero bits would make the modulo below undefined; fail fast
+    // in debug builds instead.
+    assert(num_bits_ > 0);
+    const auto hash1 = static_cast<int32_t>(hash64);
+    const auto hash2 = static_cast<int32_t>(static_cast<uint64_t>(hash64) >> 32);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics)
+        auto combined_hash = static_cast<int32_t>(
+            static_cast<uint32_t>(hash1) + (static_cast<uint32_t>(i) * static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        bit_set_->Set(pos);
+    }
+}
+
+/// Reports whether `hash64` may have been added. It probes the same bit
+/// positions AddHash() sets: (hash1 + i * hash2) mod num_bits_ for
+/// i = 1..num_hash_functions_, with hash1 the low and hash2 the high 32 bits.
+///
+/// @return false as soon as one probed bit is clear, true when all are set.
+bool BloomFilter64::TestHash(int64_t hash64) const {
+    // A filter with zero bits would make the modulo below undefined; fail fast
+    // in debug builds instead.
+    assert(num_bits_ > 0);
+    const auto hash1 = static_cast<int32_t>(hash64);
+    const auto hash2 = static_cast<int32_t>(static_cast<uint64_t>(hash64) >> 32);
+
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics)
+        auto combined_hash = static_cast<int32_t>(
+            static_cast<uint32_t>(hash1) + (static_cast<uint32_t>(i) * static_cast<uint32_t>(hash2)));
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.h b/src/paimon/common/utils/bloom_filter64.h
new file mode 100644
index 0000000..3ba4e86
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Bloom filter that takes 64-bit hashes.
+///
+/// The low and high 32-bit halves of each hash are combined to pick
+/// GetNumHashFunctions() bit positions in a BitSet backed by a Bytes buffer.
+class PAIMON_EXPORT BloomFilter64 {
+ public:
+ /// Sizes a new filter for `items` entries at false-positive probability
+ /// `fpp`, allocating the bit storage from `pool`.
+ BloomFilter64(int64_t items, double fpp, const
std::shared_ptr<MemoryPool>& pool);
+ class BitSet;
+
+ /// Adopts an existing bit set; the bit count is taken from bit_set->BitSize().
+ BloomFilter64(int32_t num_hash_functions, std::unique_ptr<BitSet>&&
bit_set);
+
+ /// Sets the bit positions derived from `hash64`.
+ void AddHash(int64_t hash64);
+
+ /// Returns false if any bit position derived from `hash64` is clear.
+ bool TestHash(int64_t hash64) const;
+
+ /// Number of bit positions set or tested per hash.
+ int32_t GetNumHashFunctions() const {
+ return num_hash_functions_;
+ }
+
+ /// Read-only access to the bit storage.
+ const BitSet& GetBitSet() const {
+ return *bit_set_;
+ }
+
+ /// Bit view over a shared Bytes buffer, starting at a byte offset.
+ class BitSet {
+ public:
+ BitSet(const std::shared_ptr<Bytes>& bytes, int32_t offset);
+ /// Sets the bit at `index` (no bounds check).
+ void Set(int32_t index);
+ /// Returns whether the bit at `index` is set (no bounds check).
+ bool Get(int32_t index) const;
+ /// (buffer size - offset) * 8.
+ int32_t BitSize() const;
+
+ private:
+ // Selects the bit-within-byte part of an index (index & 7).
+ static constexpr int8_t MASK = 0x07;
+
+ private:
+ int32_t offset_;
+ std::shared_ptr<Bytes> bytes_;
+ };
+
+ private:
+ // Bits per byte.
+ static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+ // Both counters stay at -1 until a constructor assigns them.
+ int32_t num_bits_ = -1;
+ int32_t num_hash_functions_ = -1;
+ // Set only by the sizing constructor; the BitSet-adopting constructor leaves it null.
+ std::shared_ptr<MemoryPool> pool_;
+ std::unique_ptr<BitSet> bit_set_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64_test.cpp b/src/paimon/common/utils/bloom_filter64_test.cpp
new file mode 100644
index 0000000..aa548fb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64_test.cpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+
+// Adds `items` random 64-bit hashes and checks that every one of them is
+// reported as present. It then probes one million fresh random values and
+// requires the false-positive rate to stay below 3% (the filter is built for
+// 2%). The generator is seeded from std::random_device, so runs differ.
+TEST(BloomFilter64Test, TestSimple) {
+    int32_t items = 10000;
+    auto pool = GetDefaultPool();
+    BloomFilter64 bloom_filter(items, 0.02, pool);
+    std::mt19937_64 engine(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int64_t> distribution(std::numeric_limits<int64_t>::min(),
+                                                        std::numeric_limits<int64_t>::max());
+    std::set<int64_t> test_data;
+    for (int32_t i = 0; i < items; i++) {
+        int64_t random = distribution(engine);
+        test_data.insert(random);
+        bloom_filter.AddHash(random);
+    }
+
+    // No false negatives: everything that was added must be found.
+    for (const auto& value : test_data) {
+        ASSERT_TRUE(bloom_filter.TestHash(value));
+    }
+
+    // test false positive
+    int32_t false_positives = 0;
+    int32_t num = 1000000;
+    for (int32_t i = 0; i < num; i++) {
+        int64_t random = distribution(engine);
+        if (bloom_filter.TestHash(random) && test_data.find(random) == test_data.end()) {
+            false_positives++;
+        }
+    }
+    // ASSERT_LT, unlike ASSERT_TRUE on a comparison, prints the measured rate
+    // when the bound is exceeded.
+    ASSERT_LT(static_cast<double>(false_positives) / num, 0.03);
+}
+
+// Loads a fixed 12-byte bit set, described by the comment below as holding the
+// listed values, and checks that all of them test as present with 7 hash
+// functions. It then checks that sizing a filter for 10 items at fpp 0.01
+// gives the same hash-function count and byte length.
+// NOTE(review): the byte values presumably come from the Java implementation — confirm.
+TEST(BloomFilter64Test, TestCompatibleWithJava) {
+ // data: -10, -5, 0, 13, 100, 200, 500
+ std::vector<uint8_t> se_bytes = {241, 255, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+ auto pool = GetDefaultPool();
+ auto bytes = std::make_shared<Bytes>(se_bytes.size(), pool.get());
+ memcpy(bytes->data(), reinterpret_cast<char*>(se_bytes.data()),
bytes->size());
+ auto bit_set = std::make_unique<BloomFilter64::BitSet>(bytes,
/*offset=*/0);
+ BloomFilter64 bloom_filter(/*num_hash_functions=*/7, std::move(bit_set));
+ std::vector<int64_t> expected_data = {-10, -5, 0, 13, 100, 200, 500};
+ for (const auto& value : expected_data) {
+ ASSERT_TRUE(bloom_filter.TestHash(value));
+ }
+
+ // NOTE(review): the assertions below read private members (BYTE_SIZE,
+ // num_bits_, BitSet::bytes_); presumably the test build relaxes access
+ // control — confirm.
+ BloomFilter64 bloom_filter2(10, 0.01, pool);
+ ASSERT_EQ(7, bloom_filter2.GetNumHashFunctions());
+ ASSERT_EQ(se_bytes.size() * BloomFilter64::BYTE_SIZE,
bloom_filter2.num_bits_);
+ ASSERT_EQ(se_bytes.size(), bloom_filter2.GetBitSet().bytes_->size());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter_test.cpp b/src/paimon/common/utils/bloom_filter_test.cpp
new file mode 100644
index 0000000..fdd6abb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter_test.cpp
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(BloomFilterTest, TestOneSegmentBuilder) {
+    // Build a filter through the factory and back it with one 1024-byte segment.
+    int32_t num_items = 100;
+    auto pool = GetDefaultPool();
+    auto filter = BloomFilter::Create(num_items, 0.01);
+    auto segment = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(filter->SetMemorySegment(segment));
+
+    // Insert num_items random draws from [0, num_items]; duplicates collapse in the set.
+    std::mt19937_64 rng(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int32_t> pick(0, num_items);
+    std::set<int32_t> inserted;
+    for (int32_t n = 0; n < num_items; ++n) {
+        int32_t drawn = pick(rng);
+        inserted.insert(drawn);
+        ASSERT_OK(filter->AddHash(drawn));
+    }
+
+    // Nothing that was added may be reported as absent.
+    for (int32_t member : inserted) {
+        ASSERT_TRUE(filter->TestHash(member));
+    }
+}
+
+// Pins the hash-function count reported by filters built via
+// BloomFilter::Create(a, b): in every listed case b == 0.01 yields 7 and
+// b == 0.05 yields 4, whatever the value of a.
+// NOTE(review): a/b are presumably (expected entries, false-positive
+// probability) - confirm against bloom_filter.h.
+TEST(BloomFilterTest, TestEstimatedHashFunctions) {
+ ASSERT_EQ(7, BloomFilter::Create(1000, 0.01)->GetNumHashFunctions());
+ ASSERT_EQ(7, BloomFilter::Create(10000, 0.01)->GetNumHashFunctions());
+ ASSERT_EQ(7, BloomFilter::Create(100000, 0.01)->GetNumHashFunctions());
+ ASSERT_EQ(4, BloomFilter::Create(100000, 0.05)->GetNumHashFunctions());
+ ASSERT_EQ(7, BloomFilter::Create(1000000, 0.01)->GetNumHashFunctions());
+ ASSERT_EQ(4, BloomFilter::Create(1000000, 0.05)->GetNumHashFunctions());
+}
+
+// Golden-value table for BloomFilter::OptimalNumOfBits(a, b).
+// NOTE(review): a/b are presumably (expected entries, false-positive
+// probability) - confirm against bloom_filter.h.
+TEST(BloomFilterTest, TestBloomNumBits) {
+ // Degenerate inputs: the first argument is 0 or the second argument is 1.
+ ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 0));
+ ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 1));
+ ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(1, 1));
+ // Second argument fixed at 0.03, first argument growing by powers of ten.
+ ASSERT_EQ(7, BloomFilter::OptimalNumOfBits(1, 0.03));
+ ASSERT_EQ(72, BloomFilter::OptimalNumOfBits(10, 0.03));
+ ASSERT_EQ(729, BloomFilter::OptimalNumOfBits(100, 0.03));
+ ASSERT_EQ(7298, BloomFilter::OptimalNumOfBits(1000, 0.03));
+ ASSERT_EQ(72984, BloomFilter::OptimalNumOfBits(10000, 0.03));
+ ASSERT_EQ(729844, BloomFilter::OptimalNumOfBits(100000, 0.03));
+ ASSERT_EQ(7298440, BloomFilter::OptimalNumOfBits(1000000, 0.03));
+ ASSERT_EQ(6235224, BloomFilter::OptimalNumOfBits(1000000, 0.05));
+ // Very large first arguments (3000000000 does not fit in a 32-bit int).
+ ASSERT_EQ(1870567268, BloomFilter::OptimalNumOfBits(300000000, 0.05));
+ ASSERT_EQ(1437758756, BloomFilter::OptimalNumOfBits(300000000, 0.1));
+ ASSERT_EQ(432808512, BloomFilter::OptimalNumOfBits(300000000, 0.5));
+ ASSERT_EQ(1393332198, BloomFilter::OptimalNumOfBits(3000000000, 0.8));
+ ASSERT_EQ(657882327, BloomFilter::OptimalNumOfBits(3000000000, 0.9));
+ ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(3000000000, 1));
+}
+
+// Golden-value table for BloomFilter::OptimalNumOfHashFunctions(a, b).
+// NOTE(review): a/b are presumably (expected entries, number of bits) -
+// confirm against bloom_filter.h.
+TEST(BloomFilterTest, TestBloomNumHashFunctions) {
+ // Negative, zero and a >= b inputs are all expected to produce 1.
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(-1, -1));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(0, 0));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 0));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 10));
+ ASSERT_EQ(7, BloomFilter::OptimalNumOfHashFunctions(10, 100));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100, 100));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000, 100));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10000, 100));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100000, 100));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 100));
+ // Second argument fixed at 64 * 1024 * 8 = 524288.
+ ASSERT_EQ(3634, BloomFilter::OptimalNumOfHashFunctions(100, 64 * 1024 *
8));
+ ASSERT_EQ(363, BloomFilter::OptimalNumOfHashFunctions(1000, 64 * 1024 *
8));
+ ASSERT_EQ(36, BloomFilter::OptimalNumOfHashFunctions(10000, 64 * 1024 *
8));
+ ASSERT_EQ(4, BloomFilter::OptimalNumOfHashFunctions(100000, 64 * 1024 *
8));
+ ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 64 * 1024 *
8));
+}
+
+TEST(BloomFilterTest, TestBloomFilter) {
+    // One filter object is re-pointed at two independent 1024-byte segments.
+    int32_t num_items = 100;
+    auto pool = GetDefaultPool();
+    auto filter = std::make_shared<BloomFilter>(100, 1024);
+
+    std::mt19937_64 rng(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution<int32_t> pick(0, num_items);
+
+    // First segment: populate, then confirm every inserted value is found.
+    auto first_segment = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(filter->SetMemorySegment(first_segment));
+
+    std::set<int32_t> first_values;
+    for (int32_t n = 0; n < num_items; ++n) {
+        int32_t drawn = pick(rng);
+        first_values.insert(drawn);
+        ASSERT_OK(filter->AddHash(drawn));
+    }
+    for (int32_t member : first_values) {
+        ASSERT_TRUE(filter->TestHash(member));
+    }
+
+    // Second segment: same procedure with a fresh batch of random values.
+    std::set<int32_t> second_values;
+    auto second_segment = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(filter->SetMemorySegment(second_segment));
+    for (int32_t n = 0; n < num_items; ++n) {
+        int32_t drawn = pick(rng);
+        second_values.insert(drawn);
+        ASSERT_OK(filter->AddHash(drawn));
+    }
+    for (int32_t member : second_values) {
+        ASSERT_TRUE(filter->TestHash(member));
+    }
+
+    // Re-attaching the first segment must bring its contents back.
+    ASSERT_OK(filter->SetMemorySegment(first_segment));
+    for (int32_t member : first_values) {
+        ASSERT_TRUE(filter->TestHash(member));
+    }
+
+    // After Reset() none of the first batch may be reported any more.
+    filter->Reset();
+    for (int32_t member : first_values) {
+        ASSERT_FALSE(filter->TestHash(member));
+    }
+
+    // Same check for the second segment once it is attached and reset.
+    ASSERT_OK(filter->SetMemorySegment(second_segment));
+    filter->Reset();
+    for (int32_t member : second_values) {
+        ASSERT_FALSE(filter->TestHash(member));
+    }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/crc32c.cpp
b/src/paimon/common/utils/crc32c.cpp
new file mode 100644
index 0000000..1cba744
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.cpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/crc32c.h"
+
+#include <cstdint>
+#include <cstring>
+
+#include "arrow/util/crc32.h"
+
+namespace paimon {
+/// Returns the checksum of the `length` bytes starting at `data`, continuing
+/// from the initial value `crc`.
+///
+/// Builds that define PAIMON_HAVE_SSE4_2 run the intrinsic-based crc32c_hw();
+/// every other build forwards to arrow::internal::crc32().
+///
+/// NOTE(review): the SSE4.2 crc32 instruction implements the Castagnoli
+/// polynomial (CRC-32C), while Arrow documents arrow::internal::crc32 as the
+/// zlib-style CRC-32. Confirm that both branches are meant to return the same
+/// value for the same input.
+uint32_t CRC32C::calculate(const char* data, size_t length, uint32_t crc) {
+#if !defined(PAIMON_HAVE_SSE4_2)
+    return arrow::internal::crc32(crc, data, length);
+#else
+    return crc32c_hw(data, length, crc);
+#endif
+}
+
+#if defined(PAIMON_HAVE_SSE4_2)
+/// SSE4.2 implementation: folds `length` bytes at `data` into `crc` using the
+/// _mm_crc32_u{8,16,32,64} intrinsics and returns the updated value.
+///
+/// The running value is bit-inverted on entry and again on exit.
+///
+/// Multi-byte words are fetched with std::memcpy instead of dereferencing a
+/// reinterpret_cast'ed pointer: reading a char buffer through uint64_t*,
+/// uint32_t* or uint16_t* violates the strict-aliasing rule. The word values,
+/// and therefore the result, are unchanged.
+uint32_t CRC32C::crc32c_hw(const char* data, size_t length, uint32_t crc) {
+    crc = ~crc;
+
+    // Head: single bytes until `data` is 8-byte aligned or the input runs out.
+    while (length && (reinterpret_cast<uintptr_t>(data) & 7)) {
+        crc = _mm_crc32_u8(crc, *data++);
+        length--;
+    }
+
+    // Body: eight bytes per instruction.
+    while (length >= 8) {
+        uint64_t word = 0;
+        std::memcpy(&word, data, sizeof(word));
+        crc = static_cast<uint32_t>(_mm_crc32_u64(crc, word));
+        data += sizeof(word);
+        length -= sizeof(word);
+    }
+
+    // Tail: fewer than 8 bytes remain, so each narrower step runs at most once.
+    if (length >= 4) {
+        uint32_t word = 0;
+        std::memcpy(&word, data, sizeof(word));
+        crc = _mm_crc32_u32(crc, word);
+        data += sizeof(word);
+        length -= sizeof(word);
+    }
+
+    if (length >= 2) {
+        uint16_t word = 0;
+        std::memcpy(&word, data, sizeof(word));
+        crc = _mm_crc32_u16(crc, word);
+        data += sizeof(word);
+        length -= sizeof(word);
+    }
+
+    if (length > 0) {
+        crc = _mm_crc32_u8(crc, *data);
+    }
+
+    return ~crc;
+}
+#endif
+} // namespace paimon
diff --git a/src/paimon/common/utils/crc32c.h b/src/paimon/common/utils/crc32c.h
new file mode 100644
index 0000000..7c4db75
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#if defined(PAIMON_HAVE_SSE4_2)
+#include <nmmintrin.h>
+#endif
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Checksum helper exposing a single static entry point, calculate().
+///
+/// The private crc32c_hw() member is only declared when the build defines
+/// PAIMON_HAVE_SSE4_2; in that configuration <nmmintrin.h> is included above
+/// to provide the intrinsics it uses.
+class PAIMON_EXPORT CRC32C {
+ public:
+ /// Returns the checksum of the given bytes.
+ ///
+ /// @param data first byte of the input
+ /// @param length number of bytes to read from data
+ /// @param crc initial checksum value; 0 when omitted
+ /// @return the updated checksum
+ static uint32_t calculate(const char* data, size_t length, uint32_t crc =
0);
+
+ private:
+#if defined(PAIMON_HAVE_SSE4_2)
+ /// Simd implementation for crc32c.
+ ///
+ /// @param data data to be calculated
+ /// @param length length of data
+ /// @param crc initial crc value
+ /// @return crc32c value
+ static uint32_t crc32c_hw(const char* data, size_t length, uint32_t crc);
+#endif
+};
+} // namespace paimon
diff --git a/src/paimon/common/utils/crc32c_test.cpp
b/src/paimon/common/utils/crc32c_test.cpp
new file mode 100644
index 0000000..10dbbbf
--- /dev/null
+++ b/src/paimon/common/utils/crc32c_test.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/crc32c.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/crc32.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+TEST(CRC32CTest, TestSimple) {
+    // Single byte with the default initial value.
+    // NOTE(review): 3904355907 is 0xE8B7BE43, the zlib CRC-32 of "a"; a build
+    // that takes the SSE4.2 (CRC-32C) path would presumably not produce it -
+    // confirm which checksum variant is intended.
+    const char single = 'a';
+    ASSERT_EQ(CRC32C::calculate(&single, 1), 3904355907);
+
+    // Multi-byte input, again with the default initial value.
+    const std::string text = "hello paimon c++";
+    ASSERT_EQ(CRC32C::calculate(text.c_str(), text.size()), 1311805437);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/delta_varint_compressor.cpp
b/src/paimon/common/utils/delta_varint_compressor.cpp
new file mode 100644
index 0000000..89f908d
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor.cpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/delta_varint_compressor.h"
+
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Encodes `data` as: the first value, then the difference of each value to
+/// its predecessor, every number written with EncodeVarint (ZigZag + varint).
+///
+/// @param data values to compress; may be empty
+/// @return the encoded bytes, empty when `data` is empty
+///
+/// Each delta is encoded as soon as it is computed, so no intermediate vector
+/// of deltas is allocated; the produced bytes are the same as with a separate
+/// delta pass.
+std::vector<char> DeltaVarintCompressor::Compress(const std::vector<int64_t>& data) {
+    std::vector<char> out;
+    if (data.empty()) {
+        return out;
+    }
+
+    // A 64-bit value needs at most 10 varint bytes (ceil(64 / 7)).
+    out.reserve(data.size() * 10);
+
+    // The first element is stored as-is; it acts as the base for all deltas.
+    EncodeVarint(data[0], &out);
+    for (size_t i = 1; i < data.size(); i++) {
+        // Subtract in unsigned arithmetic so that wrap-around is well defined
+        // (signed overflow would be undefined behaviour).
+        const uint64_t unsigned_delta =
+            static_cast<uint64_t>(data[i]) - static_cast<uint64_t>(data[i - 1]);
+        EncodeVarint(static_cast<int64_t>(unsigned_delta), &out);
+    }
+    return out;
+}
+
+/// Inverse of Compress(): decodes every varint in `bytes` and rebuilds the
+/// original values by accumulating the decoded deltas.
+///
+/// @param bytes encoded input; may be empty
+/// @return the decoded values (empty for empty input), or the error reported
+///         by DecodeVarint for the first number that cannot be decoded
+Result<std::vector<int64_t>> DeltaVarintCompressor::Decompress(const std::vector<char>& bytes) {
+    if (bytes.empty()) {
+        return std::vector<int64_t>();
+    }
+
+    // Pass 1: turn the byte stream back into the sequence of deltas. Every
+    // encoded number occupies at least one byte, so bytes.size() bounds the count.
+    std::vector<int64_t> deltas;
+    deltas.reserve(bytes.size());
+    for (size_t cursor = 0; cursor < bytes.size();) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t decoded, DecodeVarint(bytes, &cursor));
+        deltas.push_back(decoded);
+    }
+
+    // Pass 2: running sum in unsigned arithmetic so wrap-around is well defined.
+    std::vector<int64_t> result(deltas.size());
+    result[0] = deltas[0];
+    for (size_t i = 1; i < result.size(); i++) {
+        const uint64_t previous = static_cast<uint64_t>(result[i - 1]);
+        const uint64_t step = static_cast<uint64_t>(deltas[i]);
+        result[i] = static_cast<int64_t>(previous + step);
+    }
+    return result;
+}
+
+/// Appends `value` to `out` as a ZigZag-mapped varint: 7 payload bits per byte,
+/// least-significant group first, with the top bit set on every byte except
+/// the last one.
+void DeltaVarintCompressor::EncodeVarint(int64_t value, std::vector<char>* out) {
+    constexpr uint64_t kPayloadMask = 0x7F;
+    constexpr uint64_t kContinuationBit = 0x80;
+
+    uint64_t remaining = ZigZag(value);
+    // While more than 7 significant bits are left, emit a continuation byte.
+    for (; remaining > kPayloadMask; remaining >>= 7) {
+        out->push_back(static_cast<char>((remaining & kPayloadMask) | kContinuationBit));
+    }
+    // Last byte: top bit clear marks the end of the number.
+    out->push_back(static_cast<char>(remaining));
+}
+
+/// Reads one ZigZag-mapped varint from `in` starting at `*pos`, advances `*pos`
+/// past the bytes consumed and returns the decoded signed value.
+///
+/// Returns Status::Invalid when the input ends before a byte with a clear top
+/// bit is seen, or when a tenth byte still has its continuation bit set.
+Result<int64_t> DeltaVarintCompressor::DecodeVarint(const std::vector<char>& in, size_t* pos) {
+    uint64_t zigzag = 0;
+    for (int32_t shift = 0;; shift += 7) {
+        // Ten groups of 7 bits already cover 64 bits; an eleventh is malformed.
+        if (shift > 63) {
+            return Status::Invalid("Varint overflow");
+        }
+        if (*pos >= in.size()) {
+            return Status::Invalid("Unexpected end of input");
+        }
+        const char current = in[(*pos)++];
+        // Low 7 bits are payload, placed at the current group offset.
+        zigzag |= static_cast<uint64_t>(current & 0x7F) << shift;
+        // A clear top bit terminates the number.
+        if ((current & 0x80) == 0) {
+            return UnZigZag(zigzag);
+        }
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor.h
b/src/paimon/common/utils/delta_varint_compressor.h
new file mode 100644
index 0000000..a66cbcf
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Integer-sequence codec that chains delta encoding, ZigZag mapping and varint
+/// encoding. It works best on sequences that are increasing or whose
+/// neighbouring values do not differ much.
+///
+/// NOTE(review): delta_varint_compressor_test.cpp calls the private ZigZag()
+/// directly, which only compiles if the test build bypasses access control -
+/// confirm.
+class PAIMON_EXPORT DeltaVarintCompressor {
+ public:
+    /// Compresses an int64_t sequence (delta -> ZigZag -> varint).
+    static std::vector<char> Compress(const std::vector<int64_t>& data);
+    /// Restores the int64_t sequence from bytes produced by Compress().
+    static Result<std::vector<int64_t>> Decompress(const std::vector<char>& bytes);
+
+ private:
+    /// Appends one value to `out` as a ZigZag-mapped varint.
+    static void EncodeVarint(int64_t value, std::vector<char>* out);
+    /// Decodes one varint at `*pos`, advances `*pos` and undoes the ZigZag mapping.
+    static Result<int64_t> DecodeVarint(const std::vector<char>& in, size_t* pos);
+
+    /// Maps signed to unsigned so that small magnitudes stay small:
+    /// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
+    static inline uint64_t ZigZag(int64_t value) {
+        // value >> 63 yields 0 or -1 with the usual arithmetic right shift.
+        const uint64_t sign_mask = static_cast<uint64_t>(value >> 63);
+        return (static_cast<uint64_t>(value) << 1) ^ sign_mask;
+    }
+    /// Inverse of ZigZag().
+    static inline int64_t UnZigZag(uint64_t value) {
+        const uint64_t magnitude = value >> 1;
+        // Unsigned negation: 0 stays 0, 1 becomes all ones.
+        const uint64_t sign_mask = -(value & 1);
+        return static_cast<int64_t>(magnitude ^ sign_mask);
+    }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor_test.cpp
b/src/paimon/common/utils/delta_varint_compressor_test.cpp
new file mode 100644
index 0000000..ac0017b
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor_test.cpp
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/delta_varint_compressor.h"
+
+#include <cstdio>
+#include <fstream>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Golden-file test: for each fixture case-000<i>, the values listed in the
+// .txt file must compress to exactly the bytes stored in the .bin file and
+// must survive a decompress round trip.
+TEST(DeltaVarintCompressorTest, TestCompatibleWithJava) {
+    std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>();
+
+    for (int32_t i = 0; i < 5; ++i) {
+        std::string file_prefix = paimon::test::GetDataDir() +
+                                  "/delta_varint_compressor.data/case-000" + std::to_string(i);
+
+        // Read original data from text file. Fail loudly if the fixture is
+        // missing or contains something that is not an integer, instead of
+        // silently continuing with an empty or truncated input.
+        std::string original_file = file_prefix + ".txt";
+        std::ifstream in(original_file);
+        ASSERT_TRUE(in.is_open()) << "Cannot open " << original_file;
+        std::vector<int64_t> original;
+        int64_t value = 0;
+        while (in >> value) {
+            original.push_back(value);
+        }
+        // Extraction must have stopped because the file ended, not on a parse error.
+        ASSERT_TRUE(in.eof()) << "Malformed content in " << original_file;
+        auto compressed = DeltaVarintCompressor::Compress(original);
+
+        // Read expected compressed bytes from binary file
+        std::string binary_file = file_prefix + ".bin";
+        std::string expected_bytes;
+        ASSERT_OK(fs->ReadFile(binary_file, &expected_bytes));
+        ASSERT_EQ(expected_bytes, std::string(compressed.data(), compressed.size()));
+
+        // Decompress and verify
+        ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed));
+        ASSERT_EQ(original, decompressed);
+    }
+}
+
+/// Test cases from Java Paimon
+// Round trip of a short unsorted sequence with a pinned encoded size.
+TEST(DeltaVarintCompressorTest, TestNormalCase1) {
+    const std::vector<int64_t> input = {80, 50, 90, 80, 70};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // Deltas 80, -30, 40, -10, -10: only the first needs two bytes.
+    ASSERT_EQ(6u, encoded.size());
+}
+
+// Round trip of a sequence whose deltas mostly exceed the one-byte range.
+TEST(DeltaVarintCompressorTest, TestNormalCase2) {
+    const std::vector<int64_t> input = {100, 50, 150, 100, 200};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    ASSERT_EQ(8u, encoded.size());
+}
+
+// 10000 round trips of 100 random 64-bit values each, from a fixed seed.
+TEST(DeltaVarintCompressorTest, TestRandomRoundTrip) {
+    std::mt19937 rng(123456789);
+    std::uniform_int_distribution<uint64_t> any_u64;
+
+    for (int32_t round = 0; round < 10000; ++round) {
+        std::vector<int64_t> input;
+        for (int32_t k = 0; k < 100; ++k) {
+            const uint64_t drawn = any_u64(rng);
+            input.push_back(static_cast<int64_t>(drawn));
+        }
+
+        auto encoded = DeltaVarintCompressor::Compress(input);
+        ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+        ASSERT_EQ(input, decoded);
+    }
+}
+
+// An empty sequence compresses to zero bytes and decompresses to empty again.
+TEST(DeltaVarintCompressorTest, TestEmptyArray) {
+    const std::vector<int64_t> input;
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    ASSERT_EQ(0u, encoded.size());
+}
+
+// A one-element sequence has no deltas; only the value itself is encoded.
+TEST(DeltaVarintCompressorTest, TestSingleElement) {
+    const std::vector<int64_t> input = {42};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // ZigZag(42) = 84, which fits into a single varint byte.
+    ASSERT_EQ(1u, encoded.size());
+}
+
+// INT64 min followed by INT64 max: the delta between them wraps around.
+TEST(DeltaVarintCompressorTest, TestExtremeValues) {
+    const int64_t lowest = std::numeric_limits<int64_t>::min();
+    const int64_t highest = std::numeric_limits<int64_t>::max();
+    const std::vector<int64_t> input = {lowest, highest};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // 10 bytes for the minimum value, plus 1 byte for the wrapped delta (-1).
+    ASSERT_EQ(11u, encoded.size());
+}
+
+// Negative deltas must stay compact thanks to the ZigZag mapping.
+TEST(DeltaVarintCompressorTest, TestNegativeDeltas) {
+    const std::vector<int64_t> input = {100, -50, -150, -100};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // Encoded numbers are [100, -150, -100, 50]; after ZigZag each one takes
+    // one or two bytes, so four numbers need at most eight bytes.
+    ASSERT_LE(encoded.size(), 8u);
+}
+
+// Unsorted input produces large deltas and therefore a worse ratio.
+TEST(DeltaVarintCompressorTest, TestUnsortedData) {
+    const std::vector<int64_t> input = {1000, 5, 9999, 12345, 6789};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // Big jumps cost several bytes each (the delta 9994 alone takes three),
+    // so the output is longer than one byte per value.
+    ASSERT_GT(encoded.size(), 5u);
+}
+
+// Three continuation bytes and no terminating byte: decoding must fail.
+TEST(DeltaVarintCompressorTest, TestCorruptedInput) {
+    const char continuation = static_cast<char>(0x80);
+    const std::vector<char> truncated = {continuation, continuation, continuation};
+    ASSERT_NOK(DeltaVarintCompressor::Decompress(truncated));
+}
+
+/// Test cases from Python Paimon
+// A run of zeros: the first value and every delta are 0.
+TEST(DeltaVarintCompressorTest, TestZeroValues) {
+    const std::vector<int64_t> input = {0, 0, 0, 0, 0};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // One byte per zero is the most this may take.
+    ASSERT_LE(encoded.size(), 5u);
+}
+
+// Strictly ascending input with step 1, the ideal case for delta coding.
+TEST(DeltaVarintCompressorTest, TestAscendingSequence) {
+    const std::vector<int64_t> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // Every encoded number is 1, so the result is far below the raw 80 bytes.
+    ASSERT_LE(encoded.size(), 15u);
+}
+
+// Strictly descending input with step -1.
+TEST(DeltaVarintCompressorTest, TestDescendingSequence) {
+    const std::vector<int64_t> input = {10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // ZigZag keeps the -1 deltas at one byte each, so the bound matches the
+    // ascending case.
+    ASSERT_LE(encoded.size(), 15u);
+}
+
+// Large values with a constant, equally large step.
+TEST(DeltaVarintCompressorTest, TestLargePositiveValues) {
+    const std::vector<int64_t> input = {1000000, 2000000, 3000000, 4000000};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // A delta of one million cannot fit into one byte, so more than one byte
+    // per value is expected.
+    ASSERT_GT(encoded.size(), 4u);
+}
+
+// Alternating signs: each delta is larger in magnitude than the values themselves.
+TEST(DeltaVarintCompressorTest, TestMixedPositiveNegative) {
+    const std::vector<int64_t> input = {100, -200, 300, -400, 500};
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // The sign flips inflate the deltas beyond the one-byte range.
+    ASSERT_GT(encoded.size(), 5u);
+}
+
+// A random walk with small steps must compress to well under the raw size.
+TEST(DeltaVarintCompressorTest, TestCompressionEfficiency) {
+    std::vector<int64_t> input;
+    std::mt19937 rng(42);  // Fixed seed for reproducibility
+    std::uniform_int_distribution<int32_t> step(-10, 10);
+
+    // 100 values starting near 1000, each at most 10 away from its predecessor.
+    int64_t current = 1000;
+    for (int32_t k = 0; k < 100; ++k) {
+        current += step(rng);
+        input.push_back(current);
+    }
+
+    auto encoded = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+    ASSERT_EQ(input, decoded);
+    // Raw storage is 8 bytes per value (800 bytes); requiring fewer than
+    // 4 bytes per value means at least 50% compression.
+    ASSERT_LT(encoded.size(), input.size() * 4);
+}
+
+// Compressing the output of a round trip must reproduce the same bytes.
+TEST(DeltaVarintCompressorTest, TestRoundTripConsistency) {
+    const std::vector<int64_t> input = {1, 10, 100, 1000, 10000};
+
+    // Cycle 1 starts from the literal input.
+    auto encoded_first = DeltaVarintCompressor::Compress(input);
+    ASSERT_OK_AND_ASSIGN(auto decoded_first, DeltaVarintCompressor::Decompress(encoded_first));
+
+    // Cycle 2 starts from what cycle 1 decoded.
+    auto encoded_second = DeltaVarintCompressor::Compress(decoded_first);
+    ASSERT_OK_AND_ASSIGN(auto decoded_second, DeltaVarintCompressor::Decompress(encoded_second));
+
+    // Values and bytes must agree across both cycles.
+    ASSERT_EQ(input, decoded_first);
+    ASSERT_EQ(input, decoded_second);
+    ASSERT_EQ(encoded_first, encoded_second);
+}
+
+// Round trip of one sequence made of values around power-of-two boundaries
+// (127/128, 255/256, 16383/16384, 32767/32768) and their negatives.
+TEST(DeltaVarintCompressorTest, TestBoundaryValues) {
+    const std::vector<int64_t> edges = {
+        0,  1,    127,  128,  255,  256,    16383,  16384,  32767, 32768,
+        -1, -127, -128, -255, -256, -16383, -16384, -32767, -32768,
+    };
+
+    auto encoded = DeltaVarintCompressor::Compress(edges);
+    ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+    ASSERT_EQ(edges, decoded);
+}
+
+// Pins the ZigZag mapping (expected to match the Java implementation):
+// non-negative n maps to 2n, negative n maps to -2n - 1.
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityZigZagEncoding) {
+    // (signed value, expected ZigZag code)
+    const std::vector<std::pair<int64_t, uint64_t>> expectations = {
+        {0, 0},  {-1, 1}, {1, 2},     {-2, 3},   {2, 4},
+        {-3, 5}, {3, 6},  {-64, 127}, {64, 128}, {-65, 129},
+    };
+
+    for (const auto& expectation : expectations) {
+        const int64_t original_value = expectation.first;
+        const uint64_t expected_zigzag = expectation.second;
+
+        // A single-element sequence exercises the mapping without any delta.
+        const std::vector<int64_t> single_value = {original_value};
+        auto encoded = DeltaVarintCompressor::Compress(single_value);
+        ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+
+        ASSERT_EQ(single_value, decoded) << "ZigZag encoding failed for value " << original_value;
+        ASSERT_EQ(expected_zigzag, DeltaVarintCompressor::ZigZag(original_value))
+            << "ZigZag encoding failed for value " << original_value;
+    }
+}
+
+// Byte-exact test vectors (expected to match the Java implementation): each
+// input must compress to the given lowercase hex string and round-trip.
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityKnownVectors) {
+    using Vector = std::pair<std::vector<int64_t>, std::string>;
+    const std::vector<Vector> known_vectors = {
+        // Single values: ZigZag code written as one varint byte.
+        {{0}, "00"},   // ZigZag(0) = 0
+        {{1}, "02"},   // ZigZag(1) = 2
+        {{-1}, "01"},  // ZigZag(-1) = 1
+        {{2}, "04"},   // ZigZag(2) = 4
+        {{-2}, "03"},  // ZigZag(-2) = 3
+
+        // Two values: first value, then the delta to it.
+        {{0, 1}, "0002"},   // 0, delta +1
+        {{1, 2}, "0202"},   // 1, delta +1
+        {{0, -1}, "0001"},  // 0, delta -1
+        {{1, 0}, "0201"},   // 1, delta -1
+
+        // Values whose ZigZag code needs a second varint byte.
+        {{127}, "fe01"},   // ZigZag(127) = 254
+        {{-127}, "fd01"},  // ZigZag(-127) = 253
+        {{128}, "8002"},   // ZigZag(128) = 256
+        {{-128}, "ff01"},  // ZigZag(-128) = 255
+    };
+
+    // Renders bytes as two lowercase hex digits each.
+    auto to_hex = [](const std::vector<char>& raw) -> std::string {
+        std::string rendered;
+        for (char one_byte : raw) {
+            char digits[3];
+            snprintf(digits, sizeof(digits), "%02x", static_cast<unsigned char>(one_byte));
+            rendered += digits;
+        }
+        return rendered;
+    };
+
+    for (const auto& known : known_vectors) {
+        const std::vector<int64_t>& original = known.first;
+        const std::string& expected_hex = known.second;
+
+        auto encoded = DeltaVarintCompressor::Compress(original);
+        std::string actual_hex = to_hex(encoded);
+
+        ASSERT_EQ(expected_hex, actual_hex)
+            << "Binary compatibility failed for original data. "
+            << "Expected: " << expected_hex << ", Got: " << actual_hex;
+
+        // The same bytes must also decode back to the input.
+        ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+        ASSERT_EQ(original, decoded) << "Round-trip failed for original data";
+    }
+}
+
+// Round trips for values at the 32-bit and 64-bit limits, first one at a time
+// and then as one sequence so the deltas between them are exercised too.
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityLargeNumbers) {
+    const std::vector<int64_t> large_number_cases = {
+        2147483647LL,            // Integer.MAX_VALUE
+        -2147483648LL,           // Integer.MIN_VALUE
+        9223372036854775807LL,   // Long.MAX_VALUE
+        -9223372036854775807LL,  // Long.MIN_VALUE + 1 (avoid overflow)
+        4294967295LL,            // 2^32 - 1
+        -4294967296LL,           // -2^32
+    };
+
+    // Each value on its own.
+    for (int64_t value : large_number_cases) {
+        const std::vector<int64_t> single_value = {value};
+        auto encoded = DeltaVarintCompressor::Compress(single_value);
+        ASSERT_OK_AND_ASSIGN(auto decoded, DeltaVarintCompressor::Decompress(encoded));
+        ASSERT_EQ(single_value, decoded) << "Large number compatibility failed for " << value;
+    }
+
+    // All values as one sequence.
+    auto encoded_seq = DeltaVarintCompressor::Compress(large_number_cases);
+    ASSERT_OK_AND_ASSIGN(auto decoded_seq, DeltaVarintCompressor::Decompress(encoded_seq));
+    ASSERT_EQ(large_number_cases, decoded_seq) << "Large number sequence compatibility failed";
+}
+
+// Round-trips single values whose ZigZag form sits next to a varint length boundary
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityVarintBoundaries) {
+    const std::vector<int64_t> varint_boundary_cases = {
+        // 1-byte Varint boundary
+        63,   // ZigZag(63) = 126, fits in 1 byte
+        64,   // ZigZag(64) = 128, needs 2 bytes
+        -64,  // ZigZag(-64) = 127, fits in 1 byte
+        -65,  // ZigZag(-65) = 129, needs 2 bytes
+
+        // 2-byte Varint boundary
+        8191,   // ZigZag(8191) = 16382, fits in 2 bytes
+        8192,   // ZigZag(8192) = 16384, needs 3 bytes
+        -8192,  // ZigZag(-8192) = 16383, fits in 2 bytes
+        -8193,  // ZigZag(-8193) = 16385, needs 3 bytes
+
+        // 3-byte Varint boundary
+        1048575,  // ZigZag(1048575) = 2097150, fits in 3 bytes
+        1048576,  // ZigZag(1048576) = 2097152, needs 4 bytes
+    };
+
+    // Every boundary value is checked in isolation so no delta is involved.
+    for (const int64_t value : varint_boundary_cases) {
+        const std::vector<int64_t> singleton{value};
+        const auto packed = DeltaVarintCompressor::Compress(singleton);
+        ASSERT_OK_AND_ASSIGN(auto unpacked, DeltaVarintCompressor::Decompress(packed));
+        ASSERT_EQ(singleton, unpacked) << "Varint boundary compatibility failed for " << value;
+    }
+}
+
+// Round-trips sequences chosen to stress the delta step of the compressor
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityDeltaEdgeCases) {
+    constexpr int64_t kInt64Max = std::numeric_limits<int64_t>::max();
+    const std::vector<std::vector<int64_t>> delta_edge_cases = {
+        {0, kInt64Max},                             // Maximum positive delta
+        {kInt64Max, 0},                             // Maximum negative delta
+        {0, 1000000, -1000000, 2000000, -2000000},  // Alternating large deltas
+        {42, 42, 42, 42},                           // Sequence with zero deltas
+        {0, 1, 1000000, 1000001, 0},                // Mixed small and large deltas
+    };
+
+    for (const auto& sequence : delta_edge_cases) {
+        const auto packed = DeltaVarintCompressor::Compress(sequence);
+        ASSERT_OK_AND_ASSIGN(auto unpacked, DeltaVarintCompressor::Decompress(packed));
+        ASSERT_EQ(sequence, unpacked) << "Delta edge case compatibility failed";
+    }
+}
+
+// Byte sequences that never terminate a varint must be rejected; empty input is valid
+TEST(DeltaVarintCompressorTest, TestJavaCompatibilityErrorConditions) {
+    // Nine 0xFF bytes followed by 0x80: every byte carries the continuation flag.
+    std::vector<char> long_sequence(9, static_cast<char>(0xFF));
+    long_sequence.push_back(static_cast<char>(0x80));
+
+    // Truncated/invalid inputs: each one ends while a continuation flag is still set.
+    const std::vector<std::vector<char>> invalid_cases = {
+        {static_cast<char>(0x80)},                           // Single incomplete byte
+        {static_cast<char>(0x80), static_cast<char>(0x80)},  // Incomplete 3-byte varint
+        long_sequence,                                       // Long sequence
+    };
+
+    // The implementation reports such data through an error result.
+    for (const auto& invalid_data : invalid_cases) {
+        const auto outcome = DeltaVarintCompressor::Decompress(invalid_data);
+        ASSERT_NOK(outcome) << "Should return error for invalid data";
+    }
+
+    // An empty buffer is valid input and decodes to an empty list.
+    const std::vector<char> empty_input;
+    ASSERT_OK_AND_ASSIGN(auto empty_result, DeltaVarintCompressor::Decompress(empty_input));
+    ASSERT_TRUE(empty_result.empty()) << "Empty input should return empty list";
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/murmurhash_utils.h
b/src/paimon/common/utils/murmurhash_utils.h
index cebf41b..52831c9 100644
--- a/src/paimon/common/utils/murmurhash_utils.h
+++ b/src/paimon/common/utils/murmurhash_utils.h
@@ -7,14 +7,13 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/*
diff --git a/src/paimon/common/utils/murmurhash_utils_test.cpp
b/src/paimon/common/utils/murmurhash_utils_test.cpp
index b4a475f..d0c09ea 100644
--- a/src/paimon/common/utils/murmurhash_utils_test.cpp
+++ b/src/paimon/common/utils/murmurhash_utils_test.cpp
@@ -7,14 +7,13 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#include "paimon/common/utils/murmurhash_utils.h"
diff --git a/src/paimon/common/utils/var_length_int_utils.h
b/src/paimon/common/utils/var_length_int_utils.h
new file mode 100644
index 0000000..ec76460
--- /dev/null
+++ b/src/paimon/common/utils/var_length_int_utils.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+
+#include "fmt/format.h"
+#include "paimon/macros.h"
+#include "paimon/result.h"
+namespace paimon {
+
+/// Variable-length integer encoding/decoding utilities.
+///
+/// Encoding format (same as protobuf unsigned varint):
+/// - Each byte stores 7 payload bits in bits [6:0].
+/// - Bit 7 (0x80) is the continuation flag: 1 = more bytes follow, 0 = last byte.
+/// - A varint32 uses at most 5 bytes; a varint64 uses at most 9 bytes.
+///
+/// Only non-negative values are representable: the encoders reject negative input and the
+/// decoders reject byte sequences that do not fit into a non-negative result.
+///
+/// None of the functions receives a buffer length, so the caller is responsible for
+/// providing a buffer that is large enough (see kMaxVarIntSize / kMaxVarLongSize).
+///
+/// Based on the LongPacker from PalDB (https://github.com/linkedin/PalDB),
+/// licensed under Apache 2.0.
+class VarLengthIntUtils {
+ public:
+    VarLengthIntUtils() = delete;
+    ~VarLengthIntUtils() = delete;
+
+    /// Maximum number of bytes produced by EncodeInt / consumed by DecodeInt.
+    static constexpr int32_t kMaxVarIntSize = 5;
+    /// Maximum number of bytes produced by EncodeLong / consumed by DecodeLong.
+    static constexpr int32_t kMaxVarLongSize = 9;
+
+    // ==================== Encoding (writes to char*) ====================
+
+    /// Encodes a non-negative int32 as varint into `dest`.
+    /// `dest` must have room for up to kMaxVarIntSize bytes.
+    /// Returns the number of bytes written, or Status::Invalid for a negative `value`.
+    static Result<int32_t> EncodeInt(int32_t value, char* dest) {
+        if (PAIMON_UNLIKELY(value < 0)) {
+            return Status::Invalid(
+                fmt::format("negative value: v={} for VarLengthInt Encoding", value));
+        }
+        int32_t num_bytes = 0;
+        // Emit 7 bits at a time, lowest bits first, flagging every byte except the last.
+        while ((value & ~0x7F) != 0) {
+            dest[num_bytes] = static_cast<char>((value & 0x7F) | 0x80);
+            value >>= 7;
+            ++num_bytes;
+        }
+        dest[num_bytes] = static_cast<char>(value);
+        return num_bytes + 1;
+    }
+
+    /// Encodes a non-negative int64 as varint into `dest`.
+    /// `dest` must have room for up to kMaxVarLongSize bytes.
+    /// Returns the number of bytes written, or Status::Invalid for a negative `value`.
+    static Result<int32_t> EncodeLong(int64_t value, char* dest) {
+        if (PAIMON_UNLIKELY(value < 0)) {
+            return Status::Invalid(
+                fmt::format("negative value: v={} for VarLengthInt Encoding", value));
+        }
+        int32_t num_bytes = 0;
+        // Emit 7 bits at a time, lowest bits first, flagging every byte except the last.
+        while ((value & ~0x7FLL) != 0) {
+            dest[num_bytes] = static_cast<char>(static_cast<int32_t>(value & 0x7F) | 0x80);
+            value >>= 7;
+            ++num_bytes;
+        }
+        dest[num_bytes] = static_cast<char>(value);
+        return num_bytes + 1;
+    }
+
+    // ==================== Decoding (reads from const char*) ====================
+
+    /// Decodes a varint32 from `data` at `*offset`, advancing `*offset` past the consumed bytes.
+    /// Inlines a 1-byte fast path (values 0-127), which is the most common case.
+    /// Reads at most kMaxVarIntSize bytes. Returns Status::Invalid for malformed input, i.e.
+    /// when the value does not fit into a non-negative int32 or the fifth byte still has the
+    /// continuation flag set.
+    static inline Result<int32_t> DecodeInt(const char* data, int32_t* offset) {
+        auto first_byte = static_cast<uint8_t>(data[*offset]);
+        if (PAIMON_LIKELY((first_byte & 0x80) == 0)) {
+            ++(*offset);
+            return static_cast<int32_t>(first_byte);
+        }
+        // Multi-byte: fall through to generic loop (the first byte is read again there).
+        // NOTE: EncodeInt only encodes non-negative values, so a decoded negative result
+        // indicates malformed data.
+        uint32_t result = 0;
+        for (int32_t shift = 0; shift < 32; shift += 7) {
+            auto byte_val = static_cast<uint8_t>(data[*offset]);
+            ++(*offset);
+            const uint32_t payload = byte_val & 0x7F;
+            // The fifth byte lands at shift 28, where only payload bits [3:0] fit into 32 bits.
+            // Higher bits would be shifted out silently, so reject them explicitly.
+            if (PAIMON_UNLIKELY(shift == 28 && (payload >> 4) != 0)) {
+                return Status::Invalid("Malformed varint32: value overflows 32 bits");
+            }
+            result |= payload << shift;
+            if ((byte_val & 0x80) == 0) {
+                auto signed_result = static_cast<int32_t>(result);
+                if (PAIMON_UNLIKELY(signed_result < 0)) {
+                    return Status::Invalid("Malformed varint32: decoded negative value");
+                }
+                return signed_result;
+            }
+        }
+        return Status::Invalid("Malformed varint32: too many continuation bytes");
+    }
+
+    /// Decodes a varint64 from `data` at `*offset`, advancing `*offset` past the consumed bytes.
+    /// Inlines a 1-byte fast path (values 0-127), which is the most common case.
+    /// Reads at most kMaxVarLongSize bytes. Returns Status::Invalid for malformed input, i.e.
+    /// when the ninth byte still has the continuation flag set.
+    static inline Result<int64_t> DecodeLong(const char* data, int32_t* offset) {
+        auto first_byte = static_cast<uint8_t>(data[*offset]);
+        if (PAIMON_LIKELY((first_byte & 0x80) == 0)) {
+            ++(*offset);
+            return static_cast<int64_t>(first_byte);
+        }
+        // Multi-byte: fall through to generic loop (the first byte is read again there).
+        // NOTE: EncodeLong only encodes non-negative values, which need at most
+        // kMaxVarLongSize bytes (9 * 7 = 63 payload bits). A tenth byte could only set the
+        // sign bit or be shifted out silently, so the loop stops after the ninth byte.
+        uint64_t result = 0;
+        for (int32_t shift = 0; shift < 7 * kMaxVarLongSize; shift += 7) {
+            auto byte_val = static_cast<uint8_t>(data[*offset]);
+            ++(*offset);
+            result |= static_cast<uint64_t>(byte_val & 0x7F) << shift;
+            if ((byte_val & 0x80) == 0) {
+                auto signed_result = static_cast<int64_t>(result);
+                // Defensive guard: with at most 63 payload bits the sign bit stays clear.
+                if (PAIMON_UNLIKELY(signed_result < 0)) {
+                    return Status::Invalid("Malformed varint64: decoded negative value");
+                }
+                return signed_result;
+            }
+        }
+        return Status::Invalid("Malformed varint64: too many continuation bytes");
+    }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/var_length_int_utils_test.cpp
b/src/paimon/common/utils/var_length_int_utils_test.cpp
new file mode 100644
index 0000000..c2404e2
--- /dev/null
+++ b/src/paimon/common/utils/var_length_int_utils_test.cpp
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/var_length_int_utils.h"
+
+#include <cstring>
+#include <limits>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeInt) {
+    // Small values, the 1-byte/2-byte boundary and larger magnitudes up to the int32 maximum.
+    const std::vector<int32_t> samples = {0,    1,     127,    128,     255,      256,
+                                          1000, 10000, 100000, 1000000, 10000000, 2147483647};
+
+    for (const int32_t value : samples) {
+        // Zero-filled scratch buffer sized for the longest varint32.
+        char encoded[VarLengthIntUtils::kMaxVarIntSize] = {};
+
+        ASSERT_OK_AND_ASSIGN(int32_t written, VarLengthIntUtils::EncodeInt(value, encoded));
+
+        // Decoding from the start must restore the value and consume exactly what was written.
+        int32_t cursor = 0;
+        ASSERT_OK_AND_ASSIGN(int32_t restored, VarLengthIntUtils::DecodeInt(encoded, &cursor));
+
+        ASSERT_EQ(value, restored) << "Encoded and decoded values don't match for: " << value;
+        ASSERT_EQ(cursor, written) << "Offset doesn't match encoded length for: " << value;
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeLong) {
+    // Values on both sides of the 1..4-byte length boundaries plus large 64-bit magnitudes.
+    std::vector<int64_t> test_values = {0ll,
+                                        127ll,
+                                        128ll,
+                                        16383ll,
+                                        16384ll,
+                                        2097151ll,
+                                        2097152ll,
+                                        268435455ll,
+                                        268435456ll,
+                                        1234567890123456789ll,
+                                        202405170000000000ll,
+                                        999999999999999999ll,
+                                        std::numeric_limits<int64_t>::max()};
+
+    for (int64_t value : test_values) {
+        char buffer[VarLengthIntUtils::kMaxVarLongSize + 1];
+        std::memset(buffer, 0, sizeof(buffer));
+
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length,
+                             VarLengthIntUtils::EncodeLong(value, buffer));
+
+        int32_t offset = 0;
+        ASSERT_OK_AND_ASSIGN(int64_t decoded_value, VarLengthIntUtils::DecodeLong(buffer, &offset));
+
+        ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't match for: " << value;
+        // Same check as the int32 test: the decoder must consume exactly the encoded bytes.
+        ASSERT_EQ(offset, encoded_length) << "Offset doesn't match encoded length for: " << value;
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeNegativeValue) {
+    // EncodeInt must refuse negative input and name the offending value in the error.
+    char scratch[VarLengthIntUtils::kMaxVarIntSize];
+    const auto outcome = VarLengthIntUtils::EncodeInt(-1, scratch);
+    ASSERT_NOK_WITH_MSG(outcome, "negative value: v=-1 for VarLengthInt Encoding");
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeNegativeLongValue) {
+    // EncodeLong must refuse negative input and name the offending value in the error.
+    char scratch[VarLengthIntUtils::kMaxVarLongSize + 1];
+    const auto outcome = VarLengthIntUtils::EncodeLong(-1, scratch);
+    ASSERT_NOK_WITH_MSG(outcome, "negative value: v=-1 for VarLengthInt Encoding");
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeIntSequential) {
+    // Two varints written back to back must be decodable with one shared, advancing offset.
+    char buffer[VarLengthIntUtils::kMaxVarIntSize * 2];
+    std::memset(buffer, 0, sizeof(buffer));
+    int32_t value1 = 100;
+    int32_t value2 = 200;
+
+    ASSERT_OK_AND_ASSIGN(auto length1, VarLengthIntUtils::EncodeInt(value1, buffer));
+    ASSERT_OK_AND_ASSIGN(auto length2, VarLengthIntUtils::EncodeInt(value2, buffer + length1));
+
+    int32_t offset = 0;
+    ASSERT_OK_AND_ASSIGN(int32_t decoded_result1, VarLengthIntUtils::DecodeInt(buffer, &offset));
+    ASSERT_EQ(value1, decoded_result1);
+    // After the first value the offset must point at the start of the second one.
+    ASSERT_EQ(offset, length1);
+    ASSERT_OK_AND_ASSIGN(int32_t decoded_result2, VarLengthIntUtils::DecodeInt(buffer, &offset));
+    ASSERT_EQ(value2, decoded_result2);
+    // Both encodings together must have been consumed, no more and no less.
+    ASSERT_EQ(offset, length1 + length2);
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeBytesNumber) {
+    // The value at position k (0-based) is expected to encode into k + 1 bytes.
+    const std::vector<int32_t> values = {
+        0x7F,       // 127 - fits in 1 byte
+        0x80,       // 128 - needs 2 bytes
+        0x4000,     // 16384 - needs 3 bytes
+        0x200000,   // 2097152 - needs 4 bytes
+        2147483647  // 2147483647 - needs 5 bytes
+    };
+
+    int32_t expected_length = 1;
+    for (const int32_t value : values) {
+        char encoded[VarLengthIntUtils::kMaxVarIntSize] = {};
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length, VarLengthIntUtils::EncodeInt(value, encoded));
+        ASSERT_EQ(encoded_length, expected_length);
+        ++expected_length;
+    }
+}
+
+TEST(VarLengthIntUtilsTest, TestEncodeLongBytesNumber) {
+    // The value at position k (0-based) is expected to encode into k + 1 bytes.
+    const std::vector<int64_t> values = {
+        0x7F,                                // 127 - fits in 1 byte
+        0x80,                                // 128 - needs 2 bytes
+        0x4000,                              // 16384 - needs 3 bytes
+        0x200000,                            // 2097152 - needs 4 bytes
+        2147483647,                          // 2147483647 - needs 5 bytes
+        34359738368ll,                       // 0x800000000 - needs 6 bytes
+        562949953421311ll,                   // 0x1FFFFFFFFFFFF (2^49 - 1) - needs 7 bytes
+        72057594037927935ll,                 // 0xFFFFFFFFFFFFFF (2^56 - 1) - needs 8 bytes
+        std::numeric_limits<int64_t>::max()  // needs 9 bytes
+    };
+
+    int32_t expected_length = 1;
+    for (const int64_t value : values) {
+        char encoded[VarLengthIntUtils::kMaxVarLongSize + 1] = {};
+        ASSERT_OK_AND_ASSIGN(int32_t encoded_length,
+                             VarLengthIntUtils::EncodeLong(value, encoded));
+        ASSERT_EQ(encoded_length, expected_length) << value;
+        ++expected_length;
+    }
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/xxhash_test.cpp
b/src/paimon/common/utils/xxhash_test.cpp
new file mode 100644
index 0000000..5d2daca
--- /dev/null
+++ b/src/paimon/common/utils/xxhash_test.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "xxhash.h" // NOLINT(build/include_subdir)
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Checks XXH64 (seed 0) against the "<data>,<hex hash>" lines stored in xxhash.data.
+TEST(XXHashTest, TestCompatibleWithJava) {
+    auto file_system = std::make_unique<LocalFileSystem>();
+    auto file_name = paimon::test::GetDataDir() + "/xxhash.data";
+    std::string bytes;
+    ASSERT_OK(file_system->ReadFile(file_name, &bytes));
+    auto lines = StringUtils::Split(bytes, "\n");
+    // 1000 random str and empty str
+    ASSERT_EQ(1001u, lines.size());
+    for (const auto& line : lines) {
+        // Keep empty fields: the line for the empty string has nothing before the comma.
+        auto data_and_hash = StringUtils::Split(line, ",", /*ignore_empty=*/false);
+        ASSERT_EQ(2u, data_and_hash.size());
+        const auto& str = data_and_hash[0];
+        // The hash is an unsigned 64-bit value; parse and compare it as such instead of
+        // squeezing it through int64_t (signed/unsigned comparison).
+        const uint64_t expected_hash = std::stoull(data_and_hash[1], nullptr, 16);
+        ASSERT_EQ(expected_hash, XXH64(str.data(), str.size(), /*seed=*/0));
+    }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/testing/utils/testharness.cpp
b/src/paimon/testing/utils/testharness.cpp
index fa62690..9c61ac0 100644
--- a/src/paimon/testing/utils/testharness.cpp
+++ b/src/paimon/testing/utils/testharness.cpp
@@ -7,14 +7,13 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
@@ -29,12 +28,104 @@
// Assert utilities is adapted from RocksDB
// https://github.com/facebook/rocksdb/blob/main/test_util/testharness.cc
+// Copyright 2024 Google LLC.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// UniqueTestDirectory utility is adapted from LiteRT
+// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.cc
+
#include "paimon/testing/utils/testharness.h"
+#include <unistd.h>
+
+#include <algorithm>
+#include <cassert>
+#include <cstdlib>
+#include <exception>
+#include <fstream>
+#include <iostream>
+#include <random>
+#include <vector>
+
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
#include "paimon/status.h"
namespace paimon::test {
+/// Returns the test data directory: the value of the PAIMON_TEST_DATA environment variable
+/// when it is set and non-empty, otherwise the relative default "test/test_data/".
+std::string GetDataDir() {
+    const char* env_value = std::getenv("PAIMON_TEST_DATA");
+    const bool has_override = env_value != nullptr && env_value[0] != '\0';
+    return has_override ? std::string(env_value) : std::string("test/test_data/");
+}
+
+/// Builds the file system options used for the "paimon-unittest" OSS bucket.
+/// The access key id/secret are read from "$HOME/.osscredentials" ("key=value" lines whose
+/// key contains "accessid" / "accesskey"); they stay empty when the file cannot be opened.
+std::map<std::string, std::string> GetJindoTestOptions() {
+    const char* home_env = std::getenv("HOME");
+    const std::string home_dir = home_env ? std::string(home_env) : std::string();
+
+    std::string access_key_id;
+    std::string access_key_secret;
+    std::ifstream credentials(home_dir + "/.osscredentials");
+    if (credentials.is_open()) {
+        for (std::string line; std::getline(credentials, line);) {
+            std::vector<std::string> parts = StringUtils::Split(line, "=");
+            // Only well-formed "key=value" lines are considered.
+            if (parts.size() == 2) {
+                if (parts[0].find("accessid") != std::string::npos) {
+                    StringUtils::Trim(&parts[1]);
+                    access_key_id = parts[1];
+                }
+                if (parts[0].find("accesskey") != std::string::npos) {
+                    StringUtils::Trim(&parts[1]);
+                    access_key_secret = parts[1];
+                }
+            }
+        }
+        credentials.close();
+    }
+
+    return {
+        {"fs.oss.bucket.paimon-unittest.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com"},
+        {"fs.oss.bucket.paimon-unittest.accessKeyId", access_key_id},
+        {"fs.oss.bucket.paimon-unittest.accessKeySecret", access_key_secret},
+        {"fs.oss.user", "paimon"},
+    };
+}
+/// Returns the base directory used for tests on the "paimon-unittest" OSS bucket.
+std::string GetJindoTestDir() {
+    static const std::string kJindoTestDir("oss://paimon-unittest/temp/");
+    return kJindoTestDir;
+}
+
+/// Returns a uniformly distributed random integer in the closed range [min, max].
+/// Each thread uses its own engine, seeded once from std::random_device.
+int64_t RandomNumber(int64_t min, int64_t max) {
+    static thread_local std::mt19937 engine(std::random_device{}());  // NOLINT(whitespace/braces)
+    return std::uniform_int_distribution<int64_t>(min, max)(engine);
+}
+
+/// Returns the id of the current process in decimal text form.
+std::string GetPidStr() {
+    const auto pid = getpid();
+    return std::to_string(pid);
+}
+
::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) {
if (s.ok()) {
return ::testing::AssertionSuccess();
@@ -43,4 +134,69 @@ namespace paimon::test {
}
}
+/// Creates a fresh directory named "paimon_test_<uuid>" below the base directory of the
+/// requested file system ("jindo": the OSS test dir, anything else: the system temp dir).
+/// Returns nullptr when the file system cannot be obtained or no directory could be created
+/// within kMaxTries attempts.
+std::unique_ptr<UniqueTestDirectory> UniqueTestDirectory::Create(const std::string& fs_identifier) {
+    static const size_t kMaxTries = 1000;
+
+    std::string base_dir;
+    std::map<std::string, std::string> fs_options;
+    if (fs_identifier == "jindo") {
+        base_dir = GetJindoTestDir();
+        fs_options = GetJindoTestOptions();
+    } else {
+        base_dir = std::filesystem::temp_directory_path().string() + "/";
+    }
+
+    auto fs = FileSystemFactory::Get(fs_identifier, base_dir, fs_options);
+    if (!fs.ok()) {
+        return nullptr;
+    }
+
+    for (size_t attempt = 0; attempt < kMaxTries; ++attempt) {
+        std::string uuid;
+        if (!UUID::Generate(&uuid)) {
+            continue;
+        }
+        std::string candidate = base_dir + "paimon_test_" + uuid;
+        // Skip candidates that already exist or whose existence cannot be determined.
+        auto exists = fs.value()->Exists(candidate);
+        const bool is_free = exists.ok() && !exists.value();
+        if (!is_free) {
+            continue;
+        }
+        if (fs.value()->Mkdirs(candidate).ok()) {
+            return std::unique_ptr<UniqueTestDirectory>(
+                new UniqueTestDirectory(candidate, std::move(fs).value()));
+        }
+    }
+    return nullptr;
+}
+
+/// Removes the test directory (recursively) if it still exists and releases the file system.
+UniqueTestDirectory::~UniqueTestDirectory() {
+    // The class has defaulted move operations; a moved-from instance holds an empty `fs_`
+    // and owns no directory, so there is nothing to clean up (and nothing to dereference).
+    if (!fs_) {
+        return;
+    }
+    auto is_exist = fs_->Exists(tmpdir_);
+    assert(is_exist.ok());
+    // With NDEBUG the assert above is compiled out, so never read the value of a failed result.
+    if (is_exist.ok() && is_exist.value()) {
+        [[maybe_unused]] auto status = fs_->Delete(tmpdir_, /*recursive=*/true);
+        assert(status.ok());
+    }
+    fs_.reset();
+}
+
+/// Recursively copies the regular files and subdirectories of `source` into `destination`,
+/// creating `destination` when it does not exist and overwriting existing files.
+/// Returns true when everything was copied. Returns false as soon as a nested directory
+/// fails to copy or an exception is thrown; the exception message is written to std::cerr.
+bool TestUtil::CopyDirectory(const std::filesystem::path& source,
+                             const std::filesystem::path& destination) {
+    namespace fs = std::filesystem;
+    try {
+        if (!fs::exists(destination)) {
+            fs::create_directories(destination);
+        }
+
+        for (const auto& entry : fs::directory_iterator(source)) {
+            const auto& source_path = entry.path();
+            auto destination_path = destination / source_path.filename();
+
+            if (fs::is_directory(source_path)) {
+                // Propagate failures from nested directories instead of reporting success.
+                if (!CopyDirectory(source_path, destination_path)) {
+                    return false;
+                }
+            } else if (fs::is_regular_file(source_path)) {
+                fs::copy(source_path, destination_path, fs::copy_options::overwrite_existing);
+            }
+            // Entries that are neither directories nor regular files are skipped.
+        }
+    } catch (const std::exception& e) {
+        std::cerr << "Error while copying directory: " << e.what() << std::endl;
+        return false;
+    }
+    return true;
+}
+
} // namespace paimon::test
diff --git a/src/paimon/testing/utils/testharness.h
b/src/paimon/testing/utils/testharness.h
index e945b75..5c92fa1 100644
--- a/src/paimon/testing/utils/testharness.h
+++ b/src/paimon/testing/utils/testharness.h
@@ -7,14 +7,13 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
@@ -29,17 +28,49 @@
// Assert utilities is adapted from RocksDB
// https://github.com/facebook/rocksdb/blob/main/test_util/testharness.h
+// Copyright 2024 Google LLC.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// UniqueTestDirectory utility is adapted from LiteRT
+// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.h
+
#pragma once
+#include <filesystem>
+#include <map>
+#include <memory>
#include <string>
+#include <utility>
#include "gtest/gtest.h"
#include "paimon/macros.h"
#include "paimon/result.h"
#include "paimon/status.h"
+namespace paimon {
+class FileSystem;
+class Status;
+} // namespace paimon
+
namespace paimon::test {
+std::string GetDataDir();
+std::map<std::string, std::string> GetJindoTestOptions();
+std::string GetJindoTestDir();
+
+int64_t RandomNumber(int64_t min, int64_t max);
+
::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s);
#define ASSERT_OK(expr)
\
@@ -75,4 +106,35 @@ namespace paimon::test {
#define EXPECT_OK(s) EXPECT_PRED_FORMAT1(paimon::test::AssertStatus, s)
#define EXPECT_NOK(s) EXPECT_FALSE((s).ok())
+/// Handle for a uniquely named test directory together with the file system it lives on.
+/// Instances are obtained through Create(); the destructor deletes the directory.
+class UniqueTestDirectory {
+ public:
+    /// Creates a new test directory on the file system named by `fs_identifier`.
+    /// Returns nullptr on failure.
+    static std::unique_ptr<UniqueTestDirectory> Create(const std::string& fs_identifier = "local");
+    ~UniqueTestDirectory();
+
+    // Copying is disabled; moving uses the compiler-generated member-wise operations.
+    UniqueTestDirectory(const UniqueTestDirectory&) = delete;
+    UniqueTestDirectory& operator=(const UniqueTestDirectory&) = delete;
+    UniqueTestDirectory(UniqueTestDirectory&&) = default;
+    UniqueTestDirectory& operator=(UniqueTestDirectory&&) = default;
+
+    /// Path of the test directory.
+    const std::string& Str() const { return tmpdir_; }
+
+    /// File system the test directory was created on (shared with this object).
+    std::shared_ptr<FileSystem> GetFileSystem() const { return fs_; }
+
+ private:
+    // Only Create() constructs instances; it passes the already created directory.
+    UniqueTestDirectory(const std::string& tmpdir, std::shared_ptr<FileSystem>&& fs)
+        : tmpdir_(tmpdir), fs_(std::move(fs)) {}
+
+    std::string tmpdir_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+// Collection of static helper functions for tests.
+class TestUtil {
+ public:
+ // Recursively copies the regular files and subdirectories of `source` into
+ // `destination`; see the definition in testharness.cpp for how errors are reported.
+ static bool CopyDirectory(const std::filesystem::path& source,
+ const std::filesystem::path& destination);
+};
+
} // namespace paimon::test