This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bb818f2  [TABLE SERVICE] [PYTHON] add hash router for computing the 
hash routing value for a given key
bb818f2 is described below

commit bb818f2c5cf1a92cb45b2e69a9be452708785cad
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Sep 25 10:10:37 2018 -0700

    [TABLE SERVICE] [PYTHON] add hash router for computing the hash routing 
value for a given key
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    As part of implementing python client for #1690, we need a hash router to 
compute the hash routing value
    for a given key.
    
    *Changes*
    
    - Murmur2 is not well supported in python 3. so standarized hash 
computation in using Murmur3.
    - Change java hash router to use Murmur3
    - Add python hash router
    - Add unit tests to cover the hash value computation and make sure it is 
consistent across java and python
    
    Master Issue: #1690
    
    
    
    
    Author:
    
    Reviewers: Jia Zhai <None>
    
    This closes #1698 from sijie/message_router
---
 .../src/main/resources/bookkeeper/suppressions.xml |   1 +
 .../python/bookkeeper/common/router/__init__.py    |  17 +
 .../python/bookkeeper/common/router/router.py      |  25 ++
 stream/clients/python/setup.py                     |   1 +
 .../unit/bookkeeper/common/router/test_router.py   |  19 +
 .../org/apache/bookkeeper/common/hash/Murmur3.java | 443 +++++++++++++++++++++
 .../common/router/AbstractHashRouter.java          |   9 +-
 .../bookkeeper/common/router/HashRouterTest.java   |  64 +++
 8 files changed, 574 insertions(+), 5 deletions(-)

diff --git a/buildtools/src/main/resources/bookkeeper/suppressions.xml 
b/buildtools/src/main/resources/bookkeeper/suppressions.xml
index c45e2fa..232f19b 100644
--- a/buildtools/src/main/resources/bookkeeper/suppressions.xml
+++ b/buildtools/src/main/resources/bookkeeper/suppressions.xml
@@ -25,6 +25,7 @@
     <suppress checks=".*" files=".+[\\/]generated[\\/].+\.java" />
     <suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java" />
     <suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java" />
+    <suppress checks=".*" files=".+[\\/]Murmur3.java" />
 
     <!-- suppress all checks in the copied code -->
     <suppress checks=".*" 
files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java" />
diff --git a/stream/clients/python/bookkeeper/common/router/__init__.py 
b/stream/clients/python/bookkeeper/common/router/__init__.py
new file mode 100644
index 0000000..d7329cf
--- /dev/null
+++ b/stream/clients/python/bookkeeper/common/router/__init__.py
@@ -0,0 +1,17 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bookkeeper.common.router.router import BytesHashRouter
+
+__all__ = [
+    'BytesHashRouter',
+]
diff --git a/stream/clients/python/bookkeeper/common/router/router.py 
b/stream/clients/python/bookkeeper/common/router/router.py
new file mode 100644
index 0000000..de4f31d
--- /dev/null
+++ b/stream/clients/python/bookkeeper/common/router/router.py
@@ -0,0 +1,25 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mmh3
+
+__SEED__ = 383242705
+
+
+class BytesHashRouter(object):
+    """A router that computes hash values for keys using MurmurHash3"""
+
+    def __init__(self):
+        return
+
+    def getRoutingKey(self, key):
+        return mmh3.hash64(key, seed=__SEED__, signed=True)[0]
diff --git a/stream/clients/python/setup.py b/stream/clients/python/setup.py
index 5fd3e62..49afd7a 100644
--- a/stream/clients/python/setup.py
+++ b/stream/clients/python/setup.py
@@ -33,6 +33,7 @@ dependencies = [
     'pytz',
     'futures>=3.2.0;python_version<"3.2"',
     'grpcio>=1.8.2',
+    'mmh3>=2.5.1'
 ]
 extras = {
 }
diff --git 
a/stream/clients/python/tests/unit/bookkeeper/common/router/test_router.py 
b/stream/clients/python/tests/unit/bookkeeper/common/router/test_router.py
new file mode 100644
index 0000000..55e9eda
--- /dev/null
+++ b/stream/clients/python/tests/unit/bookkeeper/common/router/test_router.py
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bookkeeper.common.router import BytesHashRouter
+
+
+def test_get_routing_hash():
+    router = BytesHashRouter()
+    hash_key = router.getRoutingKey("foo")
+    assert hash_key == -2336792112830167536
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/hash/Murmur3.java 
b/stream/common/src/main/java/org/apache/bookkeeper/common/hash/Murmur3.java
new file mode 100644
index 0000000..47852ba
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/hash/Murmur3.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.hash;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms.
+ *
+ * Murmur3 32 and 128 bit variants.
+ * 32-bit Java port of 
https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
+ * 128-bit Java port of 
https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
+ *
+ * This is a public domain code with no copyrights.
+ * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
+ * "All MurmurHash versions are public domain software, and the author 
disclaims all copyright
+ * to their code."
+ */
+public class Murmur3 {
+    // Constants for 32 bit variant
+    private static final int C1_32 = 0xcc9e2d51;
+    private static final int C2_32 = 0x1b873593;
+    private static final int R1_32 = 15;
+    private static final int R2_32 = 13;
+    private static final int M_32 = 5;
+    private static final int N_32 = 0xe6546b64;
+
+    // Constants for 128 bit variant
+    private static final long C1 = 0x87c37b91114253d5L;
+    private static final long C2 = 0x4cf5ad432745937fL;
+    private static final int R1 = 31;
+    private static final int R2 = 27;
+    private static final int R3 = 33;
+    private static final int M = 5;
+    private static final int N1 = 0x52dce729;
+    private static final int N2 = 0x38495ab5;
+
+    public static final int DEFAULT_SEED = 104729;
+
+    /**
+     * Murmur3 32-bit variant.
+     *
+     * @param data - input byte array
+     * @return - hashcode
+     */
+    public static int hash32(byte[] data) {
+        return hash32(data, 0, data.length, DEFAULT_SEED);
+    }
+
+    /**
+     * Murmur3 32-bit variant.
+     *
+     * @param data - input byte array
+     * @param length - length of array
+     * @return - hashcode
+     */
+    public static int hash32(byte[] data, int length) {
+        return hash32(data, 0, length, DEFAULT_SEED);
+    }
+
+    /**
+     * Murmur3 32-bit variant.
+     *
+     * @param data   - input byte array
+     * @param length - length of array
+     * @param seed   - seed. (default 0)
+     * @return - hashcode
+     */
+    public static int hash32(byte[] data, int length, int seed) {
+        return hash32(data, 0, length, seed);
+    }
+
+    /**
+     * Murmur3 32-bit variant.
+     *
+     * @param data   - input byte array
+     * @param offset - offset of data
+     * @param length - length of array
+     * @param seed   - seed. (default 0)
+     * @return - hashcode
+     */
+    public static int hash32(byte[] data, int offset, int length, int seed) {
+        int hash = seed;
+        final int nblocks = length >> 2;
+
+        // body
+        for (int i = 0; i < nblocks; i++) {
+            int i_4 = i << 2;
+            int k = (data[offset + i_4] & 0xff)
+                | ((data[offset + i_4 + 1] & 0xff) << 8)
+                | ((data[offset + i_4 + 2] & 0xff) << 16)
+                | ((data[offset + i_4 + 3] & 0xff) << 24);
+
+            // mix functions
+            k *= C1_32;
+            k = Integer.rotateLeft(k, R1_32);
+            k *= C2_32;
+            hash ^= k;
+            hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+        }
+
+        // tail
+        int idx = nblocks << 2;
+        int k1 = 0;
+        switch (length - idx) {
+            case 3:
+                k1 ^= data[offset + idx + 2] << 16;
+            case 2:
+                k1 ^= data[offset + idx + 1] << 8;
+            case 1:
+                k1 ^= data[offset + idx];
+
+                // mix functions
+                k1 *= C1_32;
+                k1 = Integer.rotateLeft(k1, R1_32);
+                k1 *= C2_32;
+                hash ^= k1;
+        }
+
+        // finalization
+        hash ^= length;
+        hash ^= (hash >>> 16);
+        hash *= 0x85ebca6b;
+        hash ^= (hash >>> 13);
+        hash *= 0xc2b2ae35;
+        hash ^= (hash >>> 16);
+
+        return hash;
+    }
+
+    public static int hash32(ByteBuf data, int offset, int length, int seed) {
+        int hash = seed;
+        final int nblocks = length >> 2;
+
+        // body
+        for (int i = 0; i < nblocks; i++) {
+            int i_4 = i << 2;
+            int k = (data.getByte(offset + i_4) & 0xff)
+                | ((data.getByte(offset + i_4 + 1) & 0xff) << 8)
+                | ((data.getByte(offset + i_4 + 2) & 0xff) << 16)
+                | ((data.getByte(offset + i_4 + 3) & 0xff) << 24);
+
+            // mix functions
+            k *= C1_32;
+            k = Integer.rotateLeft(k, R1_32);
+            k *= C2_32;
+            hash ^= k;
+            hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+        }
+
+        // tail
+        int idx = nblocks << 2;
+        int k1 = 0;
+        switch (length - idx) {
+            case 3:
+                k1 ^= data.getByte(offset + idx + 2) << 16;
+            case 2:
+                k1 ^= data.getByte(offset + idx + 1) << 8;
+            case 1:
+                k1 ^= data.getByte(offset + idx);
+
+                // mix functions
+                k1 *= C1_32;
+                k1 = Integer.rotateLeft(k1, R1_32);
+                k1 *= C2_32;
+                hash ^= k1;
+        }
+
+        // finalization
+        hash ^= length;
+        hash ^= (hash >>> 16);
+        hash *= 0x85ebca6b;
+        hash ^= (hash >>> 13);
+        hash *= 0xc2b2ae35;
+        hash ^= (hash >>> 16);
+
+        return hash;
+    }
+
+    /**
+     * Murmur3 128-bit variant.
+     *
+     * @param data - input byte array
+     * @return - hashcode (2 longs)
+     */
+    public static long[] hash128(byte[] data) {
+        return hash128(data, 0, data.length, DEFAULT_SEED);
+    }
+
+    /**
+     * Murmur3 128-bit variant.
+     *
+     * @param data   - input byte array
+     * @param offset - the first element of array
+     * @param length - length of array
+     * @param seed   - seed. (default is 0)
+     * @return - hashcode (2 longs)
+     */
+    public static long[] hash128(byte[] data, int offset, int length, int 
seed) {
+        long h1 = seed;
+        long h2 = seed;
+        final int nblocks = length >> 4;
+
+        // body
+        for (int i = 0; i < nblocks; i++) {
+            final int i16 = i << 4;
+            long k1 = ((long) data[offset + i16] & 0xff)
+                | (((long) data[offset + i16 + 1] & 0xff) << 8)
+                | (((long) data[offset + i16 + 2] & 0xff) << 16)
+                | (((long) data[offset + i16 + 3] & 0xff) << 24)
+                | (((long) data[offset + i16 + 4] & 0xff) << 32)
+                | (((long) data[offset + i16 + 5] & 0xff) << 40)
+                | (((long) data[offset + i16 + 6] & 0xff) << 48)
+                | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+            long k2 = ((long) data[offset + i16 + 8] & 0xff)
+                | (((long) data[offset + i16 + 9] & 0xff) << 8)
+                | (((long) data[offset + i16 + 10] & 0xff) << 16)
+                | (((long) data[offset + i16 + 11] & 0xff) << 24)
+                | (((long) data[offset + i16 + 12] & 0xff) << 32)
+                | (((long) data[offset + i16 + 13] & 0xff) << 40)
+                | (((long) data[offset + i16 + 14] & 0xff) << 48)
+                | (((long) data[offset + i16 + 15] & 0xff) << 56);
+
+            // mix functions for k1
+            k1 *= C1;
+            k1 = Long.rotateLeft(k1, R1);
+            k1 *= C2;
+            h1 ^= k1;
+            h1 = Long.rotateLeft(h1, R2);
+            h1 += h2;
+            h1 = h1 * M + N1;
+
+            // mix functions for k2
+            k2 *= C2;
+            k2 = Long.rotateLeft(k2, R3);
+            k2 *= C1;
+            h2 ^= k2;
+            h2 = Long.rotateLeft(h2, R1);
+            h2 += h1;
+            h2 = h2 * M + N2;
+        }
+
+        // tail
+        long k1 = 0;
+        long k2 = 0;
+        int tailStart = nblocks << 4;
+        switch (length - tailStart) {
+            case 15:
+                k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
+            case 14:
+                k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
+            case 13:
+                k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
+            case 12:
+                k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
+            case 11:
+                k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
+            case 10:
+                k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
+            case 9:
+                k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
+                k2 *= C2;
+                k2 = Long.rotateLeft(k2, R3);
+                k2 *= C1;
+                h2 ^= k2;
+
+            case 8:
+                k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
+            case 7:
+                k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
+            case 6:
+                k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
+            case 5:
+                k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
+            case 4:
+                k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
+            case 3:
+                k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
+            case 2:
+                k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
+            case 1:
+                k1 ^= (long) (data[offset + tailStart] & 0xff);
+                k1 *= C1;
+                k1 = Long.rotateLeft(k1, R1);
+                k1 *= C2;
+                h1 ^= k1;
+        }
+
+        // finalization
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = fmix64(h1);
+        h2 = fmix64(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        return new long[]{h1, h2};
+    }
+
+    /**
+     * Murmur3 128-bit variant.
+     *
+     * @param data   - input byte array
+     * @param offset - the first element of array
+     * @param length - length of array
+     * @param seed   - seed. (default is 0)
+     * @return - hashcode (2 longs)
+     */
+    public static long[] hash128(ByteBuf data, int offset, int length, long 
seed) {
+        long h1 = seed;
+        long h2 = seed;
+        final int nblocks = length >> 4;
+
+        // body
+        for (int i = 0; i < nblocks; i++) {
+            final int i16 = i << 4;
+            long k1 = ((long) data.getByte(offset + i16) & 0xff)
+                | (((long) data.getByte(offset + i16 + 1) & 0xff) << 8)
+                | (((long) data.getByte(offset + i16 + 2) & 0xff) << 16)
+                | (((long) data.getByte(offset + i16 + 3) & 0xff) << 24)
+                | (((long) data.getByte(offset + i16 + 4) & 0xff) << 32)
+                | (((long) data.getByte(offset + i16 + 5) & 0xff) << 40)
+                | (((long) data.getByte(offset + i16 + 6) & 0xff) << 48)
+                | (((long) data.getByte(offset + i16 + 7) & 0xff) << 56);
+
+            long k2 = ((long) data.getByte(offset + i16 + 8) & 0xff)
+                | (((long) data.getByte(offset + i16 + 9) & 0xff) << 8)
+                | (((long) data.getByte(offset + i16 + 10) & 0xff) << 16)
+                | (((long) data.getByte(offset + i16 + 11) & 0xff) << 24)
+                | (((long) data.getByte(offset + i16 + 12) & 0xff) << 32)
+                | (((long) data.getByte(offset + i16 + 13) & 0xff) << 40)
+                | (((long) data.getByte(offset + i16 + 14) & 0xff) << 48)
+                | (((long) data.getByte(offset + i16 + 15) & 0xff) << 56);
+
+            // mix functions for k1
+            k1 *= C1;
+            k1 = Long.rotateLeft(k1, R1);
+            k1 *= C2;
+            h1 ^= k1;
+            h1 = Long.rotateLeft(h1, R2);
+            h1 += h2;
+            h1 = h1 * M + N1;
+
+            // mix functions for k2
+            k2 *= C2;
+            k2 = Long.rotateLeft(k2, R3);
+            k2 *= C1;
+            h2 ^= k2;
+            h2 = Long.rotateLeft(h2, R1);
+            h2 += h1;
+            h2 = h2 * M + N2;
+        }
+
+        // tail
+        long k1 = 0;
+        long k2 = 0;
+        int tailStart = nblocks << 4;
+        switch (length - tailStart) {
+            case 15:
+                k2 ^= (long) (data.getByte(offset + tailStart + 14) & 0xff) << 
48;
+            case 14:
+                k2 ^= (long) (data.getByte(offset + tailStart + 13) & 0xff) << 
40;
+            case 13:
+                k2 ^= (long) (data.getByte(offset + tailStart + 12) & 0xff) << 
32;
+            case 12:
+                k2 ^= (long) (data.getByte(offset + tailStart + 11) & 0xff) << 
24;
+            case 11:
+                k2 ^= (long) (data.getByte(offset + tailStart + 10) & 0xff) << 
16;
+            case 10:
+                k2 ^= (long) (data.getByte(offset + tailStart + 9) & 0xff) << 
8;
+            case 9:
+                k2 ^= (long) (data.getByte(offset + tailStart + 8) & 0xff);
+                k2 *= C2;
+                k2 = Long.rotateLeft(k2, R3);
+                k2 *= C1;
+                h2 ^= k2;
+
+            case 8:
+                k1 ^= (long) (data.getByte(offset + tailStart + 7) & 0xff) << 
56;
+            case 7:
+                k1 ^= (long) (data.getByte(offset + tailStart + 6) & 0xff) << 
48;
+            case 6:
+                k1 ^= (long) (data.getByte(offset + tailStart + 5) & 0xff) << 
40;
+            case 5:
+                k1 ^= (long) (data.getByte(offset + tailStart + 4) & 0xff) << 
32;
+            case 4:
+                k1 ^= (long) (data.getByte(offset + tailStart + 3) & 0xff) << 
24;
+            case 3:
+                k1 ^= (long) (data.getByte(offset + tailStart + 2) & 0xff) << 
16;
+            case 2:
+                k1 ^= (long) (data.getByte(offset + tailStart + 1) & 0xff) << 
8;
+            case 1:
+                k1 ^= (long) (data.getByte(offset + tailStart) & 0xff);
+                k1 *= C1;
+                k1 = Long.rotateLeft(k1, R1);
+                k1 *= C2;
+                h1 ^= k1;
+        }
+
+        // finalization
+        h1 ^= length;
+        h2 ^= length;
+
+        h1 += h2;
+        h2 += h1;
+
+        h1 = fmix64(h1);
+        h2 = fmix64(h2);
+
+        h1 += h2;
+        h2 += h1;
+
+        return new long[]{h1, h2};
+    }
+
+    private static long fmix64(long h) {
+        h ^= (h >>> 33);
+        h *= 0xff51afd7ed558ccdL;
+        h ^= (h >>> 33);
+        h *= 0xc4ceb9fe1a85ec53L;
+        h ^= (h >>> 33);
+        return h;
+    }
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
index 2c6e052..7dadf2d 100644
--- 
a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.common.router;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.bookkeeper.common.hash.MurmurHash;
+import org.apache.bookkeeper.common.hash.Murmur3;
 
 /**
  * The base hash router.
@@ -26,8 +26,7 @@ import org.apache.bookkeeper.common.hash.MurmurHash;
 public abstract class AbstractHashRouter<K> implements HashRouter<K> {
 
     private static final long serialVersionUID = -7979076779920023308L;
-    // the MurmurHash2 value of "ZSTREAM"
-    static final long HASH_SEED = 383242705L;
+    public static final long HASH_SEED = 383242705L;
 
     protected AbstractHashRouter() {
     }
@@ -36,8 +35,8 @@ public abstract class AbstractHashRouter<K> implements 
HashRouter<K> {
     public Long getRoutingKey(K key) {
         ByteBuf keyData = getRoutingKeyData(key);
         try {
-            return MurmurHash.hash64(
-                keyData, keyData.readerIndex(), keyData.readableBytes(), 
HASH_SEED);
+            return Murmur3.hash128(
+                keyData, keyData.readerIndex(), keyData.readableBytes(), 
HASH_SEED)[0];
         } finally {
             keyData.release();
         }
diff --git 
a/stream/common/src/test/java/org/apache/bookkeeper/common/router/HashRouterTest.java
 
b/stream/common/src/test/java/org/apache/bookkeeper/common/router/HashRouterTest.java
new file mode 100644
index 0000000..5d4ed79
--- /dev/null
+++ 
b/stream/common/src/test/java/org/apache/bookkeeper/common/router/HashRouterTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.router;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.hash.Murmur3;
+import org.junit.Test;
+
+/**
+ * Unit test {@link HashRouter}s.
+ */
+@Slf4j
+public class HashRouterTest {
+
+    @Test
+    public void testByteBufHashRouter() {
+        ByteBuf key = Unpooled.wrappedBuffer("foo".getBytes(UTF_8));
+
+        // murmur3 - 32 bits
+        int hash32 = Murmur3.hash32(
+            key, key.readerIndex(), key.readableBytes(), (int) 
AbstractHashRouter.HASH_SEED);
+        int guavaHash32 = Hashing.murmur3_32((int) 
AbstractHashRouter.HASH_SEED)
+            .newHasher()
+            .putString("foo", UTF_8)
+            .hash()
+            .asInt();
+        assertEquals(hash32, guavaHash32);
+
+        // murmur3 - 128 bits
+        long[] hash128 = Murmur3.hash128(
+            key, key.readerIndex(), key.readableBytes(), 
AbstractHashRouter.HASH_SEED);
+        log.info("hash128: {}", hash128);
+        long guavaHash128 = Hashing.murmur3_128((int) 
AbstractHashRouter.HASH_SEED)
+            .newHasher()
+            .putString("foo", UTF_8)
+            .hash()
+            .asLong();
+        assertEquals(hash128[0], guavaHash128);
+
+        ByteBufHashRouter router = ByteBufHashRouter.of();
+        long routingHash = router.getRoutingKey(key).longValue();
+        log.info("Routing hash = {}", routingHash);
+        assertEquals(hash128[0], routingHash);
+    }
+
+}

Reply via email to