This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3274d79797 [core] Introduce hive bucket function to align with spark 
hive bucket (#7393)
3274d79797 is described below

commit 3274d79797404df43a99215137905813d6c97479
Author: WenjunMin <[email protected]>
AuthorDate: Wed Mar 11 10:16:19 2026 +0800

    [core] Introduce hive bucket function to align with spark hive bucket 
(#7393)
---
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   7 +-
 .../org/apache/paimon/bucket/BucketFunction.java   |   2 +
 .../apache/paimon/bucket/HiveBucketFunction.java   |  98 ++++++++++++++++++
 .../java/org/apache/paimon/bucket/HiveHasher.java  | 114 +++++++++++++++++++++
 .../paimon/bucket/HiveBucketFunctionTest.java      | 105 +++++++++++++++++++
 .../org/apache/paimon/hash/HiveHasherTest.java     |  77 ++++++++++++++
 .../spark/catalog/functions/PaimonFunctions.scala  |  26 ++---
 .../catalog/functions/BucketFunctionTest.java      |  90 ++++++++++++++++
 9 files changed, 506 insertions(+), 15 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5003d23dac..673d988dab 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -108,7 +108,7 @@ under the License.
             <td><h5>bucket-function.type</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>The bucket function for paimon bucket.<br /><br />Possible 
values:<ul><li>"default": The default bucket function which will use 
arithmetic: bucket_id = Math.abs(hash_bucket_binary_row % numBuckets) to get 
bucket.</li><li>"mod": The modulus bucket function which will use modulus 
arithmetic: bucket_id = Math.floorMod(bucket_key_value, numBuckets) to get 
bucket. Note: the bucket key must be a single field of INT or BIGINT 
datatype.</li></ul></td>
+            <td>The bucket function for paimon bucket.<br /><br />Possible 
values:<ul><li>"default": The default bucket function which will use 
arithmetic: bucket_id = Math.abs(hash_bucket_binary_row % numBuckets) to get 
bucket.</li><li>"mod": The modulus bucket function which will use modulus 
arithmetic: bucket_id = Math.floorMod(bucket_key_value, numBuckets) to get 
bucket. Note: the bucket key must be a single field of INT or BIGINT 
datatype.</li><li>"hive": The hive bucket function which will use hive-compatible hash arithmetic to get bucket.</li></ul></td>
         </tr>
         <tr>
             <td><h5>bucket-key</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 70e50efe9c..d5934b65db 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -152,7 +152,10 @@ public class CoreOptions implements Serializable {
         MOD(
                 "mod",
                 "The modulus bucket function which will use modulus 
arithmetic: bucket_id = Math.floorMod(bucket_key_value, numBuckets) to get 
bucket. "
-                        + "Note: the bucket key must be a single field of INT 
or BIGINT datatype.");
+                        + "Note: the bucket key must be a single field of INT 
or BIGINT datatype."),
+        HIVE(
+                "hive",
+                "The hive bucket function which will use hive-compatible hash 
arithmetic to get bucket.");
 
         private final String value;
         private final String description;
@@ -172,6 +175,8 @@ public class CoreOptions implements Serializable {
                 return DEFAULT;
             } else if (MOD.value.equalsIgnoreCase(bucketType)) {
                 return MOD;
+            } else if (HIVE.value.equalsIgnoreCase(bucketType)) {
+                return HIVE;
             }
             throw new IllegalArgumentException(
                     "cannot match type: " + bucketType + " for bucket 
function");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/bucket/BucketFunction.java 
b/paimon-core/src/main/java/org/apache/paimon/bucket/BucketFunction.java
index f54d17d764..6d5149dd6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/bucket/BucketFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/bucket/BucketFunction.java
@@ -41,6 +41,8 @@ public interface BucketFunction extends Serializable {
                 return new DefaultBucketFunction();
             case MOD:
                 return new ModBucketFunction(bucketKeyType);
+            case HIVE:
+                return new HiveBucketFunction(bucketKeyType);
             default:
                 throw new IllegalArgumentException(
                         "Unsupported bucket type: " + bucketFunctionType);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/bucket/HiveBucketFunction.java 
b/paimon-core/src/main/java/org/apache/paimon/bucket/HiveBucketFunction.java
new file mode 100644
index 0000000000..68f401bba4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/bucket/HiveBucketFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.bucket;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
+
+/**
+ * {@link BucketFunction} that picks the bucket from a Hive-compatible hash of the bucket key.
+ *
+ * <p>The row hash starts at {@code 0}; each field is folded in as {@code 31 * hash + fieldHash},
+ * where a null field contributes {@code 0}. The sign bit of the result is cleared before it is
+ * reduced modulo the number of buckets.
+ */
+public class HiveBucketFunction implements BucketFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Starting value of the row hash. */
+    private static final int SEED = 0;
+
+    /** Getters used to read the fields of the rows passed to {@link #bucket}. */
+    private final InternalRow.FieldGetter[] fieldGetters;
+
+    /**
+     * @param rowType type of the bucket key row; its field types determine the field getters
+     */
+    public HiveBucketFunction(RowType rowType) {
+        this.fieldGetters = InternalRowUtils.createFieldGetters(rowType.getFieldTypes());
+    }
+
+    /**
+     * Computes the bucket of the given bucket key row.
+     *
+     * @param row bucket key row; must have row kind {@link RowKind#INSERT}
+     * @param numBuckets number of buckets; must be positive
+     * @return the bucket index, in {@code [0, numBuckets)} for a positive {@code numBuckets}
+     * @throws UnsupportedOperationException if a non-null field value has an unsupported class
+     */
+    @Override
+    public int bucket(BinaryRow row, int numBuckets) {
+        assert numBuckets > 0 && row.getRowKind() == RowKind.INSERT : "Num bucket is illegal";
+
+        int rowHash = SEED;
+        int position = 0;
+        while (position < row.getFieldCount()) {
+            Object fieldValue = fieldGetters[position].getFieldOrNull(row);
+            rowHash = 31 * rowHash + computeHash(fieldValue);
+            position++;
+        }
+
+        // Clear the sign bit so the value handed to mod() is never negative.
+        int nonNegativeHash = rowHash & Integer.MAX_VALUE;
+        return mod(nonNegativeHash, numBuckets);
+    }
+
+    /**
+     * Returns {@code value % divisor}, shifted by {@code divisor} when the remainder is negative.
+     *
+     * @param value the dividend
+     * @param divisor the divisor
+     * @return the adjusted remainder
+     */
+    static int mod(int value, int divisor) {
+        int remainder = value % divisor;
+        return remainder < 0 ? (remainder + divisor) % divisor : remainder;
+    }
+
+    /**
+     * Hashes a single field value according to its runtime class.
+     *
+     * @param value the field value, possibly null
+     * @return the field hash; {@code 0} for null
+     * @throws UnsupportedOperationException if the class of {@code value} is not handled
+     */
+    private int computeHash(Object value) {
+        if (value == null) {
+            return 0;
+        }
+
+        if (value instanceof Boolean) {
+            boolean flag = (Boolean) value;
+            return HiveHasher.hashInt(flag ? 1 : 0);
+        }
+        if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
+            return HiveHasher.hashInt(((Number) value).intValue());
+        }
+        if (value instanceof Long) {
+            return HiveHasher.hashLong((Long) value);
+        }
+        if (value instanceof Float) {
+            float floatValue = (Float) value;
+            // The comparison is true for both 0.0f and -0.0f, so both zeros hash to 0.
+            int bits = floatValue == -0.0f ? 0 : Float.floatToIntBits(floatValue);
+            return HiveHasher.hashInt(bits);
+        }
+        if (value instanceof Double) {
+            double doubleValue = (Double) value;
+            // The comparison is true for both 0.0d and -0.0d, so both zeros hash to 0.
+            long bits = doubleValue == -0.0d ? 0L : Double.doubleToLongBits(doubleValue);
+            return HiveHasher.hashLong(bits);
+        }
+        if (value instanceof BinaryString) {
+            BinaryString text = (BinaryString) value;
+            return HiveHasher.hashUnsafeBytes(
+                    text.getSegments(), text.getOffset(), text.getSizeInBytes());
+        }
+        if (value instanceof byte[]) {
+            return HiveHasher.hashBytes((byte[]) value);
+        }
+        if (value instanceof Decimal) {
+            return HiveHasher.normalizeDecimal(((Decimal) value).toBigDecimal()).hashCode();
+        }
+
+        throw new UnsupportedOperationException(
+                "Unsupported type as bucket key type " + value.getClass());
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/bucket/HiveHasher.java 
b/paimon-core/src/main/java/org/apache/paimon/bucket/HiveHasher.java
new file mode 100644
index 0000000000..7103f4d9b7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/bucket/HiveHasher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.bucket;
+
+import org.apache.paimon.memory.MemorySegment;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+/** Hive hash util. */
+public class HiveHasher {
+
+    private static final int HIVE_DECIMAL_MAX_PRECISION = 38;
+    private static final int HIVE_DECIMAL_MAX_SCALE = 38;
+
+    @Override
+    public String toString() {
+        return HiveHasher.class.getSimpleName();
+    }
+
+    public static int hashInt(int input) {
+        return input;
+    }
+
+    public static int hashLong(long input) {
+        return Long.hashCode(input);
+    }
+
+    public static int hashBytes(byte[] bytes) {
+        int result = 0;
+        for (byte value : bytes) {
+            result = (result * 31) + value;
+        }
+        return result;
+    }
+
+    public static int hashUnsafeBytes(MemorySegment[] segments, int offset, 
int length) {
+        int result = 0;
+        for (MemorySegment segment : segments) {
+            int remaining = segment.size() - offset;
+            if (remaining > 0) {
+                int bytesToRead = Math.min(remaining, length);
+                for (int i = 0; i < bytesToRead; i++) {
+                    result = (result * 31) + segment.get(offset + i);
+                }
+                length -= bytesToRead;
+                offset = 0;
+            } else {
+                offset -= segment.size();
+            }
+
+            if (length == 0) {
+                break;
+            }
+        }
+        return result;
+    }
+
+    public static BigDecimal normalizeDecimal(BigDecimal input) {
+        if (input == null) {
+            return null;
+        }
+
+        BigDecimal result = trimDecimal(input);
+        int intDigits = result.precision() - result.scale();
+        if (intDigits > HIVE_DECIMAL_MAX_PRECISION) {
+            return null;
+        }
+
+        int maxScale =
+                Math.min(
+                        HIVE_DECIMAL_MAX_SCALE,
+                        Math.min(HIVE_DECIMAL_MAX_PRECISION - intDigits, 
result.scale()));
+        if (result.scale() > maxScale) {
+            result = result.setScale(maxScale, RoundingMode.HALF_UP);
+            result = trimDecimal(result);
+        }
+
+        return result;
+    }
+
+    private static BigDecimal trimDecimal(BigDecimal input) {
+        if (input.compareTo(BigDecimal.ZERO) == 0) {
+            return BigDecimal.ZERO;
+        }
+
+        BigDecimal result = input.stripTrailingZeros();
+        if (result.compareTo(BigDecimal.ZERO) == 0) {
+            return BigDecimal.ZERO;
+        }
+
+        if (result.scale() < 0) {
+            result = result.setScale(0);
+        }
+
+        return result;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/bucket/HiveBucketFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/bucket/HiveBucketFunctionTest.java
new file mode 100644
index 0000000000..fb5ecc61b4
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/bucket/HiveBucketFunctionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.bucket;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link HiveBucketFunction}. */
+class HiveBucketFunctionTest {
+
+    /** The bucket must equal the 31-based fold of the per-field hashes, sign bit cleared. */
+    @Test
+    void testHiveBucketFunction() {
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.DECIMAL(10, 4));
+        HiveBucketFunction hiveBucketFunction = new HiveBucketFunction(rowType);
+
+        BinaryRow row =
+                toBinaryRow(
+                        rowType,
+                        7,
+                        BinaryString.fromString("hello"),
+                        new byte[] {1, 2, 3},
+                        Decimal.fromBigDecimal(new BigDecimal("12.3400"), 10, 4));
+
+        // Expected per-field hashes, in field order; the decimal is hashed without its
+        // trailing zeros.
+        int[] fieldHashes = {
+            7,
+            HiveHasher.hashBytes("hello".getBytes(StandardCharsets.UTF_8)),
+            HiveHasher.hashBytes(new byte[] {1, 2, 3}),
+            new BigDecimal("12.34").hashCode()
+        };
+        int expectedHash = 0;
+        for (int fieldHash : fieldHashes) {
+            expectedHash = 31 * expectedHash + fieldHash;
+        }
+
+        int expectedBucket = (expectedHash & Integer.MAX_VALUE) % 8;
+        assertThat(hiveBucketFunction.bucket(row, 8)).isEqualTo(expectedBucket);
+    }
+
+    /** A row consisting only of nulls hashes to 0 and therefore lands in bucket 0. */
+    @Test
+    void testHiveBucketFunctionWithNulls() {
+        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
+        HiveBucketFunction hiveBucketFunction = new HiveBucketFunction(rowType);
+
+        BinaryRow allNulls = toBinaryRow(rowType, null, null);
+
+        assertThat(hiveBucketFunction.bucket(allNulls, 4)).isZero();
+    }
+
+    /** A null timestamp is accepted, a non-null timestamp is rejected. */
+    @Test
+    void testHiveBucketFunctionUnsupportedType() {
+        RowType rowType = RowType.of(DataTypes.TIMESTAMP());
+        HiveBucketFunction hiveBucketFunction = new HiveBucketFunction(rowType);
+
+        BinaryRow nullRow = toBinaryRow(rowType, (Object) null);
+        assertThat(hiveBucketFunction.bucket(nullRow, 4)).isZero();
+
+        assertThatThrownBy(
+                        () ->
+                                hiveBucketFunction.bucket(
+                                        toBinaryRow(
+                                                rowType,
+                                                org.apache.paimon.data.Timestamp.fromEpochMillis(
+                                                        1L)),
+                                        4))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported type as bucket key type");
+    }
+
+    /** Serializes the given field values into a {@link BinaryRow} of the given type. */
+    private BinaryRow toBinaryRow(RowType rowType, Object... values) {
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        return serializer.toBinaryRow(GenericRow.of(values));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/hash/HiveHasherTest.java 
b/paimon-core/src/test/java/org/apache/paimon/hash/HiveHasherTest.java
new file mode 100644
index 0000000000..dac6d97dda
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/HiveHasherTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.bucket.HiveHasher;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.memory.MemorySegment;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link HiveHasher}. */
+class HiveHasherTest {
+
+    /** Segment-based hashing must agree with hashing the same bytes as a plain array. */
+    @Test
+    void testHashUnsafeBytes() {
+        // Short string backed by a single segment.
+        assertSegmentHashMatchesByteHash(BinaryString.fromString("hello"));
+
+        // Long string laid out across two segments at a random offset.
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            builder.append(UUID.randomUUID());
+        }
+        assertSegmentHashMatchesByteHash(fromString(builder.toString()));
+    }
+
+    /** Asserts that {@code hashUnsafeBytes} and {@code hashBytes} agree for the given string. */
+    private void assertSegmentHashMatchesByteHash(BinaryString binaryString) {
+        int segmentHash =
+                HiveHasher.hashUnsafeBytes(
+                        binaryString.getSegments(),
+                        binaryString.getOffset(),
+                        binaryString.getSizeInBytes());
+        assertThat(segmentHash).isEqualTo(HiveHasher.hashBytes(binaryString.toBytes()));
+    }
+
+    /**
+     * Copies the bytes of {@code input} into two equally sized arrays, preceded by a random
+     * padding of fewer than 100 bytes, and returns a {@link BinaryString} over both segments.
+     */
+    private BinaryString fromString(String input) {
+        BinaryString source = BinaryString.fromString(input);
+        MemorySegment sourceSegment = source.getSegments()[0];
+        int numBytes = source.getSizeInBytes();
+
+        int pad = new Random().nextInt(100);
+        int segmentSize = (numBytes + pad) / 2 + 1;
+        // Bytes of the string stored in the first array, after the padding.
+        int headLength = segmentSize - pad;
+        // Remaining bytes of the string, stored at the start of the second array.
+        int tailLength = numBytes - segmentSize + pad;
+
+        byte[] head = new byte[segmentSize];
+        byte[] tail = new byte[segmentSize];
+        if (headLength > 0 && numBytes >= headLength) {
+            sourceSegment.get(0, head, pad, headLength);
+        }
+        sourceSegment.get(headLength, tail, 0, tailLength);
+
+        MemorySegment[] segments = {MemorySegment.wrap(head), MemorySegment.wrap(tail)};
+        return BinaryString.fromAddress(segments, pad, numBytes);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index acdd40e9b2..dd039de6cc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -43,29 +43,29 @@ object PaimonFunctions {
 
   val PAIMON_BUCKET: String = "bucket"
   val MOD_BUCKET: String = "mod_bucket"
+  val HIVE_BUCKET: String = "hive_bucket"
   val MAX_PT: String = "max_pt"
   val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
   val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
 
-  private val FUNCTIONS = ImmutableMap.of(
-    PAIMON_BUCKET,
-    new BucketFunction(PAIMON_BUCKET, BucketFunctionType.DEFAULT),
-    MOD_BUCKET,
-    new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD),
-    MAX_PT,
-    new MaxPtFunction,
-    PATH_TO_DESCRIPTOR,
-    new PathToDescriptorUnbound,
-    DESCRIPTOR_TO_STRING,
-    new DescriptorToStringUnbound
-  )
+  private val FUNCTIONS = ImmutableMap
+    .builder[String, UnboundFunction]()
+    .put(PAIMON_BUCKET, new BucketFunction(PAIMON_BUCKET, 
BucketFunctionType.DEFAULT))
+    .put(MOD_BUCKET, new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD))
+    .put(HIVE_BUCKET, new BucketFunction(HIVE_BUCKET, BucketFunctionType.HIVE))
+    .put(MAX_PT, new MaxPtFunction)
+    .put(PATH_TO_DESCRIPTOR, new PathToDescriptorUnbound)
+    .put(DESCRIPTOR_TO_STRING, new DescriptorToStringUnbound)
+    .build()
 
   /** The bucket function type to the function name mapping */
   private val TYPE_FUNC_MAPPING = ImmutableMap.of(
     BucketFunctionType.DEFAULT,
     PAIMON_BUCKET,
     BucketFunctionType.MOD,
-    MOD_BUCKET
+    MOD_BUCKET,
+    BucketFunctionType.HIVE,
+    HIVE_BUCKET
   )
 
   val names: ImmutableSet[String] = FUNCTIONS.keySet
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index 214965bf15..43425486c8 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -54,7 +54,14 @@ import org.apache.paimon.types.VarCharType;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.HiveHash;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -64,6 +71,8 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -72,6 +81,8 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import scala.collection.JavaConverters;
+
 /** Tests for Spark bucket functions. */
 public class BucketFunctionTest {
     private static final int NUM_BUCKETS =
@@ -243,6 +254,69 @@ public class BucketFunctionTest {
                 .forEach(row -> 
Assertions.assertThat(row.getInt(2)).isEqualTo(row.get(1)));
     }
 
+    /**
+     * Checks that {@code paimon.sys.hive_bucket} returns the bucket derived from Spark's {@link
+     * HiveHash} for a fully populated row and for a row of nulls.
+     */
+    @Test
+    public void testHiveBucketFunctionMatchesSparkHiveHash() {
+        // Bucket key with boolean, integral, floating point, string, decimal and binary columns.
+        List<DataField> bucketFields =
+                Arrays.asList(
+                        new DataField(0, BOOLEAN_COL, new BooleanType()),
+                        new DataField(1, BYTE_COL, new TinyIntType()),
+                        new DataField(2, SHORT_COL, new SmallIntType()),
+                        new DataField(3, INTEGER_COL, new IntType()),
+                        new DataField(4, LONG_COL, new BigIntType()),
+                        new DataField(5, FLOAT_COL, new FloatType()),
+                        new DataField(6, DOUBLE_COL, new DoubleType()),
+                        new DataField(7, STRING_COL, new VarCharType(VarCharType.MAX_LENGTH)),
+                        new DataField(
+                                8,
+                                DECIMAL_COL,
+                                new DecimalType(DECIMAL_PRECISION, DECIMAL_SCALE)),
+                        new DataField(
+                                9,
+                                COMPACTED_DECIMAL_COL,
+                                new DecimalType(
+                                        COMPACTED_DECIMAL_PRECISION, COMPACTED_DECIMAL_SCALE)),
+                        new DataField(
+                                10, BINARY_COL, new VarBinaryType(VarBinaryType.MAX_LENGTH)));
+        RowType hiveBucketRowType = new RowType(bucketFields);
+        StructType schema =
+                org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType(hiveBucketRowType);
+        String[] bucketColumns = hiveBucketRowType.getFieldNames().toArray(new String[0]);
+
+        Row populatedRow =
+                RowFactory.create(
+                        true,
+                        (byte) 1,
+                        (short) 2,
+                        3,
+                        4L,
+                        1.5f,
+                        2.5d,
+                        "hello",
+                        new BigDecimal("12.340000000000000000"),
+                        new BigDecimal("56.789000000"),
+                        "spark-hive".getBytes(StandardCharsets.UTF_8));
+        Row allNullRow =
+                RowFactory.create(
+                        null, null, null, null, null, null, null, null, null, null, null);
+        List<Row> rows = Arrays.asList(populatedRow, allNullRow);
+
+        String bucketExpression =
+                String.format(
+                        "paimon.sys.hive_bucket(%s, %s) as actual_bucket",
+                        NUM_BUCKETS, String.join(", ", bucketColumns));
+        List<Row> result =
+                spark.createDataFrame(rows, schema)
+                        .selectExpr("*", bucketExpression)
+                        .collectAsList();
+
+        for (Row resultRow : result) {
+            int expectedBucket = sparkHiveBucket(resultRow, schema, bucketColumns);
+            Assertions.assertThat((int) resultRow.getAs("actual_bucket"))
+                    .isEqualTo(expectedBucket);
+        }
+    }
+
     @Test
     public void testBooleanType() {
         validateSparkBucketFunction(BOOLEAN_COL);
@@ -335,4 +409,20 @@ public class BucketFunctionTest {
                 .collectAsList()
                 .forEach(row -> 
Assertions.assertThat(row.getInt(2)).isNotEqualTo(row.get(1)));
     }
+
+    /**
+     * Computes the reference bucket of a row by evaluating Spark's {@link HiveHash} over literals
+     * built from the bucket columns.
+     *
+     * @param row the row to hash
+     * @param schema schema used to look up each column's index and data type
+     * @param bucketColumns names of the bucket key columns, in hash order
+     * @return the hash with its sign bit cleared, modulo {@code NUM_BUCKETS}
+     */
+    private static int sparkHiveBucket(Row row, StructType schema, String... bucketColumns) {
+        List<Expression> literals = new ArrayList<>();
+        for (String column : bucketColumns) {
+            int position = schema.fieldIndex(column);
+            Object fieldValue = row.isNullAt(position) ? null : row.get(position);
+            StructField field = schema.fields()[position];
+            literals.add(Literal.create(fieldValue, field.dataType()));
+        }
+
+        // Every child is a literal, so the expression is evaluated without an input row.
+        HiveHash hiveHash = new HiveHash(JavaConverters.asScalaBuffer(literals).toSeq());
+        int hash = (Integer) hiveHash.eval((org.apache.spark.sql.catalyst.InternalRow) null);
+        return (hash & Integer.MAX_VALUE) % NUM_BUCKETS;
+    }
 }

Reply via email to