This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9273cdc491d MINOR: Bytes lexicographic comparator could use compiler builtin (#21073)
9273cdc491d is described below
commit 9273cdc491d88c008c68c280c1af7395ab89c6ef
Author: Steven Schlansker <[email protected]>
AuthorDate: Fri Dec 12 12:32:44 2025 -0800
MINOR: Bytes lexicographic comparator could use compiler builtin (#21073)
The `Arrays.compare` method has a vectorized compiler intrinsic
available and should offer better performance than a hand-rolled loop.
The JDK implementation is at least able to compare 8 bytes at a time
instead of 1 byte at a time, and avoids repeated bounds-checking.
This operation is performance-sensitive if you use in-memory stores,
since it is used to navigate the TreeMap backing the store.
Reviewers: Sean Quah <[email protected]>, Bill Bejeck
<[email protected]>
---
.../java/org/apache/kafka/common/utils/Bytes.java | 10 +-
.../org/apache/kafka/common/utils/BytesTest.java | 35 +++++++
.../kafka/jmh/util/BytesCompareBenchmark.java | 105 +++++++++++++++++++++
3 files changed, 141 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 5955312dbb0..67195fcf131 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -194,17 +194,9 @@ public class Bytes implements Comparable<Bytes> {
return 0;
}
- // similar to Arrays.compare() but considers offset and length
int end1 = offset1 + length1;
int end2 = offset2 + length2;
- for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
- int a = buffer1[i] & 0xff;
- int b = buffer2[j] & 0xff;
- if (a != b) {
- return a - b;
- }
- }
- return length1 - length2;
+ return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, offset2, end2);
}
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
index 8bf8d872262..2c8b591c8db 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -25,6 +26,7 @@ import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class BytesTest {
@@ -81,4 +83,37 @@ public class BytesTest {
assertEquals(subMapExpected.keySet(), subMapResults.keySet());
}
+
+ @Test
+ public void testBytesLexicographicCases() {
+ assertEquals(0, cmp("", ""));
+ assertTrue(cmp("", "aaa") < 0);
+ assertTrue(cmp("aaa", "") > 0);
+
+ assertEquals(0, cmp("aaa", "aaa"));
+ assertTrue(cmp("aaa", "bbb") < 0);
+ assertTrue(cmp("bbb", "aaa") > 0);
+
+ assertTrue(cmp("aaaaaa", "bbb") < 0);
+ assertTrue(cmp("aaa", "bbbbbb") < 0);
+ assertTrue(cmp("bbbbbb", "aaa") > 0);
+ assertTrue(cmp("bbb", "aaaaaa") > 0);
+
+ assertTrue(cmp("common_prefix_aaa", "common_prefix_bbb") < 0);
+ assertTrue(cmp("common_prefix_bbb", "common_prefix_aaa") > 0);
+
+ assertTrue(cmp("common_prefix_aaaaaa", "common_prefix_bbb") < 0);
+ assertTrue(cmp("common_prefix_aaa", "common_prefix_bbbbbb") < 0);
+ assertTrue(cmp("common_prefix_bbbbbb", "common_prefix_aaa") > 0);
+ assertTrue(cmp("common_prefix_bbb", "common_prefix_aaaaaa") > 0);
+
+ assertTrue(cmp("common_prefix", "common_prefix_aaa") < 0);
+ assertTrue(cmp("common_prefix_aaa", "common_prefix") > 0);
+ }
+
+ private int cmp(String l, String r) {
+ return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+ l.getBytes(StandardCharsets.UTF_8),
+ r.getBytes(StandardCharsets.UTF_8));
+ }
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
new file mode 100644
index 00000000000..c994560f69e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(2)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public class BytesCompareBenchmark {
+ private static final int TREE_SIZE = 10240;
+
+ @Param({"8", "16", "32", "128", "1024"})
+ private int bytes;
+
+ private byte[][] tv;
+ private TreeMap<byte[], Integer> oldMap = new TreeMap<>(new HandwrittenLexicoComparator());
+ private TreeMap<byte[], Integer> newMap = new TreeMap<>(Bytes.BYTES_LEXICO_COMPARATOR);
+
+ @Setup
+ public void setup() {
+ tv = new byte[TREE_SIZE][bytes];
+ for (int i = 0; i < TREE_SIZE; i++) {
+ tv[i][bytes - 4] = (byte) (i >>> 24);
+ tv[i][bytes - 3] = (byte) (i >>> 16);
+ tv[i][bytes - 2] = (byte) (i >>> 8);
+ tv[i][bytes - 1] = (byte) i;
+ oldMap.put(tv[i], i);
+ newMap.put(tv[i], i);
+ }
+ }
+
+ @Benchmark
+ public void samePrefixLexicoCustom(Blackhole bh) {
+ for (int i = 0; i < TREE_SIZE; i++) {
+ bh.consume(oldMap.get(tv[i]));
+ }
+ }
+
+ @Benchmark
+ public void samePrefixLexicoJdk(Blackhole bh) {
+ for (int i = 0; i < TREE_SIZE; i++) {
+ bh.consume(newMap.get(tv[i]));
+ }
+ }
+
+ static class HandwrittenLexicoComparator implements Bytes.ByteArrayComparator {
+ @Override
+ public int compare(byte[] buffer1, byte[] buffer2) {
+ return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+ }
+
+ public int compare(final byte[] buffer1, int offset1, int length1,
+ final byte[] buffer2, int offset2, int length2) {
+
+ // short circuit equal case
+ if (buffer1 == buffer2 &&
+ offset1 == offset2 &&
+ length1 == length2) {
+ return 0;
+ }
+
+ int end1 = offset1 + length1;
+ int end2 = offset2 + length2;
+ for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+ int a = buffer1[i] & 0xff;
+ int b = buffer2[j] & 0xff;
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return length1 - length2;
+ }
+ }
+}