This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 74877be2aa Refactor murmur functions to pinot-spi (#15160)
74877be2aa is described below
commit 74877be2aa599baa8a2ea16529a07a6dbf611f2c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Mar 3 15:53:33 2025 -0800
Refactor murmur functions to pinot-spi (#15160)
---
.../common/function/scalar/HashFunctions.java | 168 ++++++-
.../org/apache/pinot/common/utils/HashUtil.java | 120 -----
.../common/utils/helix/LeadControllerUtils.java | 6 +-
.../common/function/scalar/HashFunctionsTest.java | 142 ++++++
.../spi/partition/Murmur3PartitionFunction.java | 203 +-------
.../spi/partition/MurmurPartitionFunction.java | 56 +--
.../spi/partition/PartitionFunctionTest.java | 19 +-
.../pinot/spi/utils/hash/MurmurHashFunctions.java | 514 +++++++++++++++++++++
.../admin/command/StreamAvroIntoKafkaCommand.java | 4 +-
9 files changed, 843 insertions(+), 389 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
index ccf14a122d..6d167af166 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.common.function.scalar;
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pinot.spi.annotations.ScalarFunction;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
/**
@@ -45,7 +48,18 @@ public class HashFunctions {
*/
@ScalarFunction
public static String sha(byte[] input) {
- return DigestUtils.shaHex(input);
+ return DigestUtils.sha1Hex(input);
+ }
+
+ /**
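+ * Return SHA3-224 digest as hex string. NOTE(review): despite the name, this calls DigestUtils.sha3_224Hex (SHA-3), not SHA-2 SHA-224 - confirm intended.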
+ *
+ * @param input the byte array representing the data
+ * @return hash string in hex format
+ */
+ @ScalarFunction
+ public static String sha224(byte[] input) {
+ return DigestUtils.sha3_224Hex(input);
}
/**
@@ -70,6 +84,17 @@ public class HashFunctions {
return DigestUtils.sha512Hex(input);
}
+ /**
+ * Return MD2 digest as hex string.
+ *
+ * @param input the byte array representing the data
+ * @return hash string in hex format
+ */
+ @ScalarFunction
+ public static String md2(byte[] input) {
+ return DigestUtils.md2Hex(input);
+ }
+
/**
* Return MD5 digest as hex string.
*
@@ -80,4 +105,145 @@ public class HashFunctions {
public static String md5(byte[] input) {
return DigestUtils.md5Hex(input);
}
+
+ /**
+ * Computes 32-bit MurmurHash2 of the given byte array.
+ *
+ * @param input the byte array to hash
+ * @return 32-bit hash
+ */
+ @ScalarFunction
+ public static int murmurHash2(byte[] input) {
+ return MurmurHashFunctions.murmurHash2(input);
+ }
+
+ /**
+ * Computes 32-bit MurmurHash2 of the given string.
+ *
+ * @param input the string to hash (encoded as UTF-8 bytes)
+ * @return 32-bit hash
+ */
+ @ScalarFunction
+ public static int murmurHash2UTF8(String input) {
+ return
MurmurHashFunctions.murmurHash2(input.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Computes 64-bit MurmurHash2 of the given byte array.
+ *
+ * @param input the byte array to hash
+ * @return 64-bit hash
+ */
+ @ScalarFunction
+ public static long murmurHash2Bit64(byte[] input) {
+ return MurmurHashFunctions.murmurHash2Bit64(input);
+ }
+
+ /**
+ * Computes 64-bit MurmurHash2 of the given byte array and seed.
+ *
+ * @param input the byte array to hash
+ * @return 64-bit hash
+ */
+ @ScalarFunction
+ public static long murmurHash2Bit64(byte[] input, int seed) {
+ return MurmurHashFunctions.murmurHash2Bit64(input, input.length, seed);
+ }
+
+ /**
+ * Computes 32-bit Murmur3 Hash of the given byte array and seed.
+ *
+ * @param input the byte array to hash
+ * @return 32-bit hash
+ */
+ @ScalarFunction
+ public static int murmurHash3Bit32(byte[] input, int seed) {
+ return Hashing.murmur3_32_fixed(seed).hashBytes(input).asInt();
+ }
+
+ /**
+ * Computes 64-bit Murmur3 Hash of the given byte array and seed.
+ *
+ * @param input the byte array to hash
+ * @return 64-bit hash
+ */
+ @ScalarFunction
+ public static long murmurHash3Bit64(byte[] input, int seed) {
+ return Hashing.murmur3_128(seed).hashBytes(input).asLong();
+ }
+
+ /**
+ * Computes 128-bit Murmur3 Hash of the given byte array and seed.
+ *
+ * @param input the byte array to hash
+ * @return 128-bit hash represented in a 16-byte array
+ */
+ @ScalarFunction
+ public static byte[] murmurHash3Bit128(byte[] input, int seed) {
+ return Hashing.murmur3_128(seed).hashBytes(input).asBytes();
+ }
+
+ /**
+ * Computes 32-bit Murmur3 Hash of the given byte array and seed for x64
platform.
+ *
+ * @param input the byte array to hash
+ * @return 32-bit hash
+ */
+ @ScalarFunction
+ public static int murmurHash3X64Bit32(byte[] input, int seed) {
+ return MurmurHashFunctions.murmurHash3X64Bit32(input, seed);
+ }
+
+ /**
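+ * Computes 64-bit Murmur3 Hash of the given byte array and seed for x64
platform. NOTE(review): this delegates to murmurHash3X64Bit32, so the value is a 32-bit hash widened to long - confirm intended.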
+ *
+ * @param input the byte array to hash
+ * @return 64-bit hash
+ */
+ @ScalarFunction
+ public static long murmurHash3X64Bit64(byte[] input, int seed) {
+ return MurmurHashFunctions.murmurHash3X64Bit32(input, seed);
+ }
+
+ /**
+ * Computes 128-bit Murmur3 Hash of the given byte array and seed for x64
platform.
+ *
+ * @param input the byte array to hash
+ * @return 128-bit hash represented in a 16-byte array
+ */
+ @ScalarFunction
+ public static byte[] murmurHash3X64Bit128(byte[] input, int seed) {
+ return MurmurHashFunctions.murmurHash3X64Bit128(input, seed);
+ }
+
+ /**
+ * Computes 32-bit Adler Hash of the given byte array
+ * @param input the byte array to hash
+ * @return 32-bit hash
+ */
+ @ScalarFunction
+ public static int adler32(byte[] input) {
+ return Hashing.adler32().hashBytes(input).asInt();
+ }
+
+ /**
+ * Computes 32-bit CRC (Cyclic Redundancy Check) of the given byte array
+ * @param input the byte array to hash
+ * @return 32-bit CRC32 hash
+ */
+ @ScalarFunction
+ public static int crc32(byte[] input) {
+ return Hashing.crc32().hashBytes(input).asInt();
+ }
+
+ /**
+ * Computes 32-bit CRC32C (Cyclic Redundancy Check 32C) of the given byte
array.
+ *
+ * @param input the byte array to hash
+ * @return 32-bit CRC32C hash
+ */
+ @ScalarFunction
+ public static int crc32c(byte[] input) {
+ return Hashing.crc32c().hashBytes(input).asInt();
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
index a8c5cc9985..de0409751e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
@@ -19,8 +19,6 @@
package org.apache.pinot.common.utils;
import com.google.common.primitives.Ints;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
public class HashUtil {
@@ -56,122 +54,4 @@ public class HashUtil {
}
return Integer.MAX_VALUE;
}
-
- public static long compute(IntBuffer buff) {
- buff.rewind();
- ByteBuffer bBuff = ByteBuffer.allocate(buff.array().length * 4);
- for (int i : buff.array()) {
- bBuff.putInt(i);
- }
- return compute(bBuff);
- }
-
- public static long compute(ByteBuffer buff) {
- return hash64(buff.array(), buff.array().length);
- }
-
- public static long hash64(final byte[] data, int length) {
- // Default seed is 0xe17a1465.
- return hash64(data, length, 0xe17a1465);
- }
-
- // Implement 64-bit Murmur2 hash.
- public static long hash64(final byte[] data, int length, int seed) {
- final long m = 0xc6a4a7935bd1e995L;
- final int r = 47;
-
- long h = (seed & 0xffffffffL) ^ (length * m);
-
- int length8 = length / 8;
-
- for (int i = 0; i < length8; i++) {
- final int i8 = i * 8;
- long k =
- ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) +
(((long) data[i8 + 2] & 0xff) << 16) + (
- ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] &
0xff) << 32) + (((long) data[i8 + 5] & 0xff)
- << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8
+ 7] & 0xff) << 56);
-
- k *= m;
- k ^= k >>> r;
- k *= m;
-
- h ^= k;
- h *= m;
- }
-
- // CHECKSTYLE:OFF: checkstyle:coding
- switch (length % 8) {
- case 7:
- h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
- case 6:
- h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
- case 5:
- h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
- case 4:
- h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
- case 3:
- h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
- case 2:
- h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
- case 1:
- h ^= data[length & ~7] & 0xff;
- h *= m;
- }
- // CHECKSTYLE:ON: checkstyle:coding
-
- h ^= h >>> r;
- h *= m;
- h ^= h >>> r;
- return h;
- }
-
- /**
- * Generates 32 bit murmur2 hash from byte array
- * @param data byte array to hash
- * @return 32 bit hash of the given array
- */
- @SuppressWarnings("checkstyle")
- public static int murmur2(final byte[] data) {
- int length = data.length;
- int seed = 0x9747b28c;
- // 'm' and 'r' are mixing constants generated offline.
- // They're not really 'magic', they just happen to work well.
- final int m = 0x5bd1e995;
- final int r = 24;
-
- // Initialize the hash to a random value
- int h = seed ^ length;
- int length4 = length / 4;
-
- for (int i = 0; i < length4; i++) {
- final int i4 = i * 4;
- int k =
- (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 +
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
- << 24);
- k *= m;
- k ^= k >>> r;
- k *= m;
- h *= m;
- h ^= k;
- }
-
- // CHECKSTYLE:OFF: checkstyle:coding
- // Handle the last few bytes of the input array
- switch (length % 4) {
- case 3:
- h ^= (data[(length & ~3) + 2] & 0xff) << 16;
- case 2:
- h ^= (data[(length & ~3) + 1] & 0xff) << 8;
- case 1:
- h ^= data[length & ~3] & 0xff;
- h *= m;
- }
- // CHECKSTYLE:ON: checkstyle:coding
-
- h ^= h >>> 13;
- h *= m;
- h ^= h >>> 15;
-
- return h;
- }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
index c7dffb044c..9984f5f463 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
@@ -24,7 +24,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
-import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.function.scalar.HashFunctions;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ public class LeadControllerUtils {
/**
* Given a raw table name and number of partitions, returns the partition id
in lead controller resource.
- * Uses murmur2 function to get hashcode for table, ignores the most
significant bit.
+ * Uses murmurHash2 function to get hashcode for table, ignores the most
significant bit.
* Note: This method CANNOT be changed when lead controller resource is
enabled.
* Otherwise it will assign different controller for the same table, which
will mess up the controller periodic
* tasks and realtime segment completion.
@@ -48,7 +48,7 @@ public class LeadControllerUtils {
* @return partition id in lead controller resource.
*/
public static int getPartitionIdForTable(String rawTableName) {
- return (HashUtil.murmur2(rawTableName.getBytes(UTF_8)) & Integer.MAX_VALUE)
+ return (HashFunctions.murmurHash2(rawTableName.getBytes(UTF_8)) &
Integer.MAX_VALUE)
% Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
new file mode 100644
index 0000000000..6f23c8c89a
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class HashFunctionsTest {
+
+ @Test
+ public void testShaHash() {
+ String input = "testString";
+ assertEquals(HashFunctions.sha(input.getBytes()),
"956265657d0b637ef65b9b59f9f858eecf55ed6a");
+ }
+
+ @Test
+ public void testSha2224Hash() {
+ String input = "testString";
+ assertEquals(HashFunctions.sha224(input.getBytes()),
+ "bb54d1095764bff72b570dcdc3172ed6d1b26695494528a0059c95ae");
+ }
+
+ @Test
+ public void testSha256Hash() {
+ String input = "testString";
+ assertEquals(HashFunctions.sha256(input.getBytes()),
+ "4acf0b39d9c4766709a3689f553ac01ab550545ffa4544dfc0b2cea82fba02a3");
+ }
+
+ @Test
+ public void testSha512Hash() {
+ String input = "testString";
+ assertEquals(HashFunctions.sha512(input.getBytes()),
+ "c48af5a7f6d4a851fc8a434eed638ab1a6ef68e19dbcae894ac67c9fbc5bcb01"
+ +
"82b8e7123b3df3c9e4dcb7690c23103f03dc17f54352071ceb2a4eb204b26b91");
+ }
+
+ @Test
+ public void testMd2Hash() {
+ String input = "testString";
+ assertEquals(HashFunctions.md2(input.getBytes()),
"466c453913ba0d8325f96b2d47984fb5");
+ }
+
+ @Test
+ public void testMd5Hash() {
+ String input = "testString";
+ assertEquals(HashFunctions.md5(input.getBytes()),
"536788f4dbdffeecfbb8f350a941eea3");
+ }
+
+ @Test
+ public void testMurmurHash2() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash2(input.getBytes()), -534425817);
+ }
+
+ @Test
+ public void testMurmurHash2UTF8() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash2UTF8(input), -534425817);
+ }
+
+ @Test
+ public void testMurmurHash2Bit64() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes()),
3907736674355139845L);
+ assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes(), 12345),
-2138976126980760436L);
+ }
+
+ @Test
+ public void testMurmurHash3Bit32() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3Bit32(input.getBytes(), 0),
-1435605585);
+ }
+
+ @Test
+ public void testMurmurHash3Bit64() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3Bit64(input.getBytes(), 0),
-3652179990542706350L);
+ }
+
+ @Test
+ public void testMurmurHash3Bit128() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3Bit128(input.getBytes(), 0),
+ new byte[]{82, -103, -23, 15, -90, -39, 80, -51, 15, 73, -81, -28,
111, -21, -78, 108});
+ }
+
+ @Test
+ public void testMurmurHash3X64Bit32() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3X64Bit32(input.getBytes(), 0),
-1096986291);
+ }
+
+ @Test
+ public void testMurmurHash3X64Bit64() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3X64Bit64(input.getBytes(), 0),
-1096986291L);
+ }
+
+ @Test
+ public void testMurmurHash3X64Bit128() {
+ String input = "testString";
+ assertEquals(HashFunctions.murmurHash3X64Bit128(input.getBytes(), 0),
+ new byte[]{-66, -99, 81, 77, -7, 29, 124, 76, 42, 38, -34, -42, -92,
-83, 83, 13});
+ }
+
+ @Test
+ public void testAdler32() {
+ String input = "testString";
+ assertEquals(HashFunctions.adler32(input.getBytes()), 392102968);
+ }
+
+ @Test
+ public void testCrc32() {
+ String input = "testString";
+ assertEquals(HashFunctions.crc32(input.getBytes()), 418708744);
+ }
+
+ @Test
+ public void testCrc32c() {
+ String input = "testString";
+ assertEquals(HashFunctions.crc32c(input.getBytes()), -1608760557);
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
index c8f673a3dd..ba22c1a992 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -18,11 +18,10 @@
*/
package org.apache.pinot.segment.spi.partition;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.hash.Hashing;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -31,7 +30,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Implementation of {@link PartitionFunction} which partitions based on 32
bit murmur3 hash
*/
public class Murmur3PartitionFunction implements PartitionFunction {
- public static final byte INVALID_CHAR = (byte) '?';
private static final String NAME = "Murmur3";
private static final String SEED_KEY = "seed";
private static final String VARIANT_KEY = "variant";
@@ -71,7 +69,8 @@ public class Murmur3PartitionFunction implements
PartitionFunction {
@Override
public int getPartition(String value) {
- int hash = _useX64 ? murmur3Hash32BitsX64(value, _seed) :
murmur3Hash32BitsX86(value.getBytes(UTF_8), _seed);
+ int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
+ : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8),
_seed);
return (hash & Integer.MAX_VALUE) % _numPartitions;
}
@@ -90,200 +89,4 @@ public class Murmur3PartitionFunction implements
PartitionFunction {
public String toString() {
return NAME;
}
-
- @VisibleForTesting
- static int murmur3Hash32BitsX86(byte[] data, int seed) {
- return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
- }
-
- /**
- * Taken from <a href=
- *
"https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash
- * /MurmurHash3.java"
- * >Infinispan code base</a>.
- *
- * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
- * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
- * >original in C</a>
- *
- * This is an implementation of MurmurHash3 to generate 32 bit hash for x64
architecture (not part of the original
- * Murmur3 implementations) used by Infinispan and Debezium, Removed the
parts that we don't need and formatted
- * the code to Apache Pinot's Checkstyle.
- *
- * @author Patrick McFarland
- * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash
website</a>
- * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry
on Wikipedia</a>
- */
-
- private static void bmix(State state) {
- state._k1 *= state._c1;
- state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
- state._k1 *= state._c2;
- state._h1 ^= state._k1;
- state._h1 += state._h2;
-
- state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41);
-
- state._k2 *= state._c2;
- state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23);
- state._k2 *= state._c1;
- state._h2 ^= state._k2;
- state._h2 += state._h1;
-
- state._h1 = state._h1 * 3 + 0x52dce729;
- state._h2 = state._h2 * 3 + 0x38495ab5;
-
- state._c1 = state._c1 * 5 + 0x7b7d159c;
- state._c2 = state._c2 * 5 + 0x6bce6396;
- }
-
- private static long fmix(long k) {
- k ^= k >>> 33;
- k *= 0xff51afd7ed558ccdL;
- k ^= k >>> 33;
- k *= 0xc4ceb9fe1a85ec53L;
- k ^= k >>> 33;
-
- return k;
- }
-
- @VisibleForTesting
- static int murmur3Hash32BitsX64(String s, int seed) {
- State state = new State();
-
- state._h1 = 0x9368e53c2f6af274L ^ seed;
- state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
- state._c1 = 0x87c37b91114253d5L;
- state._c2 = 0x4cf5ad432745937fL;
-
- int byteLen = 0;
- int stringLen = s.length();
-
- // CHECKSTYLE:OFF
- for (int i = 0; i < stringLen; i++) {
- char c1 = s.charAt(i);
- int cp;
- if (!Character.isSurrogate(c1)) {
- cp = c1;
- } else if (Character.isHighSurrogate(c1)) {
- if (i + 1 < stringLen) {
- char c2 = s.charAt(i + 1);
- if (Character.isLowSurrogate(c2)) {
- i++;
- cp = Character.toCodePoint(c1, c2);
- } else {
- cp = INVALID_CHAR;
- }
- } else {
- cp = INVALID_CHAR;
- }
- } else {
- cp = INVALID_CHAR;
- }
-
- if (cp <= 0x7f) {
- addByte(state, (byte) cp, byteLen++);
- } else if (cp <= 0x07ff) {
- byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
- byte b2 = (byte) (0x80 | (0x3f & cp));
- addByte(state, b1, byteLen++);
- addByte(state, b2, byteLen++);
- } else if (cp <= 0xffff) {
- byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
- byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
- byte b3 = (byte) (0x80 | (0x3f & cp));
- addByte(state, b1, byteLen++);
- addByte(state, b2, byteLen++);
- addByte(state, b3, byteLen++);
- } else {
- byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
- byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
- byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
- byte b4 = (byte) (0x80 | (0x3f & cp));
- addByte(state, b1, byteLen++);
- addByte(state, b2, byteLen++);
- addByte(state, b3, byteLen++);
- addByte(state, b4, byteLen++);
- }
- }
-
- long savedK1 = state._k1;
- long savedK2 = state._k2;
- state._k1 = 0;
- state._k2 = 0;
- switch (byteLen & 15) {
- case 15:
- state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
- case 14:
- state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
- case 13:
- state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
- case 12:
- state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
- case 11:
- state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
- case 10:
- state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
- case 9:
- state._k2 ^= ((byte) savedK2);
- case 8:
- state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
- case 7:
- state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
- case 6:
- state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
- case 5:
- state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
- case 4:
- state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
- case 3:
- state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
- case 2:
- state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
- case 1:
- state._k1 ^= ((byte) savedK1);
- bmix(state);
- }
- // CHECKSTYLE:ON
-
- state._h2 ^= byteLen;
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- state._h1 = fmix(state._h1);
- state._h2 = fmix(state._h2);
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- return (int) (state._h1 >> 32);
- }
-
- private static void addByte(State state, byte b, int len) {
- int shift = (len & 0x7) * 8;
- long bb = (b & 0xffL) << shift;
- if ((len & 0x8) == 0) {
- state._k1 |= bb;
- } else {
- state._k2 |= bb;
- if ((len & 0xf) == 0xf) {
- bmix(state);
- state._k1 = 0;
- state._k2 = 0;
- }
- }
- }
-
- private static class State {
- long _h1;
- long _h2;
-
- long _k1;
- long _k2;
-
- long _c1;
- long _c2;
- }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
index a43a49f789..8251450c44 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.segment.spi.partition;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -42,7 +42,7 @@ public class MurmurPartitionFunction implements
PartitionFunction {
@Override
public int getPartition(String value) {
- return (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) %
_numPartitions;
+ return (MurmurHashFunctions.murmurHash2(value.getBytes(UTF_8)) &
Integer.MAX_VALUE) % _numPartitions;
}
@Override
@@ -60,56 +60,4 @@ public class MurmurPartitionFunction implements
PartitionFunction {
public String toString() {
return NAME;
}
-
- /**
- * NOTE: This code has been copied over from
org.apache.kafka.common.utils.Utils::murmur2
- *
- * Generates 32 bit murmur2 hash from byte array
- * @param data byte array to hash
- * @return 32 bit hash of the given array
- */
- @VisibleForTesting
- static int murmur2(final byte[] data) {
- int length = data.length;
- int seed = 0x9747b28c;
- // 'm' and 'r' are mixing constants generated offline.
- // They're not really 'magic', they just happen to work well.
- final int m = 0x5bd1e995;
- final int r = 24;
-
- // Initialize the hash to a random value
- int h = seed ^ length;
- int length4 = length / 4;
-
- for (int i = 0; i < length4; i++) {
- final int i4 = i * 4;
- int k =
- (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 +
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
- << 24);
- k *= m;
- k ^= k >>> r;
- k *= m;
- h *= m;
- h ^= k;
- }
-
- // Handle the last few bytes of the input array
- // CHECKSTYLE:OFF
- switch (length % 4) {
- case 3:
- h ^= (data[(length & ~3) + 2] & 0xff) << 16;
- case 2:
- h ^= (data[(length & ~3) + 1] & 0xff) << 8;
- case 1:
- h ^= data[length & ~3] & 0xff;
- h *= m;
- }
- // CHECKSTYLE:ON
-
- h ^= h >>> 13;
- h *= m;
- h ^= h >>> 15;
-
- return h;
- }
}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index a80f65d054..041920894b 100644
---
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import org.testng.annotations.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -383,15 +384,15 @@ public class PartitionFunctionTest {
}
/**
- * Tests the equivalence of org.apache.kafka.common.utils.Utils::murmur2 and
+ * Tests the equivalence of org.apache.kafka.common.utils.Utils::murmur2
and
* {@link MurmurPartitionFunction#getPartition}
- * Our implementation of murmur2 has been copied over from Utils::murmur2
+ * Our implementation of murmurHash2 has been copied over from
Utils::murmur2
*/
@Test
public void testMurmurEquivalence() {
// 10 values of size 7, were randomly generated, using {@link
Random::nextBytes} with seed 100
- // Applied org.apache.kafka.common.utils.Utils::murmur2 to those values
and stored in expectedMurmurValues
+ // Applied org.apache.kafka.common.utils.Utils::murmur2 to those
values and stored in expectedMurmurValues
int[] expectedMurmurValues = new int[]{
-1044832774, -594851693, 1441878663, 1766739604, 1034724141,
-296671913, 443511156, 1483601453, 1819695080,
-931669296
@@ -401,12 +402,12 @@ public class PartitionFunctionTest {
Random random = new Random(seed);
// Generate the same values as above - 10 random values of size 7, using
{@link Random::nextBytes} with seed 100
- // Apply {@link MurmurPartitionFunction::murmur2
+ // Apply {@link MurmurHashFunctions#murmurHash2}
// compare with stored results
byte[] bytes = new byte[7];
for (int expectedMurmurValue : expectedMurmurValues) {
random.nextBytes(bytes);
- assertEquals(MurmurPartitionFunction.murmur2(bytes),
expectedMurmurValue);
+ assertEquals(MurmurHashFunctions.murmurHash2(bytes),
expectedMurmurValue);
}
}
@@ -419,8 +420,8 @@ public class PartitionFunctionTest {
// 10 String values of size 7, were randomly generated, using {@link
Random::nextBytes} with seed 100
// Applied {@link MurmurPartitionFunction} initialized with 5 partitions,
by overriding
- // {@MurmurPartitionFunction::murmur2} with org
- // .apache.kafka.common.utils.Utils::murmur2
+ // {@link MurmurHashFunctions#murmurHash2} with org
+ // .apache.kafka.common.utils.Utils::murmur2
// stored the results in expectedPartitions
int[] expectedPartitions = new int[]{1, 4, 4, 1, 1, 2, 0, 4, 2, 3};
@@ -549,8 +550,8 @@ public class PartitionFunctionTest {
for (int expectedHashValue : expectedHashValues) {
random.nextBytes(bytes);
String nextString = new String(bytes, UTF_8);
- int actualHashValue = useX64 ?
Murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed)
- : Murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed);
+ int actualHashValue = useX64 ?
MurmurHashFunctions.murmurHash3X64Bit32(nextString, hashSeed)
+ : MurmurHashFunctions.murmurHash3X86Bit32(bytes, hashSeed);
assertEquals(actualHashValue, expectedHashValue);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
new file mode 100644
index 0000000000..276eb0f46e
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.hash;
+
+import com.google.common.hash.Hashing;
+import java.nio.ByteBuffer;
+
+
+public class MurmurHashFunctions {
+ public static final byte INVALID_CHAR = (byte) '?';
+
+ private MurmurHashFunctions() {
+ }
+
+ /**
+ * NOTE: This code has been copied over from
org.apache.kafka.common.utils.Utils::murmur2
+ *
+ * Generates 32 bit murmurHash2 hash from byte array
+ * @param data byte array to hash
+ * @return 32 bit hash of the given array
+ */
+ public static int murmurHash2(final byte[] data) {
+ int length = data.length;
+ int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ int length4 = length / 4;
+
+ for (int i = 0; i < length4; i++) {
+ final int i4 = i * 4;
+ int k =
+ (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 +
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
+ << 24);
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+ h *= m;
+ h ^= k;
+ }
+
+ // Handle the last few bytes of the input array
+ // CHECKSTYLE:OFF
+ switch (length % 4) {
+ case 3:
+ h ^= (data[(length & ~3) + 2] & 0xff) << 16;
+ case 2:
+ h ^= (data[(length & ~3) + 1] & 0xff) << 8;
+ case 1:
+ h ^= data[length & ~3] & 0xff;
+ h *= m;
+ }
+ // CHECKSTYLE:ON
+
+ h ^= h >>> 13;
+ h *= m;
+ h ^= h >>> 15;
+
+ return h;
+ }
+
+ /**
+ * Computes the 64-bit Murmur2 hash of the whole array using the fixed seed 0xe17a1465.
+ * @param data byte array to hash
+ * @return 64-bit hash
+ */
+ public static long murmurHash2Bit64(final byte[] data) {
+ return murmurHash2Bit64(data, data.length, 0xe17a1465);
+ }
+
+ /**
+ * Computes the 64-bit Murmur2 hash of the first {@code length} bytes of {@code data}.
+ * @param data byte array to hash
+ * @param length number of leading bytes of {@code data} to hash
+ * @param seed hash seed
+ * @return 64-bit hash
+ */
+ public static long murmurHash2Bit64(final byte[] data, int length, int seed)
{
+ final long m = 0xc6a4a7935bd1e995L;
+ final int r = 47;
+
+ long h = (seed & 0xffffffffL) ^ (length * m);
+
+ int length8 = length / 8;
+
+ for (int i = 0; i < length8; i++) {
+ final int i8 = i * 8;
+ long k =
+ ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) +
(((long) data[i8 + 2] & 0xff) << 16) + (
+ ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] &
0xff) << 32) + (((long) data[i8 + 5] & 0xff)
+ << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8
+ 7] & 0xff) << 56);
+
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+
+ h ^= k;
+ h *= m;
+ }
+
+ // CHECKSTYLE:OFF: checkstyle:coding
+ switch (length % 8) {
+ case 7:
+ h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
+ case 6:
+ h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
+ case 5:
+ h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
+ case 4:
+ h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
+ case 3:
+ h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
+ case 2:
+ h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
+ case 1:
+ h ^= data[length & ~7] & 0xff;
+ h *= m;
+ }
+ // CHECKSTYLE:ON: checkstyle:coding
+
+ h ^= h >>> r;
+ h *= m;
+ h ^= h >>> r;
+ return h;
+ }
+
+ public static int murmurHash3X86Bit32(byte[] data, int seed) {
+ return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
+ }
+
+ /**
+ * Taken from <a href=
+ *
"https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash
+ * /MurmurHash3.java"
+ * >Infinispan code base</a>.
+ *
+ * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
+ * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
+ * >original in C</a>.
+ *
+ * This is an implementation of MurmurHash3 to generate 32 bit hash for x64
architecture (not part of the original
+ * Murmur3 implementations) used by Infinispan and Debezium. Removed the
parts that we don't need and formatted
+ * the code to Apache Pinot's Checkstyle.
+ *
+ * @author Patrick McFarland
+ * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash
website</a>
+ * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry
on Wikipedia</a>
+ */
+ public static int murmurHash3X64Bit32(String s, int seed) {
+ State state = new State();
+
+ state._h1 = 0x9368e53c2f6af274L ^ seed;
+ state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+ state._c1 = 0x87c37b91114253d5L;
+ state._c2 = 0x4cf5ad432745937fL;
+
+ int byteLen = 0;
+ int stringLen = s.length();
+
+ // CHECKSTYLE:OFF: checkstyle:coding
+ for (int i = 0; i < stringLen; i++) {
+ char c1 = s.charAt(i);
+ int cp;
+ if (!Character.isSurrogate(c1)) {
+ cp = c1;
+ } else if (Character.isHighSurrogate(c1)) {
+ if (i + 1 < stringLen) {
+ char c2 = s.charAt(i + 1);
+ if (Character.isLowSurrogate(c2)) {
+ i++;
+ cp = Character.toCodePoint(c1, c2);
+ } else {
+ cp = INVALID_CHAR;
+ }
+ } else {
+ cp = INVALID_CHAR;
+ }
+ } else {
+ cp = INVALID_CHAR;
+ }
+
+ if (cp <= 0x7f) {
+ addByte(state, (byte) cp, byteLen++);
+ } else if (cp <= 0x07ff) {
+ byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
+ byte b2 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ } else if (cp <= 0xffff) {
+ byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
+ byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
+ byte b3 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ addByte(state, b3, byteLen++);
+ } else {
+ byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
+ byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
+ byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
+ byte b4 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ addByte(state, b3, byteLen++);
+ addByte(state, b4, byteLen++);
+ }
+ }
+
+ long savedK1 = state._k1;
+ long savedK2 = state._k2;
+ state._k1 = 0;
+ state._k2 = 0;
+ switch (byteLen & 15) {
+ case 15:
+ state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
+ case 14:
+ state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
+ case 13:
+ state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
+ case 12:
+ state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
+ case 11:
+ state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
+ case 10:
+ state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
+ case 9:
+ state._k2 ^= ((byte) savedK2);
+ case 8:
+ state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
+ case 7:
+ state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
+ case 6:
+ state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
+ case 5:
+ state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
+ case 4:
+ state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
+ case 3:
+ state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
+ case 2:
+ state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
+ case 1:
+ state._k1 ^= ((byte) savedK1);
+ bmix(state);
+ }
+ // CHECKSTYLE:ON: checkstyle:coding
+ state._h2 ^= byteLen;
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ state._h1 = fmix(state._h1);
+ state._h2 = fmix(state._h2);
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ return (int) (state._h1 >> 32);
+ }
+
+ /**
+ * Hash a value using the x64 128 bit variant of MurmurHash3
+ *
+ * @param key value to hash
+ * @param seed random value
+ * @return 128 bit hashed key as a 16-byte array: {@code h1} followed by {@code h2}, each written big-endian
+ */
+ public static byte[] murmurHash3X64Bit128(final byte[] key, final int seed) {
+ State state = new State();
+
+ state._h1 = 0x9368e53c2f6af274L ^ seed;
+ state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+ state._c1 = 0x87c37b91114253d5L;
+ state._c2 = 0x4cf5ad432745937fL;
+
+ for (int i = 0; i < key.length / 16; i++) {
+ state._k1 = getblock(key, i * 2 * 8);
+ state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+ bmix(state);
+ }
+
+ state._k1 = 0;
+ state._k2 = 0;
+
+ int tail = (key.length >>> 4) << 4;
+ // CHECKSTYLE:OFF: checkstyle:coding
+ switch (key.length & 15) {
+ case 15:
+ state._k2 ^= (long) key[tail + 14] << 48;
+ case 14:
+ state._k2 ^= (long) key[tail + 13] << 40;
+ case 13:
+ state._k2 ^= (long) key[tail + 12] << 32;
+ case 12:
+ state._k2 ^= (long) key[tail + 11] << 24;
+ case 11:
+ state._k2 ^= (long) key[tail + 10] << 16;
+ case 10:
+ state._k2 ^= (long) key[tail + 9] << 8;
+ case 9:
+ state._k2 ^= key[tail + 8];
+ case 8:
+ state._k1 ^= (long) key[tail + 7] << 56;
+ case 7:
+ state._k1 ^= (long) key[tail + 6] << 48;
+ case 6:
+ state._k1 ^= (long) key[tail + 5] << 40;
+ case 5:
+ state._k1 ^= (long) key[tail + 4] << 32;
+ case 4:
+ state._k1 ^= (long) key[tail + 3] << 24;
+ case 3:
+ state._k1 ^= (long) key[tail + 2] << 16;
+ case 2:
+ state._k1 ^= (long) key[tail + 1] << 8;
+ case 1:
+ state._k1 ^= key[tail + 0];
+ bmix(state);
+ }
+ // CHECKSTYLE:ON: checkstyle:coding
+
+ state._h2 ^= key.length;
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ state._h1 = fmix(state._h1);
+ state._h2 = fmix(state._h2);
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ buffer.putLong(state._h1);
+ buffer.putLong(state._h2);
+ return buffer.array();
+ }
+
+ /**
+ * Hash a value using the x64 64 bit variant of murmurHash3
+ *
+ * @param key value to hash
+ * @param seed random value
+ * @return 64 bit hashed key
+ */
+ public static long murmurHash3X64Bit64(final byte[] key, final int seed) {
+ // Exactly the same as murmurHash3X64Bit128, except it only returns
state._h1
+ State state = new State();
+
+ state._h1 = 0x9368e53c2f6af274L ^ seed;
+ state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+ state._c1 = 0x87c37b91114253d5L;
+ state._c2 = 0x4cf5ad432745937fL;
+
+ for (int i = 0; i < key.length / 16; i++) {
+ state._k1 = getblock(key, i * 2 * 8);
+ state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+ bmix(state);
+ }
+
+ state._k1 = 0;
+ state._k2 = 0;
+
+ int tail = (key.length >>> 4) << 4;
+ // CHECKSTYLE:OFF: checkstyle:coding
+ switch (key.length & 15) {
+ case 15:
+ state._k2 ^= (long) key[tail + 14] << 48;
+ case 14:
+ state._k2 ^= (long) key[tail + 13] << 40;
+ case 13:
+ state._k2 ^= (long) key[tail + 12] << 32;
+ case 12:
+ state._k2 ^= (long) key[tail + 11] << 24;
+ case 11:
+ state._k2 ^= (long) key[tail + 10] << 16;
+ case 10:
+ state._k2 ^= (long) key[tail + 9] << 8;
+ case 9:
+ state._k2 ^= key[tail + 8];
+ case 8:
+ state._k1 ^= (long) key[tail + 7] << 56;
+ case 7:
+ state._k1 ^= (long) key[tail + 6] << 48;
+ case 6:
+ state._k1 ^= (long) key[tail + 5] << 40;
+ case 5:
+ state._k1 ^= (long) key[tail + 4] << 32;
+ case 4:
+ state._k1 ^= (long) key[tail + 3] << 24;
+ case 3:
+ state._k1 ^= (long) key[tail + 2] << 16;
+ case 2:
+ state._k1 ^= (long) key[tail + 1] << 8;
+ case 1:
+ state._k1 ^= key[tail];
+ bmix(state);
+ }
+ // CHECKSTYLE:ON: checkstyle:coding
+
+ state._h2 ^= key.length;
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ state._h1 = fmix(state._h1);
+ state._h2 = fmix(state._h2);
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ return state._h1;
+ }
+
+ /**
+ * Hash a value using the x64 32 bit variant of murmurHash3
+ *
+ * @param key value to hash
+ * @param seed random value
+ * @return 32 bit hashed key, taken from the upper 32 bits of {@code murmurHash3X64Bit64(key, seed)}
+ */
+ public static int murmurHash3X64Bit32(final byte[] key, final int seed) {
+ return (int) (murmurHash3X64Bit64(key, seed) >>> 32);
+ }
+
+ private static void addByte(State state, byte b, int len) {
+ int shift = (len & 0x7) * 8;
+ long bb = (b & 0xffL) << shift;
+ if ((len & 0x8) == 0) {
+ state._k1 |= bb;
+ } else {
+ state._k2 |= bb;
+ if ((len & 0xf) == 0xf) {
+ bmix(state);
+ state._k1 = 0;
+ state._k2 = 0;
+ }
+ }
+ }
+
+ static long getblock(byte[] key, int i) {
+ return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] &
0x00000000000000FFL) << 8) | (
+ (key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] &
0x00000000000000FFL) << 24) | (
+ (key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] &
0x00000000000000FFL) << 40) | (
+ (key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] &
0x00000000000000FFL) << 56);
+ }
+
+ private static void bmix(State state) {
+ state._k1 *= state._c1;
+ state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
+ state._k1 *= state._c2;
+ state._h1 ^= state._k1;
+ state._h1 += state._h2;
+
+ state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41);
+
+ state._k2 *= state._c2;
+ state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23);
+ state._k2 *= state._c1;
+ state._h2 ^= state._k2;
+ state._h2 += state._h1;
+
+ state._h1 = state._h1 * 3 + 0x52dce729;
+ state._h2 = state._h2 * 3 + 0x38495ab5;
+
+ state._c1 = state._c1 * 5 + 0x7b7d159c;
+ state._c2 = state._c2 * 5 + 0x6bce6396;
+ }
+
+ private static long fmix(long k) {
+ k ^= k >>> 33;
+ k *= 0xff51afd7ed558ccdL;
+ k ^= k >>> 33;
+ k *= 0xc4ceb9fe1a85ec53L;
+ k ^= k >>> 33;
+
+ return k;
+ }
+
+ private static class State {
+ long _h1;
+ long _h2;
+
+ long _k1;
+ long _k2;
+
+ long _c1;
+ long _c2;
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 114a6a4fa6..91e35b1ece 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -31,10 +31,10 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
-import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
import org.apache.pinot.tools.Command;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
@@ -134,7 +134,7 @@ public class StreamAvroIntoKafkaCommand extends
AbstractBaseAdminCommand impleme
break;
}
// Write the message to Kafka
- streamDataProducer.produce(_kafkaTopic,
Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
+ streamDataProducer.produce(_kafkaTopic,
Longs.toByteArray(MurmurHashFunctions.murmurHash2Bit64(bytes)), bytes);
// Sleep between messages
if (sleepRequired) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]