This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ced20a  feat: Migrate bucket module (#71)
3ced20a is described below

commit 3ced20a42ac4e01a47b4e78c1930e3e432d760d0
Author: lxy <[email protected]>
AuthorDate: Wed Jun 10 13:12:27 2026 +0800

    feat: Migrate bucket module (#71)
---
 include/paimon/bucket/bucket_function_type.h       |  52 +++
 include/paimon/bucket/bucket_id_calculator.h       |  99 +++++
 src/paimon/core/bucket/bucket_function.h           |  40 ++
 src/paimon/core/bucket/bucket_id_calculator.cpp    | 339 +++++++++++++++
 .../core/bucket/bucket_id_calculator_test.cpp      | 471 +++++++++++++++++++++
 src/paimon/core/bucket/bucket_select_converter.cpp | 225 ++++++++++
 src/paimon/core/bucket/bucket_select_converter.h   |  84 ++++
 .../core/bucket/bucket_select_converter_test.cpp   | 270 ++++++++++++
 src/paimon/core/bucket/default_bucket_function.h   |  37 ++
 .../core/bucket/default_bucket_function_test.cpp   |  84 ++++
 src/paimon/core/bucket/hive_bucket_function.cpp    | 147 +++++++
 src/paimon/core/bucket/hive_bucket_function.h      |  66 +++
 .../core/bucket/hive_bucket_function_test.cpp      | 453 ++++++++++++++++++++
 src/paimon/core/bucket/hive_hasher.h               | 160 +++++++
 src/paimon/core/bucket/mod_bucket_function.cpp     |  72 ++++
 src/paimon/core/bucket/mod_bucket_function.h       |  48 +++
 .../core/bucket/mod_bucket_function_test.cpp       | 183 ++++++++
 17 files changed, 2830 insertions(+)

diff --git a/include/paimon/bucket/bucket_function_type.h 
b/include/paimon/bucket/bucket_function_type.h
new file mode 100644
index 0000000..7e764ea
--- /dev/null
+++ b/include/paimon/bucket/bucket_function_type.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+#include "paimon/defs.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Specifies the bucket function type for paimon bucket.
+/// This determines how rows are assigned to buckets during data writing.
+enum class BucketFunctionType {
+    /// The default bucket function which will use arithmetic:
+    /// bucket_id = abs(hash_bucket_binary_row % numBuckets) to get bucket.
+    DEFAULT = 1,
+    /// The modulus bucket function which will use modulus arithmetic:
+    /// bucket_id = floorMod(bucket_key_value, numBuckets) to get bucket.
+    /// Note: the bucket key must be a single field of INT or BIGINT datatype.
+    MOD = 2,
+    /// The hive bucket function which will use hive-compatible hash 
arithmetic to get bucket.
+    HIVE = 3
+};
+
+/// Describes a field's type information needed for Hive hashing.
+struct PAIMON_EXPORT HiveFieldInfo {
+    FieldType type;
+    int32_t precision = 0;  // Used for DECIMAL type
+    int32_t scale = 0;      // Used for DECIMAL type
+
+    explicit HiveFieldInfo(FieldType t) : type(t) {}
+    HiveFieldInfo(FieldType t, int32_t p, int32_t s) : type(t), precision(p), 
scale(s) {}
+};
+
+}  // namespace paimon
diff --git a/include/paimon/bucket/bucket_id_calculator.h 
b/include/paimon/bucket/bucket_id_calculator.h
new file mode 100644
index 0000000..5f94573
--- /dev/null
+++ b/include/paimon/bucket/bucket_id_calculator.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/bucket/bucket_function_type.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+struct ArrowArray;
+
+namespace paimon {
+class BucketFunction;
+class MemoryPool;
+
+/// Calculator for determining bucket ids based on the given bucket keys.
+///
+/// @note `BucketIdCalculator` is compatible with the Java implementation and 
uses
+/// hash-based distribution to ensure even data distribution across buckets.
+class PAIMON_EXPORT BucketIdCalculator {
+ public:
+    /// Create `BucketIdCalculator` with default bucket function.
+    /// @param is_pk_table Whether this is for a primary key table.
+    /// @param num_buckets Number of buckets.
+    /// @param pool Memory pool for memory allocation.
+    static Result<std::unique_ptr<BucketIdCalculator>> Create(
+        bool is_pk_table, int32_t num_buckets, const 
std::shared_ptr<MemoryPool>& pool);
+
+    /// Create `BucketIdCalculator` with a custom bucket function.
+    /// @param is_pk_table Whether this is for a primary key table.
+    /// @param num_buckets Number of buckets.
+    /// @param bucket_function The bucket function to use for bucket 
assignment.
+    /// @param pool Memory pool for memory allocation.
+    static Result<std::unique_ptr<BucketIdCalculator>> Create(
+        bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> 
bucket_function,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Create `BucketIdCalculator` with MOD bucket function.
+    /// @param is_pk_table Whether this is for a primary key table.
+    /// @param num_buckets Number of buckets.
+    /// @param bucket_key_type The type of the single bucket key field. Must 
be INT or BIGINT.
+    /// @param pool Memory pool for memory allocation.
+    static Result<std::unique_ptr<BucketIdCalculator>> CreateMod(
+        bool is_pk_table, int32_t num_buckets, FieldType bucket_key_type,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Create `BucketIdCalculator` with HIVE bucket function.
+    /// @param is_pk_table Whether this is for a primary key table.
+    /// @param num_buckets Number of buckets.
+    /// @param field_infos The detailed type info of all fields in the bucket 
key row.
+    /// @param pool Memory pool for memory allocation.
+    static Result<std::unique_ptr<BucketIdCalculator>> CreateHive(
+        bool is_pk_table, int32_t num_buckets, const 
std::vector<HiveFieldInfo>& field_infos,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Calculate bucket ids for the given bucket keys.
+    /// @param bucket_keys Arrow struct array containing the bucket key values.
+    /// @param bucket_schema Arrow schema describing the structure of 
bucket_keys.
+    /// @param bucket_ids Output array to store calculated bucket ids.
+    /// @note 1. bucket_keys is a struct array, the order of fields needs to 
be consistent with
+    /// "bucket-key" options in table schema. 2. bucket_keys and bucket_schema 
match each other. 3.
+    /// bucket_ids is allocated enough space, at least >= bucket_keys->length
+    Status CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* 
bucket_schema,
+                              int32_t* bucket_ids) const;
+
+    /// Destructor
+    ~BucketIdCalculator();
+
+ private:
+    BucketIdCalculator(int32_t num_buckets, std::unique_ptr<BucketFunction> 
bucket_function,
+                       const std::shared_ptr<MemoryPool>& pool);
+
+ private:
+    int32_t num_buckets_;
+    std::unique_ptr<BucketFunction> bucket_function_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/bucket_function.h 
b/src/paimon/core/bucket/bucket_function.h
new file mode 100644
index 0000000..f02f469
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_function.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace paimon {
+
+class BinaryRow;
+
+/// Abstract interface for bucket functions.
+/// A bucket function determines which bucket a row should be assigned to.
+class BucketFunction {
+ public:
+    virtual ~BucketFunction() = default;
+
+    /// Compute the bucket for the given row.
+    /// @param row The binary row to compute the bucket for.
+    /// @param num_buckets The total number of buckets.
+    /// @return The bucket index (0-based).
+    virtual int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const = 
0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/bucket_id_calculator.cpp 
b/src/paimon/core/bucket/bucket_id_calculator.cpp
new file mode 100644
index 0000000..6bede5a
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_id_calculator.cpp
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/bucket/bucket_id_calculator.h"
+
+#include <cassert>
+#include <cmath>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/bucket/bucket_function.h"
+#include "paimon/core/bucket/default_bucket_function.h"
+#include "paimon/core/bucket/hive_bucket_function.h"
+#include "paimon/core/bucket/mod_bucket_function.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+
+namespace paimon {
+namespace {
+#define CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id) \
+    if (typed_array->IsNull(row_id)) {                              \
+        row_writer->SetNullAt(col_id);                              \
+        return;                                                     \
+    }
+
+using WriteFunction = std::function<void(int32_t, BinaryRowWriter*)>;
+static Result<WriteFunction> WriteBucketRow(int32_t col_id,
+                                            const 
std::shared_ptr<arrow::Array>& field) {
+    arrow::Type::type type = field->type()->id();
+    switch (type) {
+        case arrow::Type::type::BOOL: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::BooleanArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteBoolean(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::INT8: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Int8Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteByte(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::INT16: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Int16Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteShort(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::INT32: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Int32Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteInt(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::INT64: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Int64Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteLong(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::FLOAT: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::FloatArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteFloat(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::DOUBLE: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::DoubleArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteDouble(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::DATE32: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Date32Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                row_writer->WriteInt(col_id, typed_array->Value(row_id));
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::STRING: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::StringArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                std::string_view value = typed_array->GetView(row_id);
+                row_writer->WriteStringView(col_id, value);
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::BINARY: {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::BinaryArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array](int32_t row_id,
+                                                              BinaryRowWriter* 
row_writer) {
+                CHECK_AND_SET_NULL(typed_array, row_writer, row_id, col_id);
+                std::string_view value = typed_array->GetView(row_id);
+                row_writer->WriteStringView(col_id, value);
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field->type());
+            assert(timestamp_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            DateTimeUtils::TimeType time_type =
+                DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::TimestampArray*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [typed_array, col_id, precision, 
time_type](
+                                            int32_t row_id, BinaryRowWriter* 
row_writer) {
+                if (typed_array->IsNull(row_id)) {
+                    if (!Timestamp::IsCompact(precision)) {
+                        row_writer->WriteTimestamp(col_id, std::nullopt, 
precision);
+                    } else {
+                        row_writer->SetNullAt(col_id);
+                    }
+                    return;
+                }
+                int64_t ts_value = typed_array->Value(row_id);
+                auto [milli, nano] = DateTimeUtils::TimestampConverter(
+                    ts_value, time_type, DateTimeUtils::TimeType::MILLISECOND,
+                    DateTimeUtils::TimeType::NANOSECOND);
+                row_writer->WriteTimestamp(col_id, Timestamp(milli, nano), 
precision);
+            };
+            return writer_func;
+        }
+        case arrow::Type::type::DECIMAL: {
+            const auto* decimal_type =
+                arrow::internal::checked_cast<const 
arrow::Decimal128Type*>(field->type().get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            const auto* typed_array =
+                arrow::internal::checked_cast<const 
arrow::Decimal128Array*>(field.get());
+            assert(typed_array);
+            WriteFunction writer_func = [col_id, typed_array, precision, 
scale](
+                                            int32_t row_id, BinaryRowWriter* 
row_writer) {
+                if (typed_array->IsNull(row_id)) {
+                    if (!Decimal::IsCompact(precision)) {
+                        row_writer->WriteDecimal(col_id, std::nullopt, 
precision);
+                    } else {
+                        row_writer->SetNullAt(col_id);
+                    }
+                    return;
+                }
+                arrow::Decimal128 decimal128(typed_array->GetValue(row_id));
+                Decimal decimal(precision, scale,
+                                static_cast<Decimal::int128_t>(
+                                    static_cast<Decimal::uint128_t>(
+                                        
static_cast<uint64_t>(decimal128.high_bits()))
+                                        << 64 |
+                                    decimal128.low_bits()));
+                row_writer->WriteDecimal(col_id, decimal, precision);
+            };
+            return writer_func;
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in write bucket row", 
field->type()->ToString()));
+    }
+}
+}  // namespace
+
+BucketIdCalculator::BucketIdCalculator(int32_t num_buckets,
+                                       std::unique_ptr<BucketFunction> 
bucket_function,
+                                       const std::shared_ptr<MemoryPool>& pool)
+    : num_buckets_(num_buckets), bucket_function_(std::move(bucket_function)), 
pool_(pool) {}
+
+BucketIdCalculator::~BucketIdCalculator() = default;
+
+Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create(
+    bool is_pk_table, int32_t num_buckets, const std::shared_ptr<MemoryPool>& 
pool) {
+    return Create(is_pk_table, num_buckets, 
std::make_unique<DefaultBucketFunction>(), pool);
+}
+
+Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create(
+    bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> 
bucket_function,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (num_buckets == 0 || num_buckets < -2) {
+        return Status::Invalid("num buckets must be -1 or -2 or greater than 
0");
+    }
+    if (is_pk_table && num_buckets == -1) {
+        return Status::Invalid(
+            "DynamicBucketMode or CrossPartitionBucketMode cannot calculate 
bucket id in "
+            "primary key table");
+    }
+    if (!is_pk_table && num_buckets == -2) {
+        return Status::Invalid("Append table not support PostponeBucketMode");
+    }
+    return std::unique_ptr<BucketIdCalculator>(
+        new BucketIdCalculator(num_buckets, std::move(bucket_function), pool));
+}
+
+Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::CreateMod(
+    bool is_pk_table, int32_t num_buckets, FieldType bucket_key_type,
+    const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(auto mod_func, 
ModBucketFunction::Create(bucket_key_type));
+    return Create(is_pk_table, num_buckets, std::move(mod_func), pool);
+}
+
+Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::CreateHive(
+    bool is_pk_table, int32_t num_buckets, const std::vector<HiveFieldInfo>& 
field_infos,
+    const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(auto hive_func, 
HiveBucketFunction::Create(field_infos));
+    return Create(is_pk_table, num_buckets, std::move(hive_func), pool);
+}
+
+Status BucketIdCalculator::CalculateBucketIds(ArrowArray* bucket_keys, 
ArrowSchema* bucket_schema,
+                                              int32_t* bucket_ids) const {
+    ScopeGuard guard([bucket_keys, bucket_schema]() {
+        ArrowArrayRelease(bucket_keys);
+        ArrowSchemaRelease(bucket_schema);
+    });
+    if (num_buckets_ == -1 || num_buckets_ == 1) {
+        memset(bucket_ids, 0, bucket_keys->length * sizeof(int32_t));
+        return Status::OK();
+    }
+    if (num_buckets_ == -2) {
+        for (int32_t i = 0; i < bucket_keys->length; i++) {
+            bucket_ids[i] = -2;
+        }
+        return Status::OK();
+    }
+
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
bucket_array,
+                                      arrow::ImportArray(bucket_keys, 
bucket_schema));
+    const auto* struct_array =
+        arrow::internal::checked_cast<const 
arrow::StructArray*>(bucket_array.get());
+    if (!struct_array) {
+        return Status::Invalid("bucket keys is not a struct array");
+    }
+    std::vector<WriteFunction> write_functions;
+    int32_t num_fields = struct_array->num_fields();
+    write_functions.reserve(num_fields);
+    for (int32_t col = 0; col < num_fields; col++) {
+        PAIMON_ASSIGN_OR_RAISE(WriteFunction write_func,
+                               WriteBucketRow(col, struct_array->field(col)));
+        write_functions.push_back(std::move(write_func));
+    }
+
+    BinaryRow bucket_row(num_fields);
+    BinaryRowWriter row_writer(&bucket_row, /*initial_size=*/1024, 
pool_.get());
+    for (int32_t row = 0; row < struct_array->length(); row++) {
+        row_writer.Reset();
+        for (int32_t col = 0; col < num_fields; col++) {
+            write_functions[col](row, &row_writer);
+        }
+        row_writer.Complete();
+        bucket_ids[row] = bucket_function_->Bucket(bucket_row, num_buckets_);
+    }
+    guard.Release();
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/bucket_id_calculator_test.cpp 
b/src/paimon/core/bucket/bucket_id_calculator_test.cpp
new file mode 100644
index 0000000..97284a6
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_id_calculator_test.cpp
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/bucket/bucket_id_calculator.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/util/checked_cast.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/core/bucket/default_bucket_function.h"
+#include "paimon/core/bucket/mod_bucket_function.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+class BucketIdCalculatorTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+    Result<std::vector<int32_t>> CalculateBucketIds(
+        bool is_pk_table, int32_t num_buckets, const 
std::shared_ptr<arrow::Schema>& bucket_schema,
+        const std::shared_ptr<arrow::Array>& bucket_array) const {
+        ::ArrowArray c_bucket_array;
+        EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_bucket_array).ok());
+        ::ArrowSchema c_bucket_schema;
+        EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, 
&c_bucket_schema).ok());
+        std::vector<int32_t> bucket_ids(bucket_array->length());
+        EXPECT_OK_AND_ASSIGN(auto bucket_id_cal, BucketIdCalculator::Create(
+                                                     is_pk_table, num_buckets, 
GetDefaultPool()));
+        PAIMON_RETURN_NOT_OK(bucket_id_cal->CalculateBucketIds(
+            /*bucket_keys=*/&c_bucket_array, 
/*bucket_schema=*/&c_bucket_schema,
+            /*bucket_ids=*/bucket_ids.data()));
+        return bucket_ids;
+    }
+
+    Result<std::vector<int32_t>> CalculateBucketIds(
+        bool is_pk_table, int32_t num_buckets, const 
std::shared_ptr<arrow::Schema>& bucket_schema,
+        const std::string& data_str) const {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto bucket_array,
+                                          
arrow::ipc::internal::json::ArrayFromJSON(
+                                              
arrow::struct_(bucket_schema->fields()), data_str));
+        return CalculateBucketIds(is_pk_table, num_buckets, bucket_schema, 
bucket_array);
+    }
+
+    Result<std::vector<int32_t>> CalculateBucketIds(
+        bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> 
bucket_function,
+        const std::shared_ptr<arrow::Schema>& bucket_schema,
+        const std::shared_ptr<arrow::Array>& bucket_array) const {
+        ::ArrowArray c_bucket_array;
+        EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_bucket_array).ok());
+        ::ArrowSchema c_bucket_schema;
+        EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, 
&c_bucket_schema).ok());
+        std::vector<int32_t> bucket_ids(bucket_array->length());
+        EXPECT_OK_AND_ASSIGN(auto bucket_id_cal, BucketIdCalculator::Create(
+                                                     is_pk_table, num_buckets,
+                                                     
std::move(bucket_function), GetDefaultPool()));
+        PAIMON_RETURN_NOT_OK(bucket_id_cal->CalculateBucketIds(
+            /*bucket_keys=*/&c_bucket_array, 
/*bucket_schema=*/&c_bucket_schema,
+            /*bucket_ids=*/bucket_ids.data()));
+        return bucket_ids;
+    }
+
+    Result<std::vector<int32_t>> CalculateBucketIds(
+        bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> 
bucket_function,
+        const std::shared_ptr<arrow::Schema>& bucket_schema, const 
std::string& data_str) const {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto bucket_array,
+                                          
arrow::ipc::internal::json::ArrayFromJSON(
+                                              
arrow::struct_(bucket_schema->fields()), data_str));
+        return CalculateBucketIds(is_pk_table, num_buckets, 
std::move(bucket_function),
+                                  bucket_schema, bucket_array);
+    }
+};
+
+TEST_F(BucketIdCalculatorTest, TestCompatibleWithJava) {
+    // 5000 random records, first 12 column is record data, the last column is 
the bucket id
+    // calculated by Java FixedBucketRowKeyExtractor
+    std::string data_path = paimon::test::GetDataDir() + 
"/record_for_bucket_id.data";
+    std::string content;
+    auto fs = std::make_unique<LocalFileSystem>();
+    ASSERT_OK(fs->ReadFile(data_path, &content));
+    content = content.substr(0, content.length() - 2);
+    content = "[" + content + "]";
+
+    arrow::FieldVector raw_bucket_fields = {
+        arrow::field("v0", arrow::boolean()),
+        arrow::field("v1", arrow::int8()),
+        arrow::field("v2", arrow::int16()),
+        arrow::field("v3", arrow::int32()),
+        arrow::field("v4", arrow::int64()),
+        arrow::field("v5", arrow::float32()),
+        arrow::field("v6", arrow::float64()),
+        arrow::field("v7", arrow::date32()),
+        arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("v9", arrow::decimal128(30, 20)),
+        arrow::field("v10", arrow::utf8()),
+        arrow::field("v11", arrow::binary())};
+    auto bucket_schema = arrow::schema(raw_bucket_fields);
+
+    arrow::FieldVector bucket_fields_with_id = bucket_schema->fields();
+    bucket_fields_with_id.push_back(arrow::field("bucket_id", arrow::int32()));
+    auto bucket_array_with_id = std::dynamic_pointer_cast<arrow::StructArray>(
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_fields_with_id),
 content)
+            .ValueOrDie());
+
+    // exclude bucket id array
+    arrow::ArrayVector bucket_fields(bucket_array_with_id->fields().begin(),
+                                     bucket_array_with_id->fields().end() - 1);
+    auto bucket_array =
+        arrow::StructArray::Make(bucket_fields, 
bucket_schema->fields()).ValueOrDie();
+
+    ASSERT_OK_AND_ASSIGN(std::vector<int32_t> result,
+                         CalculateBucketIds(/*is_pk_table=*/true, 
/*num_buckets=*/12345,
+                                            bucket_schema, bucket_array));
+
+    auto bucket_id_array = arrow::internal::checked_cast<arrow::Int32Array*>(
+        bucket_array_with_id->field(bucket_schema->num_fields()).get());
+    ASSERT_TRUE(bucket_id_array);
+    // test compatible with java
+    for (int32_t i = 0; i < bucket_array->length(); i++) {
+        ASSERT_EQ(bucket_id_array->Value(i), result[i]);
+    }
+}
+
+TEST_F(BucketIdCalculatorTest, TestCompatibleWithJavaWithNull) {
+    // 5000 random records, first 13 column is record data, the last column is 
the bucket id
+    // calculated by Java FixedBucketRowKeyExtractor. Besides, the first row 
is all null.
+    std::string data_path = paimon::test::GetDataDir() + 
"/record_with_null_for_bucket_id.data";
+    std::string content;
+    auto fs = std::make_unique<LocalFileSystem>();
+    ASSERT_OK(fs->ReadFile(data_path, &content));
+    content = content.substr(0, content.length() - 2);
+    content = "[" + content + "]";
+
+    arrow::FieldVector raw_bucket_fields = {
+        arrow::field("v0", arrow::boolean()),
+        arrow::field("v1", arrow::int8()),
+        arrow::field("v2", arrow::int16()),
+        arrow::field("v3", arrow::int32()),
+        arrow::field("v4", arrow::int64()),
+        arrow::field("v5", arrow::float32()),
+        arrow::field("v6", arrow::float64()),
+        arrow::field("v7", arrow::date32()),
+        arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("v9", arrow::decimal128(5, 2)),
+        arrow::field("v10", arrow::decimal128(30, 2)),
+        arrow::field("v11", arrow::utf8()),
+        arrow::field("v12", arrow::binary())};
+    auto bucket_schema = arrow::schema(raw_bucket_fields);
+
+    arrow::FieldVector bucket_fields_with_id = bucket_schema->fields();
+    bucket_fields_with_id.push_back(arrow::field("bucket_id", arrow::int32()));
+    auto bucket_array_with_id = std::dynamic_pointer_cast<arrow::StructArray>(
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_fields_with_id),
 content)
+            .ValueOrDie());
+
+    // exclude bucket id array
+    arrow::ArrayVector bucket_fields(bucket_array_with_id->fields().begin(),
+                                     bucket_array_with_id->fields().end() - 1);
+    auto bucket_array =
+        arrow::StructArray::Make(bucket_fields, 
bucket_schema->fields()).ValueOrDie();
+
+    ASSERT_OK_AND_ASSIGN(std::vector<int32_t> result,
+                         CalculateBucketIds(/*is_pk_table=*/false, 
/*num_buckets=*/12345,
+                                            bucket_schema, bucket_array));
+
+    auto bucket_id_array = arrow::internal::checked_cast<arrow::Int32Array*>(
+        bucket_array_with_id->field(bucket_schema->num_fields()).get());
+    ASSERT_TRUE(bucket_id_array);
+    // test compatible with java
+    for (int32_t i = 0; i < bucket_array->length(); i++) {
+        ASSERT_EQ(bucket_id_array->Value(i), result[i]);
+    }
+}
+
+TEST_F(BucketIdCalculatorTest, TestCompatibleWithJavaWithTimestamp) {
+    // 5000 random records, first 8 column is record data, the last column is 
the bucket id
+    // calculated by Java FixedBucketRowKeyExtractor. Besides, the first row 
is all null.
+    std::string data_path = paimon::test::GetDataDir() + 
"record_with_timestamp_for_bucket_id.data";
+    std::string content;
+    auto fs = std::make_unique<LocalFileSystem>();
+    ASSERT_OK(fs->ReadFile(data_path, &content));
+    content = content.substr(0, content.length() - 2);
+    content = "[" + content + "]";
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector raw_bucket_fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone)),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
timezone)),
+    };
+    auto bucket_schema = std::make_shared<arrow::Schema>(raw_bucket_fields);
+
+    arrow::FieldVector bucket_fields_with_id = bucket_schema->fields();
+    bucket_fields_with_id.push_back(arrow::field("bucket_id", arrow::int32()));
+    auto bucket_array_with_id = std::dynamic_pointer_cast<arrow::StructArray>(
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_fields_with_id),
 content)
+            .ValueOrDie());
+
+    // exclude bucket id array
+    arrow::ArrayVector bucket_fields(bucket_array_with_id->fields().begin(),
+                                     bucket_array_with_id->fields().end() - 1);
+    auto bucket_array =
+        arrow::StructArray::Make(bucket_fields, 
bucket_schema->fields()).ValueOrDie();
+
+    ASSERT_OK_AND_ASSIGN(std::vector<int32_t> result,
+                         CalculateBucketIds(/*is_pk_table=*/false, 
/*num_buckets=*/12345,
+                                            bucket_schema, bucket_array));
+
+    auto bucket_id_array = arrow::internal::checked_cast<arrow::Int32Array*>(
+        bucket_array_with_id->field(bucket_schema->num_fields()).get());
+    ASSERT_TRUE(bucket_id_array);
+    // test compatible with java
+    for (int32_t i = 0; i < bucket_array->length(); i++) {
+        ASSERT_EQ(bucket_id_array->Value(i), result[i]);
+    }
+}
+
+TEST_F(BucketIdCalculatorTest, TestInvalidCase) {
+    {
+        // test invalid bucket id
+        ASSERT_NOK_WITH_MSG(
+            BucketIdCalculator::Create(/*is_pk_table=*/true, 
/*num_buckets=*/0, GetDefaultPool()),
+            "num buckets must be -1 or -2 or greater than 0");
+    }
+    {
+        // test invalid bucket mode with pk table
+        ASSERT_NOK_WITH_MSG(
+            BucketIdCalculator::Create(/*is_pk_table=*/true, 
/*num_buckets=*/-1, GetDefaultPool()),
+            "DynamicBucketMode or CrossPartitionBucketMode cannot calculate 
bucket id");
+    }
+    {
+        // test invalid bucket mode with append table
+        ASSERT_NOK_WITH_MSG(
+            BucketIdCalculator::Create(/*is_pk_table=*/false, 
/*num_buckets=*/-2, GetDefaultPool()),
+            "Append table not support PostponeBucketMode");
+    }
+    {
+        // test invalid bucket_keys
+        auto bucket_schema =
+            arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+        auto bucket_array =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[10, 
11, 12, 13]")
+                .ValueOrDie();
+        ASSERT_NOK_WITH_MSG(
+            CalculateBucketIds(/*is_pk_table=*/false, 10, bucket_schema, 
bucket_array),
+            "ArrowArray struct has 0 children");
+    }
+    {
+        // test invalid data type
+        auto bucket_schema = arrow::schema(arrow::FieldVector(
+            {arrow::field("b0", arrow::int32()), arrow::field("b1", 
arrow::list(arrow::int64()))}));
+        ASSERT_NOK_WITH_MSG(
+            CalculateBucketIds(/*is_pk_table=*/true, 10, bucket_schema, "[[10, 
[1, 1, 2]]]"),
+            "type list<item: int64> not support in write bucket row");
+    }
+}
+
+TEST_F(BucketIdCalculatorTest, TestUnawareBucket) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result,
+        CalculateBucketIds(/*is_pk_table=*/false, -1, bucket_schema, "[[10], 
[-1], [50]]"));
+    std::vector<int32_t> expected = {0, 0, 0};
+    ASSERT_EQ(expected, result);
+}
+
+TEST_F(BucketIdCalculatorTest, TestPostponeBucket) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result,
+        CalculateBucketIds(/*is_pk_table=*/true, -2, bucket_schema, "[[10], 
[-1], [50]]"));
+    std::vector<int32_t> expected = {-2, -2, -2};
+    ASSERT_EQ(expected, result);
+}
+
+TEST_F(BucketIdCalculatorTest, TestVariantType) {
+    arrow::FieldVector raw_bucket_fields = {
+        arrow::field("v0", arrow::boolean()),
+        arrow::field("v1", arrow::int8()),
+        arrow::field("v2", arrow::int16()),
+        arrow::field("v3", arrow::int32()),
+        arrow::field("v4", arrow::int64()),
+        arrow::field("v5", arrow::float32()),
+        arrow::field("v6", arrow::float64()),
+        arrow::field("v7", arrow::date32()),
+        arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("v9", arrow::decimal128(30, 20)),
+        arrow::field("v10", arrow::utf8()),
+        arrow::field("v11", arrow::binary())};
+    auto bucket_schema = arrow::schema(raw_bucket_fields);
+
+    auto bucket_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()),
 R"([
+        [true, 10, 200, 65536, 123456789, 0.0, 0.0, 2000, -86399999999500, 
"2134.48690000000000000009", "olá mundo,你好世界。Two roads diverged in a wood, and 
I took the one less traveled by, And that has made all the difference.", 
"Alice"],
+        [false, -128, -32768, -2147483648, -9223372036854775808, 
-3.4028235E38, -1.7976931348623157E308, -719528, -9223372036854775808, 
"-999999999999999999.99999999999999999999", "Alice", "olá mundo,你好世界。Two roads 
diverged in a wood, and I took the one less traveled by, And that has made all 
the difference."],
+        [true, 127, 32767, 2147483647, 9223372036854775807, 3.4028235E38, 
1.7976931348623157E308, 2932896, 9223372036854775807, 
"999999999999999999.99999999999999999999", "Alice", "olá mundo,你好世界。Two roads 
diverged in a wood, and I took the one less traveled by, And that has made all 
the difference."],
+        [true, 0, 0, 0, 0, 1.4E-45, 4.9E-324, 0, 0, "0.00000000000000000000", 
"Alice", "olá mundo,你好世界。Two roads diverged in a wood, and I took the one less 
traveled by, And that has made all the difference."]
+])")
+            .ValueOrDie());
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result,
+        CalculateBucketIds(/*is_pk_table=*/true, 12345, bucket_schema, 
bucket_array));
+    std::vector<int32_t> expected = {11275, 12272, 6549, 11795};
+    ASSERT_EQ(expected, result);
+    // test calculate multiple times
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result2,
+        CalculateBucketIds(/*is_pk_table=*/true, 12345, bucket_schema, 
bucket_array));
+    ASSERT_EQ(expected, result2);
+}
+
+TEST_F(BucketIdCalculatorTest, TestWithModBucketFunction) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    ASSERT_OK_AND_ASSIGN(auto mod_func, 
ModBucketFunction::Create(FieldType::INT));
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result,
+        CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, 
std::move(mod_func),
+                           bucket_schema, "[[10], [-1], [50], [-13], [0]]"));
+    // Java Math.floorMod semantics:
+    // floorMod(10, 10) = 0
+    // floorMod(-1, 10) = 9
+    // floorMod(50, 10) = 0
+    // floorMod(-13, 10) = 7
+    // floorMod(0, 10) = 0
+    std::vector<int32_t> expected = {0, 9, 0, 7, 0};
+    ASSERT_EQ(expected, result);
+}
+
+TEST_F(BucketIdCalculatorTest, TestWithDefaultBucketFunctionExplicit) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    // Calculate with explicit DefaultBucketFunction
+    auto default_func = std::make_unique<DefaultBucketFunction>();
+    ASSERT_OK_AND_ASSIGN(
+        std::vector<int32_t> result_explicit,
+        CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, 
std::move(default_func),
+                           bucket_schema, "[[10], [-1], [50], [-13], [0]]"));
+    // Calculate with default (no BucketFunction passed)
+    ASSERT_OK_AND_ASSIGN(std::vector<int32_t> result_default,
+                         CalculateBucketIds(/*is_pk_table=*/true, 
/*num_buckets=*/10, bucket_schema,
+                                            "[[10], [-1], [50], [-13], [0]]"));
+    ASSERT_EQ(result_default, result_explicit);
+}
+
+TEST_F(BucketIdCalculatorTest, TestCreateWithDefaultBucketFunction) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    std::string data_str = "[[10], [-1], [50], [-13], [0]]";
+
+    // Calculate with explicit DefaultBucketFunction via Create
+    auto default_func = std::make_unique<DefaultBucketFunction>();
+    ASSERT_OK_AND_ASSIGN(auto calc_explicit,
+                         BucketIdCalculator::Create(/*is_pk_table=*/true, 
/*num_buckets=*/10,
+                                                    std::move(default_func), 
GetDefaultPool()));
+
+    // Calculate with the default Create (no BucketFunction)
+    ASSERT_OK_AND_ASSIGN(
+        auto calc_default,
+        BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, 
GetDefaultPool()));
+
+    auto bucket_array1 =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()),
 data_str)
+            .ValueOrDie();
+    ::ArrowArray c_array1;
+    EXPECT_TRUE(arrow::ExportArray(*bucket_array1, &c_array1).ok());
+    ::ArrowSchema c_schema1;
+    EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema1).ok());
+    std::vector<int32_t> result_explicit(bucket_array1->length());
+    ASSERT_OK(calc_explicit->CalculateBucketIds(&c_array1, &c_schema1, 
result_explicit.data()));
+
+    auto bucket_array2 =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()),
 data_str)
+            .ValueOrDie();
+    ::ArrowArray c_array2;
+    EXPECT_TRUE(arrow::ExportArray(*bucket_array2, &c_array2).ok());
+    ::ArrowSchema c_schema2;
+    EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema2).ok());
+    std::vector<int32_t> result_default(bucket_array2->length());
+    ASSERT_OK(calc_default->CalculateBucketIds(&c_array2, &c_schema2, 
result_default.data()));
+
+    ASSERT_EQ(result_default, result_explicit);
+}
+
+TEST_F(BucketIdCalculatorTest, TestCreateWithModBucketFunction) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    std::string data_str = "[[10], [-1], [50], [-13], [0]]";
+
+    // Calculate with CreateMod
+    ASSERT_OK_AND_ASSIGN(auto calc_mod,
+                         BucketIdCalculator::CreateMod(/*is_pk_table=*/true, 
/*num_buckets=*/10,
+                                                       FieldType::INT, 
GetDefaultPool()));
+
+    // Calculate with explicit ModBucketFunction
+    ASSERT_OK_AND_ASSIGN(auto mod_func, 
ModBucketFunction::Create(FieldType::INT));
+    ASSERT_OK_AND_ASSIGN(std::vector<int32_t> result_explicit,
+                         CalculateBucketIds(/*is_pk_table=*/true, 
/*num_buckets=*/10,
+                                            std::move(mod_func), 
bucket_schema, data_str));
+
+    auto bucket_array =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()),
 data_str)
+            .ValueOrDie();
+    ::ArrowArray c_array;
+    EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_array).ok());
+    ::ArrowSchema c_schema;
+    EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok());
+    std::vector<int32_t> result_mod(bucket_array->length());
+    ASSERT_OK(calc_mod->CalculateBucketIds(&c_array, &c_schema, 
result_mod.data()));
+
+    ASSERT_EQ(result_explicit, result_mod);
+    // Verify expected values (Java Math.floorMod semantics)
+    std::vector<int32_t> expected = {0, 9, 0, 7, 0};
+    ASSERT_EQ(expected, result_mod);
+}
+
+TEST_F(BucketIdCalculatorTest, TestCreateWithHiveBucketFunction) {
+    auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", 
arrow::int32())}));
+    std::string data_str = "[[42], [0], [100]]";
+
+    std::vector<HiveFieldInfo> field_infos = {HiveFieldInfo(FieldType::INT)};
+
+    // Calculate with CreateHive
+    ASSERT_OK_AND_ASSIGN(auto calc_hive,
+                         BucketIdCalculator::CreateHive(/*is_pk_table=*/true, 
/*num_buckets=*/5,
+                                                        field_infos, 
GetDefaultPool()));
+
+    auto bucket_array =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()),
 data_str)
+            .ValueOrDie();
+    ::ArrowArray c_array;
+    EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_array).ok());
+    ::ArrowSchema c_schema;
+    EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok());
+    std::vector<int32_t> result(bucket_array->length());
+    ASSERT_OK(calc_hive->CalculateBucketIds(&c_array, &c_schema, 
result.data()));
+
+    // Verify all bucket ids are in valid range
+    for (auto bucket_id : result) {
+        ASSERT_GE(bucket_id, 0);
+        ASSERT_LT(bucket_id, 5);
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/bucket/bucket_select_converter.cpp 
b/src/paimon/core/bucket/bucket_select_converter.cpp
new file mode 100644
index 0000000..01bf860
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_select_converter.cpp
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/bucket_select_converter.h"
+
+#include <cassert>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/core/bucket/bucket_function.h"
+#include "paimon/core/bucket/default_bucket_function.h"
+#include "paimon/core/bucket/hive_bucket_function.h"
+#include "paimon/core/bucket/mod_bucket_function.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/leaf_predicate.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/predicate/predicate_utils.h"
+
+namespace paimon {
+
+Result<std::optional<int32_t>> BucketSelectConverter::Convert(
+    const std::shared_ptr<Predicate>& predicate, const 
std::vector<std::string>& bucket_key_names,
+    const std::vector<std::shared_ptr<arrow::DataType>>& 
bucket_key_arrow_types,
+    BucketFunctionType bucket_function_type, int32_t num_buckets, MemoryPool* 
pool) {
+    assert(pool);
+    if (!predicate || bucket_key_names.empty() || num_buckets <= 0) {
+        return std::optional<int32_t>(std::nullopt);
+    }
+
+    if (bucket_key_names.size() != bucket_key_arrow_types.size()) {
+        return Status::Invalid(
+            "bucket_key_names and bucket_key_arrow_types must have the same 
size");
+    }
+
+    // Derive FieldTypes from Arrow types
+    std::vector<FieldType> bucket_key_types;
+    bucket_key_types.reserve(bucket_key_arrow_types.size());
+    for (const auto& arrow_type : bucket_key_arrow_types) {
+        PAIMON_ASSIGN_OR_RAISE(FieldType ft, 
FieldTypeUtils::ConvertToFieldType(arrow_type->id()));
+        bucket_key_types.push_back(ft);
+    }
+
+    auto literals_opt = ExtractEqualLiterals(predicate, bucket_key_names);
+    if (!literals_opt.has_value()) {
+        return std::optional<int32_t>(std::nullopt);
+    }
+
+    const auto& literals_map = literals_opt.value();
+    auto num_fields = static_cast<int32_t>(bucket_key_names.size());
+
+    // Build a BinaryRow from the extracted literals
+    BinaryRow row(num_fields);
+    BinaryRowWriter writer(&row, /*initial_size=*/1024, pool);
+    writer.Reset();
+
+    for (int32_t i = 0; i < num_fields; i++) {
+        const auto& field_name = bucket_key_names[i];
+        const auto& literal = literals_map.at(field_name);
+        PAIMON_RETURN_NOT_OK(
+            WriteLiteralToRow(i, literal, bucket_key_types[i], 
bucket_key_arrow_types[i], &writer));
+    }
+    writer.Complete();
+
+    // Create the bucket function and compute the bucket
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<BucketFunction> bucket_function,
+        CreateBucketFunction(bucket_function_type, bucket_key_types, 
bucket_key_arrow_types));
+    int32_t bucket = bucket_function->Bucket(row, num_buckets);
+    return std::optional<int32_t>(bucket);
+}
+
+std::optional<std::map<std::string, Literal>> 
BucketSelectConverter::ExtractEqualLiterals(
+    const std::shared_ptr<Predicate>& predicate, const 
std::vector<std::string>& bucket_key_names) {
+    std::set<std::string> key_set(bucket_key_names.begin(), 
bucket_key_names.end());
+    std::map<std::string, Literal> result;
+
+    auto splits = PredicateUtils::SplitAnd(predicate);
+    for (const auto& split : splits) {
+        auto* leaf = dynamic_cast<const LeafPredicate*>(split.get());
+        if (!leaf) {
+            continue;
+        }
+        // TODO(liangjie.liang): Support IN and OR predicates to enable 
multi-bucket pruning
+        if (leaf->GetFunction().GetType() != Function::Type::EQUAL) {
+            continue;
+        }
+        const auto& field_name = leaf->FieldName();
+        if (key_set.find(field_name) == key_set.end()) {
+            continue;
+        }
+        const auto& literals = leaf->Literals();
+        if (literals.size() != 1 || literals[0].IsNull()) {
+            continue;
+        }
+        // Only record the first EQUAL for each field
+        if (result.find(field_name) == result.end()) {
+            result.emplace(field_name, literals[0]);
+        }
+    }
+
+    // Check all bucket key fields are covered
+    for (const auto& key_name : bucket_key_names) {
+        if (result.find(key_name) == result.end()) {
+            return std::nullopt;
+        }
+    }
+    return result;
+}
+
+Status BucketSelectConverter::WriteLiteralToRow(int32_t pos, const Literal& 
literal,
+                                                FieldType field_type,
+                                                const 
std::shared_ptr<arrow::DataType>& arrow_type,
+                                                BinaryRowWriter* writer) {
+    switch (field_type) {
+        case FieldType::BOOLEAN:
+            writer->WriteBoolean(pos, literal.GetValue<bool>());
+            break;
+        case FieldType::TINYINT:
+            writer->WriteByte(pos, literal.GetValue<int8_t>());
+            break;
+        case FieldType::SMALLINT:
+            writer->WriteShort(pos, literal.GetValue<int16_t>());
+            break;
+        case FieldType::INT:
+        case FieldType::DATE:
+            writer->WriteInt(pos, literal.GetValue<int32_t>());
+            break;
+        case FieldType::BIGINT:
+            writer->WriteLong(pos, literal.GetValue<int64_t>());
+            break;
+        case FieldType::FLOAT:
+            writer->WriteFloat(pos, literal.GetValue<float>());
+            break;
+        case FieldType::DOUBLE:
+            writer->WriteDouble(pos, literal.GetValue<double>());
+            break;
+        case FieldType::STRING:
+        case FieldType::BINARY: {
+            auto value = literal.GetValue<std::string>();
+            writer->WriteStringView(pos, std::string_view{value});
+            break;
+        }
+        case FieldType::TIMESTAMP: {
+            auto ts = literal.GetValue<Timestamp>();
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<arrow::TimestampType>(arrow_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            writer->WriteTimestamp(pos, ts, precision);
+            break;
+        }
+        case FieldType::DECIMAL: {
+            auto dec = literal.GetValue<Decimal>();
+            const auto* decimal_type =
+                arrow::internal::checked_cast<const 
arrow::Decimal128Type*>(arrow_type.get());
+            int32_t precision = decimal_type->precision();
+            writer->WriteDecimal(pos, dec, precision);
+            break;
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("unsupported field type {} for bucket select 
conversion",
+                            static_cast<int>(field_type)));
+    }
+    return Status::OK();
+}
+
+Result<std::unique_ptr<BucketFunction>> 
BucketSelectConverter::CreateBucketFunction(
+    BucketFunctionType type, const std::vector<FieldType>& bucket_key_types,
+    const std::vector<std::shared_ptr<arrow::DataType>>& 
bucket_key_arrow_types) {
+    switch (type) {
+        case BucketFunctionType::DEFAULT:
+            return 
std::unique_ptr<BucketFunction>(std::make_unique<DefaultBucketFunction>());
+        case BucketFunctionType::MOD: {
+            if (bucket_key_types.size() != 1) {
+                return Status::Invalid("MOD bucket function requires exactly 
one bucket key field");
+            }
+            return ModBucketFunction::Create(bucket_key_types[0]);
+        }
+        case BucketFunctionType::HIVE: {
+            std::vector<HiveFieldInfo> field_infos;
+            field_infos.reserve(bucket_key_types.size());
+            for (size_t i = 0; i < bucket_key_types.size(); i++) {
+                if (bucket_key_types[i] == FieldType::DECIMAL) {
+                    const auto* decimal_type =
+                        arrow::internal::checked_cast<const 
arrow::Decimal128Type*>(
+                            bucket_key_arrow_types[i].get());
+                    field_infos.emplace_back(bucket_key_types[i], 
decimal_type->precision(),
+                                             decimal_type->scale());
+                } else {
+                    field_infos.emplace_back(bucket_key_types[i]);
+                }
+            }
+            return HiveBucketFunction::Create(field_infos);
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("unsupported bucket function type: {}", 
static_cast<int>(type)));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/bucket_select_converter.h 
b/src/paimon/core/bucket/bucket_select_converter.h
new file mode 100644
index 0000000..25eb31a
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_select_converter.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "paimon/bucket/bucket_function_type.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+class BinaryRowWriter;
+class BucketFunction;
+class MemoryPool;
+class Predicate;
+
+/// Converts predicates on bucket key fields to a target bucket ID.
+/// When all bucket key fields have EQUAL predicates, the converter computes
+/// which bucket the data must reside in, enabling bucket pruning during scan.
+class BucketSelectConverter {
+ public:
+    BucketSelectConverter() = delete;
+    ~BucketSelectConverter() = delete;
+
+    /// Convert predicates to a target bucket ID.
+    /// @param predicate The predicate (possibly compound AND) to analyze.
+    /// @param bucket_key_names Ordered bucket key field names.
+    /// @param bucket_key_arrow_types Ordered Arrow data types for bucket key 
fields.
+    ///        FieldType is derived from these automatically.
+    /// @param bucket_function_type The bucket function type (DEFAULT, MOD, 
HIVE).
+    /// @param num_buckets The total number of buckets.
+    /// @param pool Memory pool for BinaryRow construction.
+    /// @return The target bucket ID, or nullopt if predicates don't fully 
constrain all bucket
+    /// keys.
+    static Result<std::optional<int32_t>> Convert(
+        const std::shared_ptr<Predicate>& predicate,
+        const std::vector<std::string>& bucket_key_names,
+        const std::vector<std::shared_ptr<arrow::DataType>>& 
bucket_key_arrow_types,
+        BucketFunctionType bucket_function_type, int32_t num_buckets, 
MemoryPool* pool);
+
+ private:
+    /// Extract single literal per bucket key field from EQUAL predicates.
+    /// Splits the predicate by AND and looks for EQUAL leaf predicates on 
bucket key fields.
+    /// @return A map from field name to literal, or nullopt if not all bucket 
keys are constrained.
+    static std::optional<std::map<std::string, Literal>> ExtractEqualLiterals(
+        const std::shared_ptr<Predicate>& predicate,
+        const std::vector<std::string>& bucket_key_names);
+
+    /// Write a Literal value to a BinaryRowWriter at the given position.
+    static Status WriteLiteralToRow(int32_t pos, const Literal& literal, 
FieldType field_type,
+                                    const std::shared_ptr<arrow::DataType>& 
arrow_type,
+                                    BinaryRowWriter* writer);
+
+    /// Create the appropriate BucketFunction for the given type.
+    static Result<std::unique_ptr<BucketFunction>> CreateBucketFunction(
+        BucketFunctionType type, const std::vector<FieldType>& 
bucket_key_types,
+        const std::vector<std::shared_ptr<arrow::DataType>>& 
bucket_key_arrow_types);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/bucket_select_converter_test.cpp 
b/src/paimon/core/bucket/bucket_select_converter_test.cpp
new file mode 100644
index 0000000..94c2f60
--- /dev/null
+++ b/src/paimon/core/bucket/bucket_select_converter_test.cpp
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/bucket_select_converter.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/core/bucket/default_bucket_function.h"
+#include "paimon/core/bucket/mod_bucket_function.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BucketSelectConverterTest : public ::testing::Test {
+ protected:
+    std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
+};
+
+TEST_F(BucketSelectConverterTest, SingleIntEqualDefault) {
+    int32_t num_buckets = 10;
+    Literal lit(static_cast<int32_t>(42));
+    auto predicate = PredicateBuilder::Equal(0, "id", FieldType::INT, lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify by computing the expected bucket manually
+    auto row = BinaryRowGenerator::GenerateRow({static_cast<int32_t>(42)}, 
pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, SingleStringEqualDefault) {
+    int32_t num_buckets = 8;
+    std::string val = "hello_world";
+    Literal lit(FieldType::STRING, val.c_str(), val.size());
+    auto predicate = PredicateBuilder::Equal(0, "name", FieldType::STRING, 
lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"name"}, {arrow::utf8()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify
+    auto row = BinaryRowGenerator::GenerateRow({val}, pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, MultiKeyAndPredicate) {
+    int32_t num_buckets = 5;
+    Literal lit_id(static_cast<int32_t>(100));
+    Literal lit_name(FieldType::STRING, "test", 4);
+    auto pred_id = PredicateBuilder::Equal(0, "id", FieldType::INT, lit_id);
+    auto pred_name = PredicateBuilder::Equal(1, "name", FieldType::STRING, 
lit_name);
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({pred_id, 
pred_name}));
+
+    ASSERT_OK_AND_ASSIGN(
+        auto result,
+        BucketSelectConverter::Convert(predicate, {"id", "name"}, 
{arrow::int32(), arrow::utf8()},
+                                       BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify
+    auto row = BinaryRowGenerator::GenerateRow({static_cast<int32_t>(100), 
std::string("test")},
+                                               pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, MissingBucketKeyReturnsNullopt) {
+    int32_t num_buckets = 5;
+    Literal lit(static_cast<int32_t>(42));
+    auto predicate = PredicateBuilder::Equal(0, "id", FieldType::INT, lit);
+
+    ASSERT_OK_AND_ASSIGN(
+        auto result,
+        BucketSelectConverter::Convert(predicate, {"id", "name"}, 
{arrow::int32(), arrow::utf8()},
+                                       BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, NonEqualPredicateReturnsNullopt) {
+    int32_t num_buckets = 5;
+    Literal lit(static_cast<int32_t>(42));
+    auto predicate = PredicateBuilder::GreaterThan(0, "id", FieldType::INT, 
lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, OrPredicateReturnsNullopt) {
+    int32_t num_buckets = 5;
+    Literal lit1(static_cast<int32_t>(1));
+    Literal lit2(static_cast<int32_t>(2));
+    auto pred1 = PredicateBuilder::Equal(0, "id", FieldType::INT, lit1);
+    auto pred2 = PredicateBuilder::Equal(0, "id", FieldType::INT, lit2);
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({pred1, pred2}));
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, ModBucketFunction) {
+    int32_t num_buckets = 7;
+    Literal lit(static_cast<int32_t>(42));
+    auto predicate = PredicateBuilder::Equal(0, "id", FieldType::INT, lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::MOD, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify: MOD function uses floorMod
+    auto row = BinaryRowGenerator::GenerateRow({static_cast<int32_t>(42)}, 
pool_.get());
+    ASSERT_OK_AND_ASSIGN(auto mod_func, 
ModBucketFunction::Create(FieldType::INT));
+    ASSERT_EQ(mod_func->Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, NullLiteralReturnsNullopt) {
+    int32_t num_buckets = 5;
+    Literal lit(FieldType::INT);  // null literal
+    auto predicate = PredicateBuilder::Equal(0, "id", FieldType::INT, lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, DynamicBucketModeReturnsNullopt) {
+    Literal lit(static_cast<int32_t>(42));
+    auto predicate = PredicateBuilder::Equal(0, "id", FieldType::INT, lit);
+
+    ASSERT_OK_AND_ASSIGN(
+        auto result, BucketSelectConverter::Convert(predicate, {"id"}, 
{arrow::int32()},
+                                                    
BucketFunctionType::DEFAULT, -1, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, NullPredicateReturnsNullopt) {
+    ASSERT_OK_AND_ASSIGN(
+        auto result, BucketSelectConverter::Convert(nullptr, {"id"}, 
{arrow::int32()},
+                                                    
BucketFunctionType::DEFAULT, 5, pool_.get()));
+    ASSERT_FALSE(result.has_value());
+}
+
+TEST_F(BucketSelectConverterTest, BigintKeyDefault) {
+    int32_t num_buckets = 16;
+    Literal lit(static_cast<int64_t>(123456789L));
+    auto predicate = PredicateBuilder::Equal(0, "user_id", FieldType::BIGINT, 
lit);
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"user_id"}, 
{arrow::int64()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify
+    auto row = 
BinaryRowGenerator::GenerateRow({static_cast<int64_t>(123456789L)}, 
pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, AndWithExtraPredicateStillWorks) {
+    // AND(EQUAL(id, 42), GREATER_THAN(value, 100))
+    // Only id is bucket key, value is not — should still derive bucket from id
+    int32_t num_buckets = 5;
+    Literal lit_id(static_cast<int32_t>(42));
+    Literal lit_val(static_cast<int32_t>(100));
+    auto pred_id = PredicateBuilder::Equal(0, "id", FieldType::INT, lit_id);
+    auto pred_val = PredicateBuilder::GreaterThan(1, "value", FieldType::INT, 
lit_val);
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({pred_id, 
pred_val}));
+
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"id"}, {arrow::int32()},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    auto row = BinaryRowGenerator::GenerateRow({static_cast<int32_t>(42)}, 
pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, TimestampMillisPrecision) {
+    // TIMESTAMP with millisecond precision (compact storage, precision=3)
+    int32_t num_buckets = 10;
+    Timestamp ts = Timestamp::FromEpochMillis(1700000000000L);
+    Literal lit(ts);
+    auto predicate = PredicateBuilder::Equal(0, "ts", FieldType::TIMESTAMP, 
lit);
+
+    auto arrow_type = arrow::timestamp(arrow::TimeUnit::MILLI);
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"ts"}, {arrow_type},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify: precision=3 uses compact WriteTimestamp
+    auto row = BinaryRowGenerator::GenerateRow({TimestampType(ts, 3)}, 
pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, TimestampMicrosPrecision) {
+    // TIMESTAMP with microsecond precision (non-compact storage, precision=6)
+    int32_t num_buckets = 10;
+    Timestamp ts(1700000000000L, 123456);
+    Literal lit(ts);
+    auto predicate = PredicateBuilder::Equal(0, "ts", FieldType::TIMESTAMP, 
lit);
+
+    auto arrow_type = arrow::timestamp(arrow::TimeUnit::MICRO);
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"ts"}, {arrow_type},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify: precision=6 uses non-compact WriteTimestamp (different layout 
than precision=3)
+    auto row = BinaryRowGenerator::GenerateRow({TimestampType(ts, 6)}, 
pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+TEST_F(BucketSelectConverterTest, DecimalKey) {
+    int32_t num_buckets = 10;
+    Decimal dec = Decimal::FromUnscaledLong(12345L, 10, 2);
+    Literal lit(dec);
+    auto predicate = PredicateBuilder::Equal(0, "amount", FieldType::DECIMAL, 
lit);
+
+    auto arrow_type = arrow::decimal128(10, 2);
+    ASSERT_OK_AND_ASSIGN(auto result, BucketSelectConverter::Convert(
+                                          predicate, {"amount"}, {arrow_type},
+                                          BucketFunctionType::DEFAULT, 
num_buckets, pool_.get()));
+    ASSERT_TRUE(result.has_value());
+
+    // Verify
+    auto row = BinaryRowGenerator::GenerateRow({dec}, pool_.get());
+    DefaultBucketFunction func;
+    ASSERT_EQ(func.Bucket(row, num_buckets), result.value());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/bucket/default_bucket_function.h 
b/src/paimon/core/bucket/default_bucket_function.h
new file mode 100644
index 0000000..4b8f205
--- /dev/null
+++ b/src/paimon/core/bucket/default_bucket_function.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+
+#include "paimon/common/data/binary_row.h"
+#include "paimon/core/bucket/bucket_function.h"
+
+namespace paimon {
+
+/// Default bucket function that uses the hash code of the row to determine 
the bucket.
+class DefaultBucketFunction : public BucketFunction {
+ public:
+    int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override {
+        return std::abs(row.HashCode() % num_buckets);
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/default_bucket_function_test.cpp 
b/src/paimon/core/bucket/default_bucket_function_test.cpp
new file mode 100644
index 0000000..16d18ca
--- /dev/null
+++ b/src/paimon/core/bucket/default_bucket_function_test.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/default_bucket_function.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(DefaultBucketFunctionTest, TestBasicHashMod) {
+    auto pool = GetDefaultPool();
+    DefaultBucketFunction func;
+
+    // Create a row with a single INT field
+    BinaryRow row(1);
+    BinaryRowWriter writer(&row, 0, pool.get());
+    writer.WriteInt(0, 42);
+    writer.Complete();
+
+    int32_t num_buckets = 5;
+    int32_t bucket = func.Bucket(row, num_buckets);
+    ASSERT_GE(bucket, 0);
+    ASSERT_LT(bucket, num_buckets);
+
+    // Verify it matches the expected formula: abs(hashCode % numBuckets)
+    int32_t expected = std::abs(row.HashCode() % num_buckets);
+    ASSERT_EQ(expected, bucket);
+}
+
+TEST(DefaultBucketFunctionTest, TestDifferentNumBuckets) {
+    auto pool = GetDefaultPool();
+    DefaultBucketFunction func;
+
+    BinaryRow row(1);
+    BinaryRowWriter writer(&row, 0, pool.get());
+    writer.WriteInt(0, 100);
+    writer.Complete();
+
+    for (int32_t num_buckets = 1; num_buckets <= 10; num_buckets++) {
+        int32_t bucket = func.Bucket(row, num_buckets);
+        ASSERT_GE(bucket, 0);
+        ASSERT_LT(bucket, num_buckets);
+        ASSERT_EQ(std::abs(row.HashCode() % num_buckets), bucket);
+    }
+}
+
+TEST(DefaultBucketFunctionTest, TestMultiFieldRow) {
+    auto pool = GetDefaultPool();
+    DefaultBucketFunction func;
+
+    BinaryRow row(3);
+    BinaryRowWriter writer(&row, 0, pool.get());
+    writer.WriteInt(0, 1);
+    writer.WriteLong(1, 2);
+    writer.WriteInt(2, 3);
+    writer.Complete();
+
+    int32_t num_buckets = 7;
+    int32_t bucket = func.Bucket(row, num_buckets);
+    ASSERT_GE(bucket, 0);
+    ASSERT_LT(bucket, num_buckets);
+    ASSERT_EQ(std::abs(row.HashCode() % num_buckets), bucket);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/bucket/hive_bucket_function.cpp 
b/src/paimon/core/bucket/hive_bucket_function.cpp
new file mode 100644
index 0000000..72f0f00
--- /dev/null
+++ b/src/paimon/core/bucket/hive_bucket_function.cpp
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/hive_bucket_function.h"
+
+#include <cassert>
+#include <cmath>
+#include <cstring>
+#include <limits>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/core/bucket/hive_hasher.h"
+#include "paimon/status.h"
+
+namespace paimon {
+HiveBucketFunction::HiveBucketFunction(const std::vector<HiveFieldInfo>& 
field_infos)
+    : field_infos_(field_infos) {}
+
+Result<std::unique_ptr<HiveBucketFunction>> HiveBucketFunction::Create(
+    const std::vector<FieldType>& field_types) {
+    std::vector<HiveFieldInfo> field_infos;
+    field_infos.reserve(field_types.size());
+    for (const auto& type : field_types) {
+        field_infos.emplace_back(type);
+    }
+    return Create(field_infos);
+}
+
+Result<std::unique_ptr<HiveBucketFunction>> HiveBucketFunction::Create(
+    const std::vector<HiveFieldInfo>& field_infos) {
+    if (field_infos.empty()) {
+        return Status::Invalid("HiveBucketFunction requires at least one 
field");
+    }
+    for (const auto& info : field_infos) {
+        switch (info.type) {
+            case FieldType::BOOLEAN:
+            case FieldType::TINYINT:
+            case FieldType::SMALLINT:
+            case FieldType::INT:
+            case FieldType::BIGINT:
+            case FieldType::FLOAT:
+            case FieldType::DOUBLE:
+            case FieldType::STRING:
+            case FieldType::BINARY:
+            case FieldType::DECIMAL:
+            case FieldType::DATE:
+                break;
+            default:
+                return Status::Invalid(fmt::format("Unsupported type as Hive 
bucket key type: {}",
+                                                   
FieldTypeUtils::FieldTypeToString(info.type)));
+        }
+    }
+    return std::unique_ptr<HiveBucketFunction>(new 
HiveBucketFunction(field_infos));
+}
+
+int32_t HiveBucketFunction::Bucket(const BinaryRow& row, int32_t num_buckets) 
const {
+    static constexpr int32_t SEED = 0;
+    int32_t hash = SEED;
+    for (int32_t i = 0; i < row.GetFieldCount(); i++) {
+        hash = (31 * hash) + ComputeHash(row, i);
+    }
+    return Mod(hash & std::numeric_limits<int32_t>::max(), num_buckets);
+}
+
+int32_t HiveBucketFunction::ComputeHash(const BinaryRow& row, int32_t 
field_index) const {
+    if (row.IsNullAt(field_index)) {
+        return 0;
+    }
+
+    const auto& info = field_infos_[field_index];
+    switch (info.type) {
+        case FieldType::BOOLEAN:
+            return HiveHasher::HashInt(row.GetBoolean(field_index) ? 1 : 0);
+        case FieldType::TINYINT:
+            return 
HiveHasher::HashInt(static_cast<int32_t>(row.GetByte(field_index)));
+        case FieldType::SMALLINT:
+            return 
HiveHasher::HashInt(static_cast<int32_t>(row.GetShort(field_index)));
+        case FieldType::INT:
+        case FieldType::DATE:
+            return HiveHasher::HashInt(row.GetInt(field_index));
+        case FieldType::BIGINT:
+            return HiveHasher::HashLong(row.GetLong(field_index));
+        case FieldType::FLOAT: {
+            float float_value = row.GetFloat(field_index);
+            int32_t bits;
+            if (float_value == -0.0f) {
+                bits = 0;
+            } else {
+                std::memcpy(&bits, &float_value, sizeof(bits));
+            }
+            return HiveHasher::HashInt(bits);
+        }
+        case FieldType::DOUBLE: {
+            double double_value = row.GetDouble(field_index);
+            int64_t bits;
+            if (double_value == -0.0) {
+                bits = 0L;
+            } else {
+                std::memcpy(&bits, &double_value, sizeof(bits));
+            }
+            return HiveHasher::HashLong(bits);
+        }
+        case FieldType::STRING: {
+            std::string_view sv = row.GetStringView(field_index);
+            return HiveHasher::HashBytes(sv.data(), 
static_cast<int32_t>(sv.size()));
+        }
+        case FieldType::BINARY: {
+            std::string_view sv = row.GetStringView(field_index);
+            return HiveHasher::HashBytes(sv.data(), 
static_cast<int32_t>(sv.size()));
+        }
+        case FieldType::DECIMAL: {
+            Decimal decimal = row.GetDecimal(field_index, info.precision, 
info.scale);
+            return HiveHasher::HashDecimal(decimal);
+        }
+        default:
+            // This should never happen since Create() validates the types.
+            assert(false);
+            return 0;
+    }
+}
+
+int32_t HiveBucketFunction::Mod(int32_t value, int32_t divisor) {
+    int32_t remainder = value % divisor;
+    if (remainder < 0) {
+        return (remainder + divisor) % divisor;
+    }
+    return remainder;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/hive_bucket_function.h 
b/src/paimon/core/bucket/hive_bucket_function.h
new file mode 100644
index 0000000..82f0c40
--- /dev/null
+++ b/src/paimon/core/bucket/hive_bucket_function.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/bucket/bucket_function_type.h"
+#include "paimon/core/bucket/bucket_function.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Hive-compatible bucket function.
+/// This implements the same bucket assignment logic as Hive, using Hive's 
hash functions
+/// to ensure compatibility between Paimon and Hive bucketed tables.
+///
+/// The hash is computed by iterating over all fields in the row:
+///   hash = (31 * hash) + computeHash(field_value)
+/// Then the bucket is: (hash & INT32_MAX) % numBuckets
+class HiveBucketFunction : public BucketFunction {
+ public:
+    /// Create a HiveBucketFunction with the given field types.
+    /// @param field_types The types of all fields in the bucket key row.
+    /// @return A Result containing the HiveBucketFunction or an error status.
+    static Result<std::unique_ptr<HiveBucketFunction>> Create(
+        const std::vector<FieldType>& field_types);
+
+    /// Create a HiveBucketFunction with detailed field info (including 
decimal precision/scale).
+    /// @param field_infos The detailed type info of all fields in the bucket 
key row.
+    /// @return A Result containing the HiveBucketFunction or an error status.
+    static Result<std::unique_ptr<HiveBucketFunction>> Create(
+        const std::vector<HiveFieldInfo>& field_infos);
+
+    int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override;
+
+ private:
+    explicit HiveBucketFunction(const std::vector<HiveFieldInfo>& field_infos);
+
+    /// Compute the Hive hash for a single field value.
+    int32_t ComputeHash(const BinaryRow& row, int32_t field_index) const;
+
+    /// Mod operation that always returns non-negative result.
+    static int32_t Mod(int32_t value, int32_t divisor);
+
+    std::vector<HiveFieldInfo> field_infos_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/hive_bucket_function_test.cpp 
b/src/paimon/core/bucket/hive_bucket_function_test.cpp
new file mode 100644
index 0000000..73a94c5
--- /dev/null
+++ b/src/paimon/core/bucket/hive_bucket_function_test.cpp
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/hive_bucket_function.h"
+
+#include <limits>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/core/bucket/hive_hasher.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class HiveBucketFunctionTest : public ::testing::Test {
+ protected:
+    /// Helper to create a BinaryRow with INT, STRING, BINARY, DECIMAL(10,4) 
fields.
+    /// Matches the Java test: toBinaryRow(rowType, 7, "hello", {1,2,3}, 
Decimal("12.3400", 10, 4))
+    BinaryRow CreateMixedRow(int32_t int_val, const std::string& str_val,
+                             const std::vector<char>& binary_val, int64_t 
decimal_unscaled,
+                             int32_t decimal_precision, int32_t decimal_scale) 
{
+        auto pool = GetDefaultPool();
+        BinaryRow row(4);
+        BinaryRowWriter writer(&row, 0, pool.get());
+
+        // Field 0: INT
+        writer.WriteInt(0, int_val);
+
+        // Field 1: STRING
+        writer.WriteStringView(1, std::string_view{str_val});
+
+        // Field 2: BINARY
+        writer.WriteStringView(2, std::string_view(binary_val.data(), 
binary_val.size()));
+
+        // Field 3: DECIMAL (compact, precision <= 18)
+        writer.WriteDecimal(
+            3, Decimal::FromUnscaledLong(decimal_unscaled, decimal_precision, 
decimal_scale),
+            decimal_precision);
+
+        writer.Complete();
+        return row;
+    }
+
+    /// Helper to create a BinaryRow with all null fields.
+    BinaryRow CreateNullRow(int32_t num_fields) {
+        auto pool = GetDefaultPool();
+        BinaryRow row(num_fields);
+        BinaryRowWriter writer(&row, 0, pool.get());
+        for (int32_t i = 0; i < num_fields; i++) {
+            writer.SetNullAt(i);
+        }
+        writer.Complete();
+        return row;
+    }
+
+    BinaryRow CreateIntRow(int32_t value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+
+    BinaryRow CreateBooleanRow(bool value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+
+    BinaryRow CreateLongRow(int64_t value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+
+    BinaryRow CreateFloatRow(float value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+
+    BinaryRow CreateDoubleRow(double value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+
+    BinaryRow CreateStringRow(const std::string& value) {
+        auto pool = GetDefaultPool();
+        return BinaryRowGenerator::GenerateRow({value}, pool.get());
+    }
+};
+
+/// Test matching Java: testHiveBucketFunction
+/// RowType: INT, STRING, BYTES, DECIMAL(10,4)
+/// Values: 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4)
+TEST_F(HiveBucketFunctionTest, TestHiveBucketFunction) {
+    std::vector<HiveFieldInfo> field_infos = {
+        HiveFieldInfo(FieldType::INT),
+        HiveFieldInfo(FieldType::STRING),
+        HiveFieldInfo(FieldType::BINARY),
+        HiveFieldInfo(FieldType::DECIMAL, 10, 4),
+    };
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos));
+
+    // Decimal("12.3400", 10, 4) => unscaled = 123400
+    BinaryRow row = CreateMixedRow(7, "hello", {1, 2, 3}, 123400, 10, 4);
+
+    // Verify individual hash components:
+    // HiveHasher.hashBytes("hello") = 99162322
+    ASSERT_EQ(99162322, HiveHasher::HashBytes("hello", 5));
+    // HiveHasher.hashBytes({1,2,3}) = 1026
+    ASSERT_EQ(1026, HiveHasher::HashBytes("\x01\x02\x03", 3));
+    // BigDecimal("12.34").hashCode() = 1234 * 31 + 2 = 38256
+    // (After normalizing "12.3400" -> "12.34", unscaled=1234, scale=2)
+    ASSERT_EQ(38256, HiveHasher::HashDecimal(Decimal::FromUnscaledLong(123400, 
10, 4)));
+
+    // expectedHash = 31*(31*(31*7 + 99162322) + 1026) + 38256 = 805989529 
(with int32 overflow)
+    // bucket = (805989529 & INT32_MAX) % 8 = 1
+    ASSERT_EQ(1, func->Bucket(row, 8));
+}
+
+/// Test matching Java: testHiveBucketFunctionWithNulls
+TEST_F(HiveBucketFunctionTest, TestHiveBucketFunctionWithNulls) {
+    std::vector<FieldType> field_types = {FieldType::INT, FieldType::STRING};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    BinaryRow row = CreateNullRow(2);
+
+    // All nulls => hash = 0, bucket = 0
+    ASSERT_EQ(0, func->Bucket(row, 4));
+}
+
+/// Test unsupported type returns error on Create
+TEST_F(HiveBucketFunctionTest, TestUnsupportedType) {
+    // TIMESTAMP type should fail
+    std::vector<FieldType> field_types = {FieldType::TIMESTAMP};
+    auto result = HiveBucketFunction::Create(field_types);
+    ASSERT_NOK_WITH_MSG(result.status(), "Unsupported type");
+}
+
+/// Test empty field types returns error
+TEST_F(HiveBucketFunctionTest, TestEmptyFieldTypes) {
+    std::vector<FieldType> field_types = {};
+    auto result = HiveBucketFunction::Create(field_types);
+    ASSERT_NOK_WITH_MSG(result.status(), "at least one field");
+}
+
+/// Test single INT field
+TEST_F(HiveBucketFunctionTest, TestSingleIntField) {
+    std::vector<FieldType> field_types = {FieldType::INT};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // hash = 31*0 + 42 = 42, bucket = (42 & INT32_MAX) % 5 = 2
+    ASSERT_EQ(2, func->Bucket(CreateIntRow(42), 5));
+    // hash = 31*0 + 0 = 0, bucket = 0
+    ASSERT_EQ(0, func->Bucket(CreateIntRow(0), 5));
+}
+
+/// Test BOOLEAN field
+TEST_F(HiveBucketFunctionTest, TestBooleanField) {
+    std::vector<FieldType> field_types = {FieldType::BOOLEAN};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // true => hashInt(1) = 1, bucket = 1 % 4 = 1
+    ASSERT_EQ(1, func->Bucket(CreateBooleanRow(true), 4));
+    // false => hashInt(0) = 0, bucket = 0
+    ASSERT_EQ(0, func->Bucket(CreateBooleanRow(false), 4));
+}
+
+/// Test BIGINT field
+TEST_F(HiveBucketFunctionTest, TestBigintField) {
+    std::vector<FieldType> field_types = {FieldType::BIGINT};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // Java Long.hashCode(100L) = (int)(100 ^ (100 >>> 32)) = 100
+    // bucket = 100 % 7 = 2
+    ASSERT_EQ(2, func->Bucket(CreateLongRow(100L), 7));
+}
+
+/// Test FLOAT field with -0.0f
+TEST_F(HiveBucketFunctionTest, TestFloatNegativeZero) {
+    std::vector<FieldType> field_types = {FieldType::FLOAT};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // -0.0f should be treated as 0 => hashInt(0) = 0
+    ASSERT_EQ(func->Bucket(CreateFloatRow(0.0f), 5), 
func->Bucket(CreateFloatRow(-0.0f), 5));
+}
+
+/// Test DOUBLE field with -0.0
+TEST_F(HiveBucketFunctionTest, TestDoubleNegativeZero) {
+    std::vector<FieldType> field_types = {FieldType::DOUBLE};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // -0.0 should be treated as 0L => hashLong(0) = 0
+    ASSERT_EQ(func->Bucket(CreateDoubleRow(0.0), 5), 
func->Bucket(CreateDoubleRow(-0.0), 5));
+}
+
+/// Test STRING field
+TEST_F(HiveBucketFunctionTest, TestStringField) {
+    std::vector<FieldType> field_types = {FieldType::STRING};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    // hashBytes("hello") = 99162322
+    // bucket = (99162322 & INT32_MAX) % 10 = 99162322 % 10 = 2
+    ASSERT_EQ(2, func->Bucket(CreateStringRow("hello"), 10));
+}
+
+/// Test different num_buckets produce valid results
+TEST_F(HiveBucketFunctionTest, TestDifferentNumBuckets) {
+    std::vector<FieldType> field_types = {FieldType::INT};
+    ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types));
+
+    for (int32_t num_buckets = 1; num_buckets <= 20; num_buckets++) {
+        int32_t bucket = func->Bucket(CreateIntRow(12345), num_buckets);
+        ASSERT_GE(bucket, 0);
+        ASSERT_LT(bucket, num_buckets);
+    }
+}
+
+/// Test compatibility with Java HiveBucketFunction across multiple data types.
+/// Expected values are computed from the Java implementation:
+///   hash = 0 (seed)
+///   for each field: hash = 31 * hash + computeHash(field)
+///   bucket = (hash & INT32_MAX) % numBuckets
+///
+/// Java computeHash per type:
+///   BOOLEAN: hashInt(value ? 1 : 0)
+///   INT/DATE: hashInt(value)  [identity]
+///   BIGINT: hashLong(value) = (int)(value ^ (value >>> 32))
+///   FLOAT: hashInt(Float.floatToIntBits(value)), -0.0f treated as 0
+///   DOUBLE: hashLong(Double.doubleToLongBits(value)), -0.0 treated as 0L
+///   STRING/BINARY: hashBytes(bytes)
+///   DECIMAL: BigDecimal.hashCode() after normalization
+TEST_F(HiveBucketFunctionTest, TestCompatibleWithJava) {
+    auto pool = GetDefaultPool();
+    const int32_t num_buckets = 128;
+
+    // Case 1: Single INT field with various values
+    // Java: hash = 31*0 + hashInt(v) = v
+    // bucket = (v & INT32_MAX) % 128
+    {
+        std::vector<FieldType> field_types = {FieldType::INT};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // hashInt(0) = 0, bucket = 0
+        ASSERT_EQ(0, func->Bucket(CreateIntRow(0), num_buckets));
+        // hashInt(1) = 1, bucket = 1
+        ASSERT_EQ(1, func->Bucket(CreateIntRow(1), num_buckets));
+        // hashInt(127) = 127, bucket = 127
+        ASSERT_EQ(127, func->Bucket(CreateIntRow(127), num_buckets));
+        // hashInt(128) = 128, bucket = 0
+        ASSERT_EQ(0, func->Bucket(CreateIntRow(128), num_buckets));
+        // hashInt(-1) = -1, (-1 & INT32_MAX) = 2147483647, 2147483647 % 128 = 
127
+        ASSERT_EQ(127, func->Bucket(CreateIntRow(-1), num_buckets));
+        // hashInt(INT32_MIN) = -2147483648, (-2147483648 & INT32_MAX) = 0, 
bucket = 0
+        ASSERT_EQ(0, 
func->Bucket(CreateIntRow(std::numeric_limits<int32_t>::min()), num_buckets));
+        // hashInt(INT32_MAX) = 2147483647, (2147483647 & INT32_MAX) = 
2147483647, % 128 = 127
+        ASSERT_EQ(127,
+                  
func->Bucket(CreateIntRow(std::numeric_limits<int32_t>::max()), num_buckets));
+    }
+
+    // Case 2: Single BOOLEAN field
+    // Java: hashInt(true ? 1 : 0)
+    {
+        std::vector<FieldType> field_types = {FieldType::BOOLEAN};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // true => hashInt(1) = 1, bucket = 1 % 128 = 1
+        ASSERT_EQ(1, func->Bucket(CreateBooleanRow(true), num_buckets));
+        // false => hashInt(0) = 0, bucket = 0
+        ASSERT_EQ(0, func->Bucket(CreateBooleanRow(false), num_buckets));
+    }
+
+    // Case 3: Single BIGINT field
+    // Java: hashLong(v) = (int)(v ^ (v >>> 32))
+    {
+        std::vector<FieldType> field_types = {FieldType::BIGINT};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // hashLong(0) = 0, bucket = 0
+        ASSERT_EQ(0, func->Bucket(CreateLongRow(0L), num_buckets));
+        // hashLong(100) = (int)(100 ^ 0) = 100, bucket = 100 % 128 = 100
+        ASSERT_EQ(100, func->Bucket(CreateLongRow(100L), num_buckets));
+        // hashLong(4294967296L) = (int)(4294967296 ^ 1) = 1, bucket = 1
+        // 4294967296L = 0x100000000, >>> 32 = 1, xor = 0x100000001, (int) = 1
+        ASSERT_EQ(1, func->Bucket(CreateLongRow(4294967296L), num_buckets));
+        // hashLong(LONG_MAX) = (int)(0x7FFFFFFFFFFFFFFF ^ 0x7FFFFFFF) = 
(int)0x7FFFFF80000000
+        // = (int)(0x7FFFFFFF80000000) => low 32 bits = 0x80000000 = 
-2147483648
+        // Actually: 0x7FFFFFFFFFFFFFFF ^ (0x7FFFFFFFFFFFFFFF >>> 32)
+        //         = 0x7FFFFFFFFFFFFFFF ^ 0x7FFFFFFF
+        //         = 0x7FFFFFFF80000000
+        // (int) = 0x80000000 = -2147483648
+        // (-2147483648 & INT32_MAX) = 0, bucket = 0
+        ASSERT_EQ(0, 
func->Bucket(CreateLongRow(std::numeric_limits<int64_t>::max()), num_buckets));
+        // hashLong(-1) = (int)(-1 ^ (0xFFFFFFFFFFFFFFFF >>> 32))
+        //              = (int)(-1 ^ 0xFFFFFFFF) = (int)(0) = 0
+        ASSERT_EQ(0, func->Bucket(CreateLongRow(-1L), num_buckets));
+    }
+
+    // Case 4: Single FLOAT field
+    // Java: hashInt(Float.floatToIntBits(v)), -0.0f => 0
+    {
+        std::vector<FieldType> field_types = {FieldType::FLOAT};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // 0.0f => bits = 0, hashInt(0) = 0
+        ASSERT_EQ(0, func->Bucket(CreateFloatRow(0.0f), num_buckets));
+        // -0.0f => treated as 0, hashInt(0) = 0
+        ASSERT_EQ(0, func->Bucket(CreateFloatRow(-0.0f), num_buckets));
+        // 1.0f => Float.floatToIntBits(1.0f) = 0x3F800000 = 1065353216
+        // 1065353216 & INT32_MAX = 1065353216, % 128 = 0
+        ASSERT_EQ(0, func->Bucket(CreateFloatRow(1.0f), num_buckets));
+        // -1.0f => Float.floatToIntBits(-1.0f) = 0xBF800000 = -1082130432
+        // (-1082130432 & INT32_MAX) = 1065353216, % 128 = 0
+        ASSERT_EQ(0, func->Bucket(CreateFloatRow(-1.0f), num_buckets));
+    }
+
+    // Case 5: Single DOUBLE field
+    // Java: hashLong(Double.doubleToLongBits(v)), -0.0 => 0L
+    {
+        std::vector<FieldType> field_types = {FieldType::DOUBLE};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // 0.0 => bits = 0L, hashLong(0) = 0
+        ASSERT_EQ(0, func->Bucket(CreateDoubleRow(0.0), num_buckets));
+        // -0.0 => treated as 0L, hashLong(0) = 0
+        ASSERT_EQ(0, func->Bucket(CreateDoubleRow(-0.0), num_buckets));
+        // 1.0 => Double.doubleToLongBits(1.0) = 0x3FF0000000000000 = 
4607182418800017408
+        // hashLong = (int)(4607182418800017408 ^ (4607182418800017408 >>> 32))
+        //          = (int)(0x3FF0000000000000 ^ 0x3FF00000)
+        //          = (int)(0x3FF000003FF00000)
+        //          = (int)(0x3FF00000) = 1072693248
+        // 1072693248 % 128 = 0
+        ASSERT_EQ(0, func->Bucket(CreateDoubleRow(1.0), num_buckets));
+    }
+
+    // Case 6: Single STRING field
+    // Java: hashBytes(bytes)
+    {
+        std::vector<FieldType> field_types = {FieldType::STRING};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types));
+
+        // hashBytes("hello") = 99162322 (verified in TestHiveBucketFunction)
+        // 99162322 & INT32_MAX = 99162322, % 128 = 82
+        ASSERT_EQ(82, func->Bucket(CreateStringRow("hello"), num_buckets));
+        // hashBytes("") = 0, bucket = 0
+        ASSERT_EQ(0, func->Bucket(CreateStringRow(""), num_buckets));
+        // hashBytes("a") = 97, bucket = 97
+        ASSERT_EQ(97, func->Bucket(CreateStringRow("a"), num_buckets));
+    }
+
+    // Case 7: Single DATE field (same as INT)
+    // Java: hashInt(daysSinceEpoch)
+    {
+        std::vector<HiveFieldInfo> field_infos = 
{HiveFieldInfo(FieldType::DATE)};
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_infos));
+
+        // DATE is stored as int32 days since epoch, hashed same as INT
+        // date = 2000 (days), hashInt(2000) = 2000, 2000 % 128 = 80
+        ASSERT_EQ(80, func->Bucket(CreateIntRow(2000), num_buckets));
+    }
+
+    // Case 8: Multi-field row (INT, STRING, BINARY, DECIMAL)
+    // This is the same as TestHiveBucketFunction but with num_buckets=128
+    // Java step-by-step (all arithmetic in int32 with overflow):
+    //   hash = 0
+    //   hash = 31*0 + hashInt(7) = 7
+    //   hash = 31*7 + hashBytes("hello") = 217 + 99162322 = 99162539
+    //   hash = 31*99162539 + hashBytes({1,2,3}) = int32(-1220928587) + 1026 = 
-1220927561
+    //   hash = 31*(-1220927561) + hashDecimal(12.3400) = int32(805951273) + 
38256 = 805989529
+    //   bucket = (805989529 & INT32_MAX) % 128 = 25
+    {
+        std::vector<HiveFieldInfo> field_infos = {
+            HiveFieldInfo(FieldType::INT),
+            HiveFieldInfo(FieldType::STRING),
+            HiveFieldInfo(FieldType::BINARY),
+            HiveFieldInfo(FieldType::DECIMAL, 10, 4),
+        };
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_infos));
+
+        BinaryRow row = CreateMixedRow(7, "hello", {1, 2, 3}, 123400, 10, 4);
+        // Already verified: func->Bucket(row, 8) == 1
+        ASSERT_EQ(1, func->Bucket(row, 8));
+        // With 128 buckets: 805989529 % 128 = 25
+        ASSERT_EQ(25, func->Bucket(row, num_buckets));
+    }
+
+    // Case 9: All-null row
+    // Java: all nulls => hash = 0, bucket = 0
+    {
+        std::vector<HiveFieldInfo> field_infos = {
+            HiveFieldInfo(FieldType::INT),
+            HiveFieldInfo(FieldType::STRING),
+            HiveFieldInfo(FieldType::BIGINT),
+        };
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_infos));
+
+        BinaryRow row = CreateNullRow(3);
+        ASSERT_EQ(0, func->Bucket(row, num_buckets));
+    }
+
+    // Case 10: Multi-field row with BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, 
STRING
+    // Java step-by-step:
+    //   field 0: BOOLEAN true => hashInt(1) = 1
+    //   field 1: INT 42 => hashInt(42) = 42
+    //   field 2: BIGINT 100 => hashLong(100) = 100
+    //   field 3: FLOAT 0.0f => hashInt(0) = 0
+    //   field 4: DOUBLE 0.0 => hashLong(0) = 0
+    //   field 5: STRING "a" => hashBytes("a") = 97
+    //
+    //   hash = 0
+    //   hash = 31*0 + 1 = 1
+    //   hash = 31*1 + 42 = 73
+    //   hash = 31*73 + 100 = 2363
+    //   hash = 31*2363 + 0 = 73253
+    //   hash = 31*73253 + 0 = 2270843
+    //   hash = 31*2270843 + 97 = 70396230
+    //   bucket = (70396230 & INT32_MAX) % 128 = 70396230 % 128 = 70
+    {
+        std::vector<HiveFieldInfo> field_infos = {
+            HiveFieldInfo(FieldType::BOOLEAN), HiveFieldInfo(FieldType::INT),
+            HiveFieldInfo(FieldType::BIGINT),  HiveFieldInfo(FieldType::FLOAT),
+            HiveFieldInfo(FieldType::DOUBLE),  
HiveFieldInfo(FieldType::STRING),
+        };
+        ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_infos));
+
+        BinaryRow row(6);
+        BinaryRowWriter writer(&row, 0, pool.get());
+        writer.WriteBoolean(0, true);
+        writer.WriteInt(1, 42);
+        writer.WriteLong(2, 100L);
+        writer.WriteFloat(3, 0.0f);
+        writer.WriteDouble(4, 0.0);
+        writer.WriteStringView(5, std::string_view("a"));
+        writer.Complete();
+
+        ASSERT_EQ(70, func->Bucket(row, num_buckets));
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/bucket/hive_hasher.h 
b/src/paimon/core/bucket/hive_hasher.h
new file mode 100644
index 0000000..d86be40
--- /dev/null
+++ b/src/paimon/core/bucket/hive_hasher.h
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+
+#include "paimon/data/decimal.h"
+
+namespace paimon {
+
+/// Hive-compatible hash utility functions.
+/// This class provides hash functions that are compatible with Hive's 
ObjectInspectorUtils
+/// hash implementation, ensuring consistent bucket assignment between Paimon 
C++ and Java.
+class HiveHasher {
+ public:
+    /// Hash an int value (identity function, same as Hive).
+    static int32_t HashInt(int32_t input) {
+        return input;
+    }
+
+    /// Hash a long value (same as Java's Long.hashCode).
+    static int32_t HashLong(int64_t input) {
+        return static_cast<int32_t>(input ^ (static_cast<uint64_t>(input) >> 
32));
+    }
+
+    /// Hash a byte array.
+    static int32_t HashBytes(const char* bytes, int32_t length) {
+        int32_t result = 0;
+        for (int32_t i = 0; i < length; i++) {
+            result = (result * 31) + static_cast<int32_t>(bytes[i]);
+        }
+        return result;
+    }
+
+    /// Normalize a Decimal value for Hive-compatible hashing.
+    /// This implements the same logic as HiveHasher.normalizeDecimal in Java.
+    ///
+    /// The normalization process:
+    /// 1. Strip trailing zeros
+    /// 2. Check if integer digits exceed max precision (38)
+    /// 3. Limit scale to min(38, min(38 - intDigits, currentScale))
+    /// 4. Round if necessary using HALF_UP
+    ///
+    /// @param decimal The decimal value to normalize.
+    /// @return The hash code of the normalized decimal, computed as Java 
BigDecimal.hashCode().
+    static int32_t HashDecimal(const Decimal& decimal) {
+        // Java BigDecimal.hashCode() = unscaledValue.intValue() * 31 + scale
+        // For compact decimals (precision <= 18), we can use the long value 
directly.
+        // For non-compact decimals, we need to handle the 128-bit value.
+
+        // First normalize: strip trailing zeros and limit scale
+        int32_t scale = decimal.Scale();
+        auto value = decimal.Value();
+
+        // Strip trailing zeros
+        if (value == 0) {
+            // BigDecimal.ZERO.hashCode() = 0 * 31 + 0 = 0
+            return 0;
+        }
+
+        // Strip trailing zeros by dividing by 10 while remainder is 0
+        while (scale > 0 && value != 0) {
+            auto quotient = value / 10;
+            auto remainder = value - quotient * 10;
+            if (remainder != 0) {
+                break;
+            }
+            value = quotient;
+            scale--;
+        }
+
+        // After stripping, check if value is zero
+        if (value == 0) {
+            return 0;
+        }
+
+        // Count integer digits
+        auto abs_value = value < 0 ? -value : value;
+        int32_t total_digits = 0;
+        auto temp = abs_value;
+        while (temp > 0) {
+            temp /= 10;
+            total_digits++;
+        }
+        int32_t int_digits = total_digits - scale;
+
+        if (int_digits > HIVE_DECIMAL_MAX_PRECISION) {
+            // Overflow, return 0 (null equivalent)
+            return 0;
+        }
+
+        int32_t max_scale = HIVE_DECIMAL_MAX_SCALE;
+        if (HIVE_DECIMAL_MAX_PRECISION - int_digits < max_scale) {
+            max_scale = HIVE_DECIMAL_MAX_PRECISION - int_digits;
+        }
+        if (scale < max_scale) {
+            max_scale = scale;
+        }
+
+        if (scale > max_scale) {
+            // Need to round: scale down with HALF_UP rounding
+            int32_t scale_diff = scale - max_scale;
+            for (int32_t i = 0; i < scale_diff; i++) {
+                auto quotient = value / 10;
+                auto remainder = value - quotient * 10;
+                if (remainder < 0) remainder = -remainder;
+                if (remainder >= 5) {
+                    value = quotient + (value < 0 ? -1 : 1);
+                } else {
+                    value = quotient;
+                }
+            }
+            scale = max_scale;
+
+            // Strip trailing zeros again after rounding
+            while (scale > 0 && value != 0) {
+                auto quotient = value / 10;
+                auto remainder = value - quotient * 10;
+                if (remainder != 0) {
+                    break;
+                }
+                value = quotient;
+                scale--;
+            }
+
+            if (value == 0) {
+                return 0;
+            }
+        }
+
+        // Compute Java BigDecimal.hashCode():
+        // hashCode = intValue(unscaledValue) * 31 + scale
+        // intValue() returns the low 32 bits of the value
+        auto int_value = static_cast<int32_t>(static_cast<int64_t>(value));
+        return int_value * 31 + scale;
+    }
+
+ private:
+    static constexpr int32_t HIVE_DECIMAL_MAX_PRECISION = 38;
+    static constexpr int32_t HIVE_DECIMAL_MAX_SCALE = 38;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/mod_bucket_function.cpp 
b/src/paimon/core/bucket/mod_bucket_function.cpp
new file mode 100644
index 0000000..375699e
--- /dev/null
+++ b/src/paimon/core/bucket/mod_bucket_function.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/mod_bucket_function.h"
+
+#include <cassert>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+namespace {
+
+/// Equivalent to Java's Math.floorMod semantics.
+/// The result always has the same sign as the divisor (y), or is zero.
+/// Works for both int32_t and int64_t as T.
+template <typename T>
+inline int32_t FloorMod(T x, int32_t y) {
+    auto mod = static_cast<int64_t>(x) % static_cast<int64_t>(y);
+    // If the signs of mod and y differ and mod is not zero, adjust.
+    if ((mod ^ static_cast<int64_t>(y)) < 0 && mod != 0) {
+        mod += y;
+    }
+    return static_cast<int32_t>(mod);
+}
+
+}  // namespace
+
+ModBucketFunction::ModBucketFunction(FieldType bucket_key_type)
+    : bucket_key_type_(bucket_key_type) {}
+
+Result<std::unique_ptr<ModBucketFunction>> ModBucketFunction::Create(FieldType 
bucket_key_type) {
+    if (bucket_key_type != FieldType::INT && bucket_key_type != 
FieldType::BIGINT) {
+        return Status::Invalid(
+            fmt::format("ModBucketFunction only supports INT or BIGINT bucket 
key type, but got {}",
+                        FieldTypeUtils::FieldTypeToString(bucket_key_type)));
+    }
+    return std::unique_ptr<ModBucketFunction>(new 
ModBucketFunction(bucket_key_type));
+}
+
+int32_t ModBucketFunction::Bucket(const BinaryRow& row, int32_t num_buckets) 
const {
+    switch (bucket_key_type_) {
+        case FieldType::INT:
+            return FloorMod(row.GetInt(0), num_buckets);
+        case FieldType::BIGINT:
+            return FloorMod(row.GetLong(0), num_buckets);
+        default:
+            // This should never happen since Create() validates the type.
+            assert(false);
+            return 0;
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/mod_bucket_function.h 
b/src/paimon/core/bucket/mod_bucket_function.h
new file mode 100644
index 0000000..67747d1
--- /dev/null
+++ b/src/paimon/core/bucket/mod_bucket_function.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/core/bucket/bucket_function.h"
+#include "paimon/defs.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Mod bucket function that uses modulo operation on the bucket key value.
+/// The bucket key must be a single field of INT or BIGINT type.
+/// This implements Java's Math.floorMod semantics for negative numbers.
+class ModBucketFunction : public BucketFunction {
+ public:
+    /// Create a ModBucketFunction with the given bucket key type.
+    /// @param bucket_key_type The type of the single bucket key field. Must 
be INT or BIGINT.
+    /// @return A Result containing the ModBucketFunction or an error status.
+    static Result<std::unique_ptr<ModBucketFunction>> Create(FieldType 
bucket_key_type);
+
+    int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override;
+
+ private:
+    explicit ModBucketFunction(FieldType bucket_key_type);
+
+    FieldType bucket_key_type_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/bucket/mod_bucket_function_test.cpp 
b/src/paimon/core/bucket/mod_bucket_function_test.cpp
new file mode 100644
index 0000000..0eec2da
--- /dev/null
+++ b/src/paimon/core/bucket/mod_bucket_function_test.cpp
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/bucket/mod_bucket_function.h"
+
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+BinaryRow CreateIntRow(int32_t value) {
+    auto pool = GetDefaultPool();
+    return BinaryRowGenerator::GenerateRow({value}, pool.get());
+}
+
+BinaryRow CreateLongRow(int64_t value) {
+    auto pool = GetDefaultPool();
+    return BinaryRowGenerator::GenerateRow({value}, pool.get());
+}
+
+}  // namespace
+
+TEST(ModBucketFunctionTest, TestIntType) {
+    ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::INT));
+
+    // 1 % 5 = 1
+    ASSERT_EQ(1, func->Bucket(CreateIntRow(1), 5));
+    // 7 % 5 = 2
+    ASSERT_EQ(2, func->Bucket(CreateIntRow(7), 5));
+    // -2 floorMod 5 = 3 (Java Math.floorMod(-2, 5) = 3)
+    ASSERT_EQ(3, func->Bucket(CreateIntRow(-2), 5));
+}
+
+TEST(ModBucketFunctionTest, TestBigintType) {
+    ASSERT_OK_AND_ASSIGN(auto func, 
ModBucketFunction::Create(FieldType::BIGINT));
+
+    // 8 % 5 = 3
+    ASSERT_EQ(3, func->Bucket(CreateLongRow(8), 5));
+    // 0 % 5 = 0
+    ASSERT_EQ(0, func->Bucket(CreateLongRow(0), 5));
+    // -3 floorMod 5 = 2 (Java Math.floorMod(-3L, 5) = 2)
+    ASSERT_EQ(2, func->Bucket(CreateLongRow(-3), 5));
+}
+
+TEST(ModBucketFunctionTest, TestUnsupportedTypes) {
+    {
+        // STRING type should fail
+        auto result = ModBucketFunction::Create(FieldType::STRING);
+        ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT");
+    }
+    {
+        // FLOAT type should fail
+        auto result = ModBucketFunction::Create(FieldType::FLOAT);
+        ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT");
+    }
+    {
+        // DOUBLE type should fail
+        auto result = ModBucketFunction::Create(FieldType::DOUBLE);
+        ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT");
+    }
+}
+
+TEST(ModBucketFunctionTest, TestIntEdgeCases) {
+    ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::INT));
+
+    // 0 % 5 = 0
+    ASSERT_EQ(0, func->Bucket(CreateIntRow(0), 5));
+    // 5 % 5 = 0
+    ASSERT_EQ(0, func->Bucket(CreateIntRow(5), 5));
+    // -5 floorMod 5 = 0
+    ASSERT_EQ(0, func->Bucket(CreateIntRow(-5), 5));
+    // 1 % 1 = 0
+    ASSERT_EQ(0, func->Bucket(CreateIntRow(1), 1));
+}
+
+TEST(ModBucketFunctionTest, TestBigintEdgeCases) {
+    ASSERT_OK_AND_ASSIGN(auto func, 
ModBucketFunction::Create(FieldType::BIGINT));
+
+    // Large value
+    ASSERT_EQ(3, func->Bucket(CreateLongRow(1000000003L), 5));
+    // Negative large value: -1000000003 floorMod 5 = 2
+    ASSERT_EQ(2, func->Bucket(CreateLongRow(-1000000003L), 5));
+}
+
+/// Large random compatibility test to ensure alignment with Java's 
Math.floorMod behavior.
+/// The expected values are pre-computed using Java's Math.floorMod.
+TEST(ModBucketFunctionTest, TestCompatibleWithJava) {
+    ASSERT_OK_AND_ASSIGN(auto int_func, 
ModBucketFunction::Create(FieldType::INT));
+    ASSERT_OK_AND_ASSIGN(auto long_func, 
ModBucketFunction::Create(FieldType::BIGINT));
+
+    // Test INT type: pairs of (value, num_buckets) -> expected bucket (Java 
Math.floorMod)
+    // These values cover positive, negative, zero, edge cases, and large 
values.
+    struct IntTestCase {
+        int32_t value;
+        int32_t num_buckets;
+        int32_t expected;
+    };
+    std::vector<IntTestCase> int_cases = {
+        {0, 10, 0},
+        {1, 10, 1},
+        {-1, 10, 9},
+        {10, 10, 0},
+        {-10, 10, 0},
+        {11, 10, 1},
+        {-11, 10, 9},
+        {2147483647, 100, 47},   // INT32_MAX
+        {-2147483647, 100, 53},  // -(INT32_MAX)
+        {2147483647, 7, 1},
+        {-2147483647, 7, 6},
+        {123456789, 1000, 789},
+        {-123456789, 1000, 211},
+        {999, 1, 0},
+        {-999, 1, 0},
+        {42, 3, 0},
+        {-42, 3, 0},
+        {43, 3, 1},
+        {-43, 3, 2},
+        {100, 7, 2},
+        {-100, 7, 5},
+    };
+    for (const auto& tc : int_cases) {
+        ASSERT_EQ(tc.expected, int_func->Bucket(CreateIntRow(tc.value), 
tc.num_buckets))
+            << "INT floorMod(" << tc.value << ", " << tc.num_buckets << ")";
+    }
+
+    // Test BIGINT type: pairs of (value, num_buckets) -> expected bucket 
(Java Math.floorMod)
+    struct LongTestCase {
+        int64_t value;
+        int32_t num_buckets;
+        int32_t expected;
+    };
+    std::vector<LongTestCase> long_cases = {
+        {0L, 10, 0},
+        {1L, 10, 1},
+        {-1L, 10, 9},
+        {10L, 10, 0},
+        {-10L, 10, 0},
+        {9223372036854775807L, 100, 7},    // INT64_MAX
+        {-9223372036854775807L, 100, 93},  // -(INT64_MAX)
+        {9223372036854775807L, 7, 0},
+        {-9223372036854775807L, 7, 0},
+        {1234567890123456789L, 1000, 789},
+        {-1234567890123456789L, 1000, 211},
+        {100L, 7, 2},
+        {-100L, 7, 5},
+        {999999999999L, 13, 0},
+        {-999999999999L, 13, 0},
+    };
+    for (const auto& tc : long_cases) {
+        ASSERT_EQ(tc.expected, long_func->Bucket(CreateLongRow(tc.value), 
tc.num_buckets))
+            << "BIGINT floorMod(" << tc.value << ", " << tc.num_buckets << ")";
+    }
+
+    // Verify that all bucket results are in valid range [0, num_buckets)
+    for (int32_t num_buckets = 1; num_buckets <= 50; num_buckets++) {
+        for (int32_t v = -100; v <= 100; v++) {
+            int32_t bucket = int_func->Bucket(CreateIntRow(v), num_buckets);
+            ASSERT_GE(bucket, 0);
+            ASSERT_LT(bucket, num_buckets);
+        }
+    }
+}
+
+}  // namespace paimon::test

Reply via email to