This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9bc2e9c feat: Introduce HashBucketAssigner (#117)
9bc2e9c is described below
commit 9bc2e9cc4c637a6398ccdffa5efb1003adb6228d
Author: Keith Lee <[email protected]>
AuthorDate: Mon Dec 29 11:14:01 2025 +0000
feat: Introduce HashBucketAssigner (#117)
---
crates/fluss/src/bucketing/mod.rs | 266 +++++++++++++++++++++
crates/fluss/src/client/write/bucket_assigner.rs | 54 ++++-
crates/fluss/src/client/write/writer_client.rs | 13 +-
crates/fluss/src/lib.rs | 1 +
.../src/{lib.rs => metadata/data_lake_format.rs} | 33 +--
crates/fluss/src/metadata/mod.rs | 2 +
crates/fluss/src/util/mod.rs | 2 +
crates/fluss/src/util/murmur_hash.rs | 222 +++++++++++++++++
8 files changed, 563 insertions(+), 30 deletions(-)
diff --git a/crates/fluss/src/bucketing/mod.rs
b/crates/fluss/src/bucketing/mod.rs
new file mode 100644
index 0000000..2611ac7
--- /dev/null
+++ b/crates/fluss/src/bucketing/mod.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataLakeFormat;
+use crate::util::murmur_hash;
+
/// Maps a serialized bucket key to a bucket id in `[0, num_buckets)`.
///
/// Implementations are pure functions of their inputs, so the same key and
/// bucket count always produce the same bucket id.
pub trait BucketingFunction: Sync + Send {
    /// Computes the bucket for the given key.
    ///
    /// # Arguments
    /// * `bucket_key` - serialized bucket key bytes; must be non-empty
    /// * `num_buckets` - total number of buckets; must be positive
    ///
    /// # Errors
    /// Implementations in this module return `IllegalArgument` for an empty
    /// key or a non-positive `num_buckets`.
    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
}
+
+#[allow(dead_code)]
+impl dyn BucketingFunction {
+ /// Provides the bucketing function for a given [DataLakeFormat]
+ ///
+ /// # Arguments
+ /// * `lake_format` - Data lake format or none
+ ///
+ /// # Returns
+ /// * BucketingFunction
+ pub fn of(lake_format: Option<&DataLakeFormat>) -> Box<dyn
BucketingFunction> {
+ match lake_format {
+ None => Box::new(FlussBucketingFunction),
+ Some(DataLakeFormat::Paimon) => Box::new(PaimonBucketingFunction),
+ Some(DataLakeFormat::Lance) => Box::new(FlussBucketingFunction),
+ Some(DataLakeFormat::Iceberg) =>
Box::new(IcebergBucketingFunction),
+ }
+ }
+}
+
+struct FlussBucketingFunction;
+impl BucketingFunction for FlussBucketingFunction {
+ fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+ if bucket_key.is_empty() {
+ return Err(IllegalArgument {
+ message: "bucket_key must not be empty!".to_string(),
+ });
+ }
+
+ if num_buckets <= 0 {
+ return Err(IllegalArgument {
+ message: "num_buckets must be positive!".to_string(),
+ });
+ }
+
+ let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+ Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets)
+ }
+}
+
+struct PaimonBucketingFunction;
+impl BucketingFunction for PaimonBucketingFunction {
+ fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+ if bucket_key.is_empty() {
+ return Err(IllegalArgument {
+ message: "bucket_key must not be empty!".to_string(),
+ });
+ }
+
+ if num_buckets <= 0 {
+ return Err(IllegalArgument {
+ message: "num_buckets must be positive!".to_string(),
+ });
+ }
+
+ let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+ Ok((key_hash % num_buckets).abs())
+ }
+}
+
+struct IcebergBucketingFunction;
+impl BucketingFunction for IcebergBucketingFunction {
+ fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+ if bucket_key.is_empty() {
+ return Err(IllegalArgument {
+ message: "bucket_key must not be empty!".to_string(),
+ });
+ }
+
+ if num_buckets <= 0 {
+ return Err(IllegalArgument {
+ message: "num_buckets must be positive!".to_string(),
+ });
+ };
+
+ Ok((murmur_hash::hash_bytes(bucket_key) as i32 & i32::MAX) %
num_buckets)
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    // Known-answer tests: each case pairs a fixed input with its expected
    // bucket id for a given bucket count.
    // NOTE(review): expected values look like vectors captured from a
    // reference implementation — confirm provenance if they ever change.

    // Default (no lake format) bucketing: Fluss hashing.
    #[test]
    fn test_default_bucketing() {
        let default_bucketing = <dyn BucketingFunction>::of(None);

        let expected = 1;
        let actual = default_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 0;
        let actual = default_bucketing
            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 6;
        let actual = default_bucketing
            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 6;
        let actual = default_bucketing
            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );
    }

    // Paimon bucketing: same Fluss/Flink hash but reduced with abs(),
    // so expected buckets differ from the default for the same inputs.
    #[test]
    fn test_paimon_bucketing() {
        let paimon_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon));

        let expected = 1;
        let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 11;
        let actual = paimon_bucketing
            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 12;
        let actual = paimon_bucketing
            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 0;
        let actual = paimon_bucketing
            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );
    }

    // Lance routes through the default Fluss function (see `of`), so these
    // vectors intentionally match test_default_bucketing.
    #[test]
    fn test_lance_bucketing() {
        let lance_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Lance));

        let expected = 1;
        let actual = lance_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 0;
        let actual = lance_bucketing
            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 6;
        let actual = lance_bucketing
            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 6;
        let actual = lance_bucketing
            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );
    }

    // Iceberg bucketing: plain Murmur3 with sign bit cleared, so its
    // buckets differ from both the default and Paimon.
    #[test]
    fn test_iceberg_bucketing() {
        let iceberg_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Iceberg));

        let expected = 3;
        let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 4;
        let actual = iceberg_bucketing
            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 12;
        let actual = iceberg_bucketing
            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );

        let expected = 3;
        let actual = iceberg_bucketing
            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
            .unwrap();
        assert_eq!(
            expected, actual,
            "Expecting bucket to be {expected} but got {actual}"
        );
    }
}
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs
b/crates/fluss/src/client/write/bucket_assigner.rs
index 991c5f9..44b2673 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use crate::bucketing::BucketingFunction;
use crate::cluster::Cluster;
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
use crate::metadata::TablePath;
use rand::Rng;
use std::sync::atomic::{AtomicI32, Ordering};
@@ -25,7 +28,7 @@ pub trait BucketAssigner: Sync + Send {
fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32);
- fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) ->
i32;
+ fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) ->
Result<i32>;
}
#[derive(Debug)]
@@ -91,12 +94,55 @@ impl BucketAssigner for StickyBucketAssigner {
self.next_bucket(cluster, prev_bucket_id);
}
- fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) ->
i32 {
+ fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) ->
Result<i32> {
let bucket_id = self.current_bucket_id.load(Ordering::Relaxed);
if bucket_id < 0 {
- self.next_bucket(cluster, bucket_id)
+ Ok(self.next_bucket(cluster, bucket_id))
} else {
- bucket_id
+ Ok(bucket_id)
}
}
}
+
/// A [BucketAssigner] that derives the bucket id by hashing the record's
/// bucket key modulo the table's bucket count.
pub struct HashBucketAssigner {
    // Total number of buckets of the target table; assigned ids fall in
    // [0, num_buckets) for the bucketing functions in this crate.
    num_buckets: i32,
    // Hash function mapping a bucket key to a bucket id; obtain one via
    // `<dyn BucketingFunction>::of` for the table's data lake format.
    bucketing_function: Box<dyn BucketingFunction>,
}

#[allow(dead_code)] // not constructed anywhere in this file yet
impl HashBucketAssigner {
    /// Creates a new [HashBucketAssigner] based on the given [BucketingFunction].
    /// See `<dyn BucketingFunction>::of` for obtaining the bucketing function
    /// matching a data lake format.
    ///
    /// # Arguments
    /// * `num_buckets` - The number of buckets
    /// * `bucketing_function` - The bucketing function
    ///
    /// # Returns
    /// * [HashBucketAssigner] - The hash bucket assigner
    pub fn new(num_buckets: i32, bucketing_function: Box<dyn BucketingFunction>) -> Self {
        HashBucketAssigner {
            num_buckets,
            bucketing_function,
        }
    }
}
+
+impl BucketAssigner for HashBucketAssigner {
+ fn abort_if_batch_full(&self) -> bool {
+ false
+ }
+
+ fn on_new_batch(&self, _: &Cluster, _: i32) {
+ // do nothing
+ }
+
+ fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) ->
Result<i32> {
+ let key = bucket_key.ok_or_else(|| IllegalArgument {
+ message: "no bucket key provided".to_string(),
+ })?;
+ self.bucketing_function.bucketing(key, self.num_buckets)
+ }
+}
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index 042859a..22e0397 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
let table_path = &record.table_path;
let cluster = self.metadata.get_cluster();
- let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
+ let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;
let mut result = self
.accumulate
@@ -101,7 +101,7 @@ impl WriterClient {
if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
- let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
+ let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
result = self
.accumulate
.append(record, bucket_id, &cluster, false)
@@ -114,7 +114,10 @@ impl WriterClient {
Ok(result.result_handle.expect("result_handle should exist"))
}
- fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn
BucketAssigner>>, i32) {
+ fn assign_bucket(
+ &self,
+ table_path: &Arc<TablePath>,
+ ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
let cluster = self.metadata.get_cluster();
let bucket_assigner = {
if let Some(assigner) = self.bucket_assigners.get(table_path) {
@@ -126,8 +129,8 @@ impl WriterClient {
assigner
}
};
- let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
- (bucket_assigner, bucket_id)
+ let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+ Ok((bucket_assigner, bucket_id))
}
pub async fn close(self) -> Result<()> {
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 25978ce..1bd72a4 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
pub mod config;
pub mod error;
+mod bucketing;
mod compression;
pub mod io;
mod util;
diff --git a/crates/fluss/src/lib.rs
b/crates/fluss/src/metadata/data_lake_format.rs
similarity index 62%
copy from crates/fluss/src/lib.rs
copy to crates/fluss/src/metadata/data_lake_format.rs
index 25978ce..76a23f8 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -15,25 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-pub mod client;
-pub mod metadata;
-pub mod record;
-pub mod row;
-pub mod rpc;
-
-mod cluster;
-
-pub mod config;
-pub mod error;
-
-mod compression;
-pub mod io;
-mod util;
-
-pub type TableId = u64;
-pub type PartitionId = u64;
-pub type BucketId = i32;
-
-pub mod proto {
- include!(concat!(env!("OUT_DIR"), "/proto.rs"));
/// Identifies the logical format of a data lake table supported by Fluss.
///
/// This enum is typically used in metadata and configuration to distinguish
/// between different table formats so that the appropriate integration and
/// semantics can be applied.
// Derives: a public, field-less metadata enum should be freely printable,
// copyable, comparable, and usable as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataLakeFormat {
    /// Apache Paimon data lake table format.
    Paimon,
    /// Lance columnar data format / lakehouse table format.
    Lance,
    /// Apache Iceberg data lake table format.
    Iceberg,
}
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 8754007..9c0b1b4 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+mod data_lake_format;
mod database;
mod datatype;
mod json_serde;
mod table;
+pub use data_lake_format::*;
pub use database::*;
pub use datatype::*;
pub use json_serde::*;
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index d8c0db5..5f67290 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+pub mod murmur_hash;
+
use crate::metadata::TableBucket;
use linked_hash_map::LinkedHashMap;
use std::collections::{HashMap, HashSet};
diff --git a/crates/fluss/src/util/murmur_hash.rs
b/crates/fluss/src/util/murmur_hash.rs
new file mode 100644
index 0000000..12229c7
--- /dev/null
+++ b/crates/fluss/src/util/murmur_hash.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+
/// Seed used by the plain Murmur3 variant ([hash_bytes]).
pub const MURMUR3_DEFAULT_SEED: u32 = 0;
/// Seed used by the Flink/Fluss Murmur variant ([fluss_hash_bytes]).
pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42;

// Standard Murmur3 x86/32 multiplication constants and rotation amounts.
const C1: u32 = 0xCC9E_2D51;
const C2: u32 = 0x1B87_3593;
const R1: u32 = 15;
const R2: u32 = 13;
const M: u32 = 5;
const N: u32 = 0xE654_6B64;
// Bytes consumed per full mixing round (one u32 block).
const CHUNK_SIZE: usize = 4;
+
/// Hashes the data using 32-bit Murmur3 hash with 0 as seed
///
/// Blocks are read little-endian; the 0..=3 trailing bytes are folded in as
/// one partial block (see [hash_bytes_with_seed]).
///
/// # Arguments
/// * `data` - byte array containing data to be hashed
///
/// # Returns
/// Returns hash value
pub fn hash_bytes(data: &[u8]) -> u32 {
    hash_bytes_with_seed(data, MURMUR3_DEFAULT_SEED)
}
+
+#[inline(always)]
+fn hash_bytes_with_seed(data: &[u8], seed: u32) -> u32 {
+ let length = data.len();
+ let chunks = length / CHUNK_SIZE;
+ let length_aligned = chunks * CHUNK_SIZE;
+
+ let mut h1 = hash_full_chunks(data, seed);
+ let mut k1 = 0u32;
+
+ for (shift, &b) in data[length_aligned..].iter().enumerate() {
+ k1 |= (b as u32) << (8 * shift);
+ }
+
+ h1 ^= k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2);
+
+ fmix(h1, length)
+}
+
/// Hashes the data using Fluss'/Flink's variant of 32-bit Murmur hash with 42 as seed and tail bytes mixed into hash byte-by-byte
/// Maximum data array size supported is 2GB
///
/// Unlike [hash_bytes], each trailing byte is mixed as its own full block,
/// which makes the result differ from canonical Murmur3 for unaligned input.
///
/// # Arguments
/// * `data` - byte array containing data to be hashed
///
/// # Returns
/// * result of hashing, `Ok(hash_value)`
///
/// # Error
/// Returns `Err(IllegalArgument)` if byte array is larger than 2GB
pub fn fluss_hash_bytes(data: &[u8]) -> Result<i32> {
    fluss_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED)
}
+#[inline(always)]
+fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> Result<i32> {
+ let length = data.len();
+
+ if length >= i32::MAX as usize {
+ return Err(IllegalArgument {
+ message: "data array size {length} is bigger than
supported".to_string(),
+ });
+ }
+
+ let chunks = length / CHUNK_SIZE;
+ let length_aligned = chunks * CHUNK_SIZE;
+
+ let mut h1 = hash_full_chunks(data, seed as u32);
+
+ for byte in data.iter().take(length).skip(length_aligned) {
+ let k1 = mix_k1(*byte as u32);
+ h1 = mix_h1(h1, k1);
+ }
+
+ Ok(fmix(h1, length) as i32)
+}
+
+#[inline(always)]
+fn hash_full_chunks(data: &[u8], seed: u32) -> u32 {
+ data.chunks_exact(CHUNK_SIZE).fold(seed, |h1, chunk| {
+ let block = u32::from_le_bytes(chunk.try_into().unwrap());
+ let k1 = mix_k1(block);
+ mix_h1(h1, k1)
+ })
+}
+
+#[inline(always)]
+fn mix_k1(k1: u32) -> u32 {
+ k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2)
+}
+
+#[inline(always)]
+fn mix_h1(h1: u32, k1: u32) -> u32 {
+ (h1 ^ k1).rotate_left(R2).wrapping_mul(M).wrapping_add(N)
+}
+
+// Finalization mix - force all bits of a hash block to avalanche
+#[inline(always)]
+fn fmix(mut h1: u32, length: usize) -> u32 {
+ h1 ^= length as u32;
+ bit_mix(h1)
+}
+
+/// Hashes an i32 using Fluss'/Flink's variant of Murmur
+///
+/// # Arguments
+/// * `input` - i32 value to be hashed
+///
+/// # Returns
+/// Returns hash value
+pub fn fluss_hash_i32(input: i32) -> i32 {
+ let mut input = input as u32;
+ input = input.wrapping_mul(C1);
+ input = input.rotate_left(R1);
+ input = input.wrapping_mul(C2);
+ input = input.rotate_left(R2);
+
+ input = input.wrapping_mul(M).wrapping_add(N);
+ input ^= CHUNK_SIZE as u32;
+ let output = bit_mix(input) as i32;
+
+ if output >= 0 {
+ output
+ } else if output != i32::MIN {
+ -output
+ } else {
+ 0
+ }
+}
+
// Finalizer ("avalanche") multipliers from Murmur3 x86/32.
const BIT_MIX_A: u32 = 0x85EB_CA6B;
const BIT_MIX_B: u32 = 0xC2B2_AE35;

/// Murmur3 finalization avalanche: xor-shift/multiply rounds that spread
/// every input bit across the whole output word.
#[inline(always)]
fn bit_mix(input: u32) -> u32 {
    let mut mixed = input ^ (input >> 16);
    mixed = mixed.wrapping_mul(BIT_MIX_A);
    mixed ^= mixed >> 13;
    mixed = mixed.wrapping_mul(BIT_MIX_B);
    mixed ^ (mixed >> 16)
}
+
#[cfg(test)]
mod tests {
    use super::*;

    // Known-answer vectors for the canonical Murmur3 x86/32 variant.
    #[test]
    fn test_murmur3() {
        // Empty input with the default seed (0) hashes to 0.
        let empty_data_hash = hash_bytes(&[]);
        assert_eq!(empty_data_hash, 0);

        // Empty input with non-zero seeds: hash reduces to fmix(seed, 0).
        let empty_data_hash = hash_bytes_with_seed(&[], 1);
        assert_eq!(0x514E_28B7, empty_data_hash);

        let empty_data_hash = hash_bytes_with_seed(&[], 0xFFFF_FFFF);
        assert_eq!(0x81F1_6F39, empty_data_hash);

        // Pangram input exercises both full blocks and the 3-byte tail.
        let hash = hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes());
        assert_eq!(0x2E4F_F723, hash);

        let hash = hash_bytes_with_seed(
            "The quick brown fox jumps over the lazy dog".as_bytes(),
            0x9747_B28C,
        );
        assert_eq!(0x2FA8_26CD, hash);
    }

    // Vectors for the Flink/Fluss variant (seed 42, byte-by-byte tail).
    #[test]
    fn test_flink_murmur() {
        // With seed 0 and empty input the variant also reduces to 0.
        let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0).expect("Failed to hash");
        assert_eq!(empty_data_hash, 0);

        // Default (42) seed on empty input.
        let empty_data_hash = fluss_hash_bytes(&[]).expect("Failed to hash");
        assert_eq!(0x087F_CD5C, empty_data_hash);

        // All-ones seed matches the canonical variant on empty input, since
        // tail handling only differs for non-empty unaligned data.
        let empty_data_hash =
            fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32).expect("Failed to hash");
        assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash);

        let hash =
            fluss_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0)
                .expect("Failed to hash");
        assert_eq!(0x5FD2_0A20, hash);

        let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes())
            .expect("Failed to hash");
        assert_eq!(0x1BC6_F880, hash);

        // i32 hashing vectors, including a negative input (result must be
        // non-negative).
        let hash = fluss_hash_i32(0);
        assert_eq!(0x2362_F9DE, hash);

        let hash = fluss_hash_i32(42);
        assert_eq!(0x43A4_6E1D, hash);

        let hash = fluss_hash_i32(-77);
        assert_eq!(0x2EEB_27DE, hash);
    }
}