This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bc2e9c  feat: Introduce HashBucketAssigner (#117)
9bc2e9c is described below

commit 9bc2e9cc4c637a6398ccdffa5efb1003adb6228d
Author: Keith Lee <[email protected]>
AuthorDate: Mon Dec 29 11:14:01 2025 +0000

    feat: Introduce HashBucketAssigner (#117)
---
 crates/fluss/src/bucketing/mod.rs                  | 266 +++++++++++++++++++++
 crates/fluss/src/client/write/bucket_assigner.rs   |  54 ++++-
 crates/fluss/src/client/write/writer_client.rs     |  13 +-
 crates/fluss/src/lib.rs                            |   1 +
 .../src/{lib.rs => metadata/data_lake_format.rs}   |  33 +--
 crates/fluss/src/metadata/mod.rs                   |   2 +
 crates/fluss/src/util/mod.rs                       |   2 +
 crates/fluss/src/util/murmur_hash.rs               | 222 +++++++++++++++++
 8 files changed, 563 insertions(+), 30 deletions(-)

diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs
new file mode 100644
index 0000000..2611ac7
--- /dev/null
+++ b/crates/fluss/src/bucketing/mod.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataLakeFormat;
+use crate::util::murmur_hash;
+
+/// Maps a bucket key to a bucket index.
+///
+/// Implementors must be `Sync + Send`, as required by the trait bounds.
+pub trait BucketingFunction: Sync + Send {
+    /// Computes the bucket for `bucket_key` when the table has `num_buckets` buckets.
+    ///
+    /// # Returns
+    /// * `Ok(bucket)` on success, or an error if the implementation rejects the arguments
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
+}
+
+#[allow(dead_code)]
+impl dyn BucketingFunction {
+    /// Selects the bucketing function that corresponds to a [DataLakeFormat].
+    ///
+    /// # Arguments
+    /// * `lake_format` - Data lake format, or `None` when no format is given
+    ///
+    /// # Returns
+    /// * Boxed [BucketingFunction]; `None` and `Lance` both map to the Fluss default function
+    pub fn of(lake_format: Option<&DataLakeFormat>) -> Box<dyn BucketingFunction> {
+        // No lake format: fall back to the Fluss default.
+        let Some(format) = lake_format else {
+            return Box::new(FlussBucketingFunction);
+        };
+
+        match format {
+            DataLakeFormat::Paimon => Box::new(PaimonBucketingFunction),
+            DataLakeFormat::Lance => Box::new(FlussBucketingFunction),
+            DataLakeFormat::Iceberg => Box::new(IcebergBucketingFunction),
+        }
+    }
+}
+
+/// Checks the preconditions shared by every [BucketingFunction] in this module.
+///
+/// The key is checked before the bucket count, so an empty key is reported first
+/// when both arguments are invalid.
+///
+/// # Errors
+/// Returns `Err(IllegalArgument)` if `bucket_key` is empty or `num_buckets` is not positive.
+fn validate_bucketing_args(bucket_key: &[u8], num_buckets: i32) -> Result<()> {
+    if bucket_key.is_empty() {
+        return Err(IllegalArgument {
+            message: "bucket_key must not be empty!".to_string(),
+        });
+    }
+
+    if num_buckets <= 0 {
+        return Err(IllegalArgument {
+            message: "num_buckets must be positive!".to_string(),
+        });
+    }
+
+    Ok(())
+}
+
+/// Default Fluss bucketing: hashes the key with `fluss_hash_bytes`, re-hashes that
+/// value with `fluss_hash_i32`, and takes the remainder modulo `num_buckets`.
+struct FlussBucketingFunction;
+impl BucketingFunction for FlussBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        validate_bucketing_args(bucket_key, num_buckets)?;
+
+        let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+        // `fluss_hash_i32` never returns a negative value, so the remainder is non-negative.
+        Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets)
+    }
+}
+
+/// Paimon bucketing: `fluss_hash_bytes` of the key, remainder modulo `num_buckets`,
+/// then the absolute value of that remainder.
+struct PaimonBucketingFunction;
+impl BucketingFunction for PaimonBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        validate_bucketing_args(bucket_key, num_buckets)?;
+
+        let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+        // The remainder's magnitude is below `num_buckets`, so `abs` cannot overflow.
+        Ok((key_hash % num_buckets).abs())
+    }
+}
+
+/// Iceberg bucketing: seed-0 Murmur3 (`hash_bytes`) of the key with the sign bit
+/// cleared, remainder modulo `num_buckets`.
+struct IcebergBucketingFunction;
+impl BucketingFunction for IcebergBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        validate_bucketing_args(bucket_key, num_buckets)?;
+
+        // Masking with `i32::MAX` clears the sign bit before taking the remainder.
+        Ok((murmur_hash::hash_bytes(bucket_key) as i32 & i32::MAX) % num_buckets)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const UUID_KEY: &str = "2bb87d68-baf9-4e64-90f9-f80910419fa6";
+    const PANGRAM_KEY: &str = "The quick brown fox jumps over the lazy dog";
+
+    /// Asserts that `function` puts `bucket_key` into bucket `expected` out of `num_buckets`.
+    fn assert_bucket(
+        function: &dyn BucketingFunction,
+        bucket_key: &[u8],
+        num_buckets: i32,
+        expected: i32,
+    ) {
+        let actual = function.bucketing(bucket_key, num_buckets).unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+    }
+
+    #[test]
+    fn test_default_bucketing() {
+        let default_bucketing = <dyn BucketingFunction>::of(None);
+
+        assert_bucket(&*default_bucketing, &[0u8, 10u8], 7, 1);
+        assert_bucket(&*default_bucketing, &[0u8, 10u8, 10u8, 10u8], 12, 0);
+        assert_bucket(&*default_bucketing, UUID_KEY.as_bytes(), 16, 6);
+        assert_bucket(&*default_bucketing, PANGRAM_KEY.as_bytes(), 8, 6);
+    }
+
+    #[test]
+    fn test_paimon_bucketing() {
+        let paimon_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon));
+
+        assert_bucket(&*paimon_bucketing, &[0u8, 10u8], 7, 1);
+        assert_bucket(&*paimon_bucketing, &[0u8, 10u8, 10u8, 10u8], 12, 11);
+        assert_bucket(&*paimon_bucketing, UUID_KEY.as_bytes(), 16, 12);
+        assert_bucket(&*paimon_bucketing, PANGRAM_KEY.as_bytes(), 8, 0);
+    }
+
+    #[test]
+    fn test_lance_bucketing() {
+        // Lance maps to the default Fluss function, so the expectations match the default test.
+        let lance_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Lance));
+
+        assert_bucket(&*lance_bucketing, &[0u8, 10u8], 7, 1);
+        assert_bucket(&*lance_bucketing, &[0u8, 10u8, 10u8, 10u8], 12, 0);
+        assert_bucket(&*lance_bucketing, UUID_KEY.as_bytes(), 16, 6);
+        assert_bucket(&*lance_bucketing, PANGRAM_KEY.as_bytes(), 8, 6);
+    }
+
+    #[test]
+    fn test_iceberg_bucketing() {
+        let iceberg_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Iceberg));
+
+        assert_bucket(&*iceberg_bucketing, &[0u8, 10u8], 7, 3);
+        assert_bucket(&*iceberg_bucketing, &[0u8, 10u8, 10u8, 10u8], 12, 4);
+        assert_bucket(&*iceberg_bucketing, UUID_KEY.as_bytes(), 16, 12);
+        assert_bucket(&*iceberg_bucketing, PANGRAM_KEY.as_bytes(), 8, 3);
+    }
+}
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs
index 991c5f9..44b2673 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::bucketing::BucketingFunction;
 use crate::cluster::Cluster;
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
 use crate::metadata::TablePath;
 use rand::Rng;
 use std::sync::atomic::{AtomicI32, Ordering};
@@ -25,7 +28,7 @@ pub trait BucketAssigner: Sync + Send {
 
     fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32);
 
-    fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
i32;
+    fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
Result<i32>;
 }
 
 #[derive(Debug)]
@@ -91,12 +94,55 @@ impl BucketAssigner for StickyBucketAssigner {
         self.next_bucket(cluster, prev_bucket_id);
     }
 
-    fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
i32 {
+    fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> 
Result<i32> {
         let bucket_id = self.current_bucket_id.load(Ordering::Relaxed);
         if bucket_id < 0 {
-            self.next_bucket(cluster, bucket_id)
+            Ok(self.next_bucket(cluster, bucket_id))
         } else {
-            bucket_id
+            Ok(bucket_id)
         }
     }
 }
+
+/// A [BucketAssigner] which assigns based on a modulo hashing function
+pub struct HashBucketAssigner {
+    /// Number of buckets handed to the bucketing function on every assignment.
+    num_buckets: i32,
+    /// Function that maps a bucket key to a bucket.
+    bucketing_function: Box<dyn BucketingFunction>,
+}
+
+#[allow(dead_code)]
+impl HashBucketAssigner {
+    /// Creates a new [HashBucketAssigner] based on the given [BucketingFunction].
+    /// See `<dyn BucketingFunction>::of(Option<&DataLakeFormat>)` for the available
+    /// bucketing functions.
+    ///
+    /// # Arguments
+    /// * `num_buckets` - The number of buckets
+    /// * `bucketing_function` - The bucketing function
+    ///
+    /// # Returns
+    /// * [HashBucketAssigner] - The hash bucket assigner
+    pub fn new(num_buckets: i32, bucketing_function: Box<dyn 
BucketingFunction>) -> Self {
+        HashBucketAssigner {
+            num_buckets,
+            bucketing_function,
+        }
+    }
+}
+
+impl BucketAssigner for HashBucketAssigner {
+    /// Always returns `false`.
+    fn abort_if_batch_full(&self) -> bool {
+        false
+    }
+
+    /// Does nothing.
+    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}
+
+    /// Delegates to the configured bucketing function with the stored bucket count.
+    ///
+    /// Returns `Err(IllegalArgument)` when `bucket_key` is `None`; any error from the
+    /// bucketing function is passed through unchanged.
+    fn assign_bucket(&self, bucket_key: Option<&[u8]>, _cluster: &Cluster) -> Result<i32> {
+        let Some(key) = bucket_key else {
+            return Err(IllegalArgument {
+                message: "no bucket key provided".to_string(),
+            });
+        };
+
+        self.bucketing_function.bucketing(key, self.num_buckets)
+    }
+}
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 042859a..22e0397 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
         let table_path = &record.table_path;
         let cluster = self.metadata.get_cluster();
 
-        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
+        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;
 
         let mut result = self
             .accumulate
@@ -101,7 +101,7 @@ impl WriterClient {
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
             bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
-            let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
+            let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
             result = self
                 .accumulate
                 .append(record, bucket_id, &cluster, false)
@@ -114,7 +114,10 @@ impl WriterClient {
 
         Ok(result.result_handle.expect("result_handle should exist"))
     }
-    fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn 
BucketAssigner>>, i32) {
+    fn assign_bucket(
+        &self,
+        table_path: &Arc<TablePath>,
+    ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
         let cluster = self.metadata.get_cluster();
         let bucket_assigner = {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
@@ -126,8 +129,8 @@ impl WriterClient {
                 assigner
             }
         };
-        let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
-        (bucket_assigner, bucket_id)
+        let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+        Ok((bucket_assigner, bucket_id))
     }
 
     pub async fn close(self) -> Result<()> {
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 25978ce..1bd72a4 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
 pub mod config;
 pub mod error;
 
+mod bucketing;
 mod compression;
 pub mod io;
 mod util;
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/metadata/data_lake_format.rs
similarity index 62%
copy from crates/fluss/src/lib.rs
copy to crates/fluss/src/metadata/data_lake_format.rs
index 25978ce..76a23f8 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -15,25 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod client;
-pub mod metadata;
-pub mod record;
-pub mod row;
-pub mod rpc;
-
-mod cluster;
-
-pub mod config;
-pub mod error;
-
-mod compression;
-pub mod io;
-mod util;
-
-pub type TableId = u64;
-pub type PartitionId = u64;
-pub type BucketId = i32;
-
-pub mod proto {
-    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
+/// Identifies the logical format of a data lake table supported by Fluss.
+///
+/// This enum is typically used in metadata and configuration to distinguish
+/// between different table formats so that the appropriate integration and
+/// semantics can be applied.
+///
+/// The variants carry no data, so the type derives `Copy` along with `Debug`
+/// and equality for use in logging, assertions and comparisons.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum DataLakeFormat {
+    /// Apache Paimon data lake table format.
+    Paimon,
+    /// Lance columnar data format / lakehouse table format.
+    Lance,
+    /// Apache Iceberg data lake table format.
+    Iceberg,
 }
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 8754007..9c0b1b4 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod data_lake_format;
 mod database;
 mod datatype;
 mod json_serde;
 mod table;
 
+pub use data_lake_format::*;
 pub use database::*;
 pub use datatype::*;
 pub use json_serde::*;
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index d8c0db5..5f67290 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod murmur_hash;
+
 use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
 use std::collections::{HashMap, HashSet};
diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs
new file mode 100644
index 0000000..12229c7
--- /dev/null
+++ b/crates/fluss/src/util/murmur_hash.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+
+pub const MURMUR3_DEFAULT_SEED: u32 = 0;
+pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42;
+
+const C1: u32 = 0xCC9E_2D51;
+const C2: u32 = 0x1B87_3593;
+const R1: u32 = 15;
+const R2: u32 = 13;
+const M: u32 = 5;
+const N: u32 = 0xE654_6B64;
+const CHUNK_SIZE: usize = 4;
+
+/// Hashes the data using 32-bit Murmur3 hash with `MURMUR3_DEFAULT_SEED` (0) as seed
+///
+/// # Arguments
+/// * `data` - byte array containing data to be hashed
+///
+/// # Returns
+/// Returns the unsigned 32-bit hash value
+pub fn hash_bytes(data: &[u8]) -> u32 {
+    hash_bytes_with_seed(data, MURMUR3_DEFAULT_SEED)
+}
+
+/// Hashes `data` with 32-bit Murmur3 starting from `seed`.
+///
+/// Full 4-byte chunks go through `hash_full_chunks`; the remaining bytes are
+/// packed into a single word, mixed, and XORed into the hash before finalization.
+#[inline(always)]
+fn hash_bytes_with_seed(data: &[u8], seed: u32) -> u32 {
+    // Bytes left over after the last full chunk.
+    let tail = data.chunks_exact(CHUNK_SIZE).remainder();
+
+    // Pack the tail little-endian: the first tail byte lands in the lowest bits.
+    let tail_word = tail
+        .iter()
+        .rev()
+        .fold(0u32, |word, &byte| (word << 8) | (byte as u32));
+
+    let mixed_tail = tail_word.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2);
+    let hash = hash_full_chunks(data, seed) ^ mixed_tail;
+
+    fmix(hash, data.len())
+}
+
+/// Hashes the data using Fluss'/Flink's variant of 32-bit Murmur hash with
+/// `FLINK_MURMUR3_DEFAULT_SEED` (42) as seed and tail bytes mixed into the hash byte-by-byte.
+/// Input must be shorter than `i32::MAX` bytes (roughly 2GB).
+///
+/// # Arguments
+/// * `data` - byte array containing data to be hashed
+///
+/// # Returns
+/// * result of hashing, `Ok(hash_value)`
+///
+/// # Errors
+/// Returns `Err(IllegalArgument)` if the byte array length is `i32::MAX` or more
+pub fn fluss_hash_bytes(data: &[u8]) -> Result<i32> {
+    fluss_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED)
+}
+/// Hashes `data` with Fluss'/Flink's Murmur variant starting from `seed`.
+///
+/// Full 4-byte chunks are mixed as little-endian words; every remaining tail byte
+/// is then mixed in as a block of its own before finalization.
+///
+/// # Errors
+/// Returns `Err(IllegalArgument)` if `data.len()` is `i32::MAX` or more.
+#[inline(always)]
+fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> Result<i32> {
+    let length = data.len();
+
+    if length >= i32::MAX as usize {
+        return Err(IllegalArgument {
+            // `format!` is needed here: a plain string literal would emit `{length}` verbatim.
+            message: format!("data array size {length} is bigger than supported"),
+        });
+    }
+
+    let mut h1 = hash_full_chunks(data, seed as u32);
+
+    // Tail bytes (those after the last full chunk) are mixed one at a time.
+    for &byte in data.chunks_exact(CHUNK_SIZE).remainder() {
+        h1 = mix_h1(h1, mix_k1(byte as u32));
+    }
+
+    Ok(fmix(h1, length) as i32)
+}
+
+/// Mixes every complete `CHUNK_SIZE`-byte chunk of `data` into a hash that starts at `seed`.
+///
+/// Each chunk is read as a little-endian `u32`; trailing bytes that do not fill a
+/// chunk are ignored here.
+#[inline(always)]
+fn hash_full_chunks(data: &[u8], seed: u32) -> u32 {
+    let mut hash = seed;
+    for chunk in data.chunks_exact(CHUNK_SIZE) {
+        // `chunks_exact` only yields slices of exactly `CHUNK_SIZE` bytes, so this cannot fail.
+        let bytes: [u8; CHUNK_SIZE] = chunk.try_into().unwrap();
+        hash = mix_h1(hash, mix_k1(u32::from_le_bytes(bytes)));
+    }
+    hash
+}
+
+/// Scrambles one input block: multiply by `C1`, rotate left by `R1`, multiply by `C2`.
+#[inline(always)]
+fn mix_k1(k1: u32) -> u32 {
+    let scaled = k1.wrapping_mul(C1);
+    let rotated = scaled.rotate_left(R1);
+    rotated.wrapping_mul(C2)
+}
+
+/// Folds a scrambled block `k1` into the running hash `h1`:
+/// XOR, rotate left by `R2`, multiply by `M`, add `N`.
+#[inline(always)]
+fn mix_h1(h1: u32, k1: u32) -> u32 {
+    let combined = (h1 ^ k1).rotate_left(R2);
+    combined.wrapping_mul(M).wrapping_add(N)
+}
+
+// Finalization mix - XORs the input length (truncated to 32 bits) into the hash,
+// then forces all bits of the hash block to avalanche.
+#[inline(always)]
+fn fmix(h1: u32, length: usize) -> u32 {
+    bit_mix(h1 ^ length as u32)
+}
+
+/// Hashes an i32 using Fluss'/Flink's variant of Murmur
+///
+/// # Arguments
+/// * `input` - i32 value to be hashed
+///
+/// # Returns
+/// Returns a non-negative hash value: the absolute value of the mixed result,
+/// or 0 when the mixed result is `i32::MIN`
+pub fn fluss_hash_i32(input: i32) -> i32 {
+    let k1 = mix_k1(input as u32);
+    let h1 = k1.rotate_left(R2).wrapping_mul(M).wrapping_add(N);
+
+    // The value is finalized with a length of `CHUNK_SIZE` (4) bytes.
+    let mixed = bit_mix(h1 ^ CHUNK_SIZE as u32) as i32;
+
+    // `checked_abs` is `None` only for `i32::MIN`, which maps to 0.
+    mixed.checked_abs().unwrap_or(0)
+}
+
+const BIT_MIX_A: u32 = 0x85EB_CA6B;
+const BIT_MIX_B: u32 = 0xC2B2_AE35;
+
+/// Avalanche step: alternating xor-shifts (16, 13, 16) and wrapping multiplications
+/// by `BIT_MIX_A` and `BIT_MIX_B`.
+#[inline(always)]
+fn bit_mix(input: u32) -> u32 {
+    let mut mixed = input;
+    mixed ^= mixed >> 16;
+    mixed = mixed.wrapping_mul(BIT_MIX_A);
+    mixed ^= mixed >> 13;
+    mixed = mixed.wrapping_mul(BIT_MIX_B);
+    mixed ^ (mixed >> 16)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    /// Checks the seed-0 and seeded Murmur3 functions against fixed expected hashes.
+    #[test]
+    fn test_murmur3() {
+        // Empty input with the default seed (0) hashes to 0.
+        let empty_data_hash = hash_bytes(&[]);
+        assert_eq!(empty_data_hash, 0);
+
+        // Empty input with non-zero seeds.
+        let empty_data_hash = hash_bytes_with_seed(&[], 1);
+        assert_eq!(0x514E_28B7, empty_data_hash);
+
+        let empty_data_hash = hash_bytes_with_seed(&[], 0xFFFF_FFFF);
+        assert_eq!(0x81F1_6F39, empty_data_hash);
+
+        // Text input, default seed and explicit seed.
+        let hash = hash_bytes("The quick brown fox jumps over the lazy 
dog".as_bytes());
+        assert_eq!(0x2E4F_F723, hash);
+
+        let hash = hash_bytes_with_seed(
+            "The quick brown fox jumps over the lazy dog".as_bytes(),
+            0x9747_B28C,
+        );
+        assert_eq!(0x2FA8_26CD, hash);
+    }
+
+    /// Checks the Fluss/Flink Murmur variant (bytes and i32) against fixed expected hashes.
+    #[test]
+    fn test_flink_murmur() {
+        // Empty input with seed 0 hashes to 0.
+        let empty_data_hash = fluss_hash_bytes_with_seed(&[], 
0).expect("Failed to hash");
+        assert_eq!(empty_data_hash, 0);
+
+        // Empty input with the default Flink seed (42).
+        let empty_data_hash = fluss_hash_bytes(&[]).expect("Failed to hash");
+        assert_eq!(0x087F_CD5C, empty_data_hash);
+
+        // Seed and expected value are written as u32 bit patterns and reinterpreted as i32.
+        let empty_data_hash =
+            fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as 
i32).expect("Failed to hash");
+        assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash);
+
+        // Text input, seed 0 and default seed.
+        let hash =
+            fluss_hash_bytes_with_seed("The quick brown fox jumps over the 
lazy dog".as_bytes(), 0)
+                .expect("Failed to hash");
+        assert_eq!(0x5FD2_0A20, hash);
+
+        let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy 
dog".as_bytes())
+            .expect("Failed to hash");
+        assert_eq!(0x1BC6_F880, hash);
+
+        // i32 hashing: zero, positive and negative inputs.
+        let hash = fluss_hash_i32(0);
+        assert_eq!(0x2362_F9DE, hash);
+
+        let hash = fluss_hash_i32(42);
+        assert_eq!(0x43A4_6E1D, hash);
+
+        let hash = fluss_hash_i32(-77);
+        assert_eq!(0x2EEB_27DE, hash);
+    }
+}

Reply via email to