This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24202c0d9b5 KAFKA-17939 Make Bytes part of the public API (KIP-1247)
(#21601)
24202c0d9b5 is described below
commit 24202c0d9b5ed9af2efe81e832a48eebdc3cb7c5
Author: Siddhartha Devineni <[email protected]>
AuthorDate: Thu Mar 12 02:23:02 2026 +0100
KAFKA-17939 Make Bytes part of the public API (KIP-1247) (#21601)
### Summary
This PR implements
[KIP-1247](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1247%3A+Make+Bytes+utils+class+part+of+the+public+API)
to officially make the `Bytes` class part of Kafka's public API.
### Changes
- Add `Bytes` to Javadoc generation in `build.gradle`
- Deprecate helper methods and members that are not part of the intended public API:
- `increment()`
- `BYTES_LEXICO_COMPARATOR`
- `ByteArrayComparator`
- Create `org.apache.kafka.common.utils.internals.BytesUtils` with
duplicates of deprecated helpers
- Update all internal Kafka code to use `BytesUtils` instead of
deprecated methods
- Enhance Javadoc documentation for core `Bytes` methods
### Compatibility
Deprecated methods and members will remain functional until their removal in
version 5.0, maintaining backward compatibility for any external users who
may be relying on them.
### Testing
- All existing `BytesTest` tests pass
- Verified Javadoc generation includes `Bytes` class
- Verified all internal references compile correctly
Reviewers: Sean Quah <[email protected]>, Kuan-Po Tseng
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
build.gradle | 1 +
.../java/org/apache/kafka/common/utils/Bytes.java | 83 ++++++++++---------
.../kafka/common/utils/internals/BytesUtils.java | 93 ++++++++++++++++++++++
.../apache/kafka/common/utils/package-info.java | 14 +++-
.../org/apache/kafka/common/utils/BytesTest.java | 10 ++-
gradle/spotbugs-exclude.xml | 11 +++
.../kafka/jmh/util/BytesCompareBenchmark.java | 6 +-
.../ForeignTableJoinProcessorSupplier.java | 3 +-
.../state/internals/CachingKeyValueStore.java | 3 +-
.../state/internals/DualColumnFamilyAccessor.java | 5 +-
.../state/internals/InMemoryKeyValueStore.java | 3 +-
.../state/internals/MemoryNavigableLRUCache.java | 3 +-
.../state/internals/RocksDBRangeIterator.java | 3 +-
.../streams/state/internals/RocksDBStore.java | 7 +-
.../state/internals/SegmentedCacheFunction.java | 3 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 3 +-
.../state/internals/RocksDBRangeIteratorTest.java | 5 +-
17 files changed, 190 insertions(+), 66 deletions(-)
diff --git a/build.gradle b/build.gradle
index ca356ba66f3..d3b30113950 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2043,6 +2043,7 @@ project(':clients') {
include "**/org/apache/kafka/common/security/scram/*"
include "**/org/apache/kafka/common/security/token/delegation/*"
include "**/org/apache/kafka/common/serialization/*"
+ include "**/org/apache/kafka/common/utils/Bytes.java"
include "**/org/apache/kafka/server/authorizer/*"
include "**/org/apache/kafka/server/policy/*"
include "**/org/apache/kafka/server/quota/*"
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 67195fcf131..5791deec3d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -16,12 +16,22 @@
*/
package org.apache.kafka.common.utils;
+import org.apache.kafka.common.utils.internals.BytesUtils;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Objects;
/**
- * Utility class that handles immutable byte arrays.
+ * An immutable wrapper for a byte array.
+ *
+ * <p>This class provides a convenient way to work with byte arrays in Kafka
APIs,
+ * particularly in Kafka Streams state stores and serialization. It implements
+ * {@link Comparable} to enable ordering of byte arrays.
+ *
+ * <p>The class caches the hashCode for improved performance when used as keys
+ * in hash-based data structures.
*/
public class Bytes implements Comparable<Bytes> {
@@ -34,6 +44,14 @@ public class Bytes implements Comparable<Bytes> {
// cache the hash code for the string, default to 0
private int hashCode;
+ /**
+ * Creates a Bytes instance wrapping the given byte array.
+ *
+ * <p>The provided array becomes the backing storage for the object.
+ *
+ * @param bytes the byte array to wrap, or null
+ * @return a new Bytes instance, or null if the input is null
+ */
public static Bytes wrap(byte[] bytes) {
if (bytes == null)
return null;
@@ -44,9 +62,10 @@ public class Bytes implements Comparable<Bytes> {
* Create a Bytes using the byte array.
*
* @param bytes This array becomes the backing storage for the object.
+ * @throws NullPointerException if bytes is null
*/
public Bytes(byte[] bytes) {
- this.bytes = bytes;
+ this.bytes = Objects.requireNonNull(bytes, "bytes cannot be null");
// initialize hash code to 0
hashCode = 0;
@@ -94,7 +113,7 @@ public class Bytes implements Comparable<Bytes> {
@Override
public int compareTo(Bytes that) {
- return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+ return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(this.bytes,
that.bytes);
}
@Override
@@ -141,62 +160,40 @@ public class Bytes implements Comparable<Bytes> {
}
/**
- * Increment the underlying byte array by adding 1. Throws an
IndexOutOfBoundsException if incrementing would cause
- * the underlying input byte array to overflow.
+ * Increment the underlying byte array by adding 1.
*
* @param input - The byte array to increment
* @return A new copy of the incremented byte array.
+ * @throws IndexOutOfBoundsException if incrementing causes the underlying
input byte array to overflow.
+ * @deprecated This method is not part of the public API and will be
removed in version 5.0.
+ * Internal Kafka code should use {@link
org.apache.kafka.common.utils.internals.BytesUtils#increment(Bytes)} instead.
*/
+ @Deprecated(since = "4.3", forRemoval = true)
public static Bytes increment(Bytes input) throws
IndexOutOfBoundsException {
- byte[] inputArr = input.get();
- byte[] ret = new byte[inputArr.length];
- int carry = 1;
- for (int i = inputArr.length - 1; i >= 0; i--) {
- if (inputArr[i] == (byte) 0xFF && carry == 1) {
- ret[i] = (byte) 0x00;
- } else {
- ret[i] = (byte) (inputArr[i] + carry);
- carry = 0;
- }
- }
- if (carry == 0) {
- return wrap(ret);
- } else {
- throw new IndexOutOfBoundsException();
- }
+ return BytesUtils.increment(input);
}
/**
- * A byte array comparator based on lexicograpic ordering.
+ * A byte array comparator based on lexicographic ordering.
+ * @deprecated This field is not part of the public API and will be
removed in version 5.0.
+ * Internal Kafka code should use {@link
org.apache.kafka.common.utils.internals.BytesUtils#BYTES_LEXICO_COMPARATOR}
instead.
*/
+ @Deprecated(since = "4.3", forRemoval = true)
public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new
LexicographicByteArrayComparator();
+ /**
+ * A byte array comparator interface.
+ *
+ * @deprecated This interface is not part of the public API and will be
removed in version 5.0.
+ * Internal Kafka code should use {@link
org.apache.kafka.common.utils.internals.BytesUtils.ByteArrayComparator} instead.
+ */
+ @Deprecated(since = "4.3", forRemoval = true)
public interface ByteArrayComparator extends Comparator<byte[]>,
Serializable {
int compare(final byte[] buffer1, int offset1, int length1,
final byte[] buffer2, int offset2, int length2);
}
- private static class LexicographicByteArrayComparator implements
ByteArrayComparator {
-
- @Override
- public int compare(byte[] buffer1, byte[] buffer2) {
- return compare(buffer1, 0, buffer1.length, buffer2, 0,
buffer2.length);
- }
-
- public int compare(final byte[] buffer1, int offset1, int length1,
- final byte[] buffer2, int offset2, int length2) {
-
- // short circuit equal case
- if (buffer1 == buffer2 &&
- offset1 == offset2 &&
- length1 == length2) {
- return 0;
- }
-
- int end1 = offset1 + length1;
- int end2 = offset2 + length2;
- return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2,
offset2, end2);
- }
+ private static class LexicographicByteArrayComparator extends
BytesUtils.LexicographicByteArrayComparator implements ByteArrayComparator {
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java
new file mode 100644
index 00000000000..4823905aa4b
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Internal utility class for Bytes-related operations.
+ * This class is for internal Kafka use only and is not part of the public API.
+ */
+public final class BytesUtils {
+
+ private BytesUtils() {
+ // Utility class, prevent instantiation
+ }
+
+ /**
+ * Increment the underlying byte array by adding 1.
+ *
+ * @param input - The byte array to increment
+ * @return A new copy of the incremented byte array
+ * @throws IndexOutOfBoundsException if incrementing causes the underlying
input byte array to overflow
+ */
+ public static Bytes increment(Bytes input) throws
IndexOutOfBoundsException {
+ byte[] inputArr = input.get();
+ byte[] ret = new byte[inputArr.length];
+ int carry = 1;
+ for (int i = inputArr.length - 1; i >= 0; i--) {
+ if (inputArr[i] == (byte) 0xFF && carry == 1) {
+ ret[i] = (byte) 0x00;
+ } else {
+ ret[i] = (byte) (inputArr[i] + carry);
+ carry = 0;
+ }
+ }
+ if (carry == 0) {
+ return Bytes.wrap(ret);
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * A byte array comparator based on lexicographic ordering.
+ */
+ public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new
LexicographicByteArrayComparator();
+
+ public interface ByteArrayComparator extends Comparator<byte[]> {
+
+ int compare(final byte[] buffer1, int offset1, int length1,
+ final byte[] buffer2, int offset2, int length2);
+ }
+
+ public static class LexicographicByteArrayComparator implements
ByteArrayComparator {
+
+ @Override
+ public int compare(byte[] buffer1, byte[] buffer2) {
+ return compare(buffer1, 0, buffer1.length, buffer2, 0,
buffer2.length);
+ }
+
+ public int compare(final byte[] buffer1, int offset1, int length1,
+ final byte[] buffer2, int offset2, int length2) {
+
+ // short circuit equal case
+ if (buffer1 == buffer2 &&
+ offset1 == offset2 &&
+ length1 == length2) {
+ return 0;
+ }
+
+ int end1 = offset1 + length1;
+ int end2 = offset2 + length2;
+ return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2,
offset2, end2);
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
b/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
index 36479d3402a..5883b329313 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
@@ -15,7 +15,17 @@
* limitations under the License.
*/
/**
- * Provides common utilities for Kafka server and clients.
- * <strong>This package is not a supported Kafka API; the implementation may
change without warning between minor or patch releases.</strong>
+ * Provides utilities for Kafka server and clients.
+ *
+ * <p>This package contains the public API class {@link
org.apache.kafka.common.utils.Bytes}, which was made
+ * part of the public API via KIP-1247.
+ *
+ * <p>Other classes in this package, including {@link
org.apache.kafka.common.utils.Time} and {@link
org.apache.kafka.common.utils.Timer},
+ * are currently exposed through Kafka APIs but are not yet officially
designated
+ * as public API. A future KIP will address their public API status and design.
+ *
+ * <p>The remaining classes in this package are internal utilities and not
part of
+ * the supported Kafka API; their implementation may change without warning
+ * between releases.
*/
package org.apache.kafka.common.utils;
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
index 2c8b591c8db..c68527243cd 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.utils;
+import org.apache.kafka.common.utils.internals.BytesUtils;
+
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
@@ -34,14 +36,14 @@ public class BytesTest {
public void testIncrement() {
byte[] input = new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xFF};
byte[] expected = new byte[]{(byte) 0xAB, (byte) 0xCE, (byte) 0x00};
- Bytes output = Bytes.increment(Bytes.wrap(input));
+ Bytes output = BytesUtils.increment(Bytes.wrap(input));
assertArrayEquals(output.get(), expected);
}
@Test
public void testIncrementUpperBoundary() {
byte[] input = new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
- assertThrows(IndexOutOfBoundsException.class, () ->
Bytes.increment(Bytes.wrap(input)));
+ assertThrows(IndexOutOfBoundsException.class, () ->
BytesUtils.increment(Bytes.wrap(input)));
}
@Test
@@ -64,7 +66,7 @@ public class BytesTest {
map.put(key5, val);
Bytes prefix = key1;
- Bytes prefixEnd = Bytes.increment(prefix);
+ Bytes prefixEnd = BytesUtils.increment(prefix);
Comparator<? super Bytes> comparator = map.comparator();
final int result = comparator == null ? prefix.compareTo(prefixEnd) :
comparator.compare(prefix, prefixEnd);
@@ -112,7 +114,7 @@ public class BytesTest {
}
private int cmp(String l, String r) {
- return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+ return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(
l.getBytes(StandardCharsets.UTF_8),
r.getBytes(StandardCharsets.UTF_8));
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index d8ee0dfa4a2..455c9814c78 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -674,4 +674,15 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
+ <!-- KIP-1247: Comparators not intended for serialization -->
+ <Match>
+ <Or>
+ <!-- Internal comparator is not intended for serialization -->
+ <Class
name="org.apache.kafka.common.utils.internals.BytesUtils$LexicographicByteArrayComparator"/>
+ <!-- Benchmark comparator is not intended for serialization -->
+ <Class
name="org.apache.kafka.jmh.util.BytesCompareBenchmark$HandwrittenLexicoComparator"/>
+ </Or>
+ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+ </Match>
+
</FindBugsFilter>
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
index c994560f69e..23889576ca1 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.jmh.util;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
@@ -45,7 +45,7 @@ public class BytesCompareBenchmark {
private byte[][] tv;
private TreeMap<byte[], Integer> oldMap = new TreeMap<>(new
HandwrittenLexicoComparator());
- private TreeMap<byte[], Integer> newMap = new
TreeMap<>(Bytes.BYTES_LEXICO_COMPARATOR);
+ private TreeMap<byte[], Integer> newMap = new
TreeMap<>(BytesUtils.BYTES_LEXICO_COMPARATOR);
@Setup
public void setup() {
@@ -74,7 +74,7 @@ public class BytesCompareBenchmark {
}
}
- static class HandwrittenLexicoComparator implements
Bytes.ByteArrayComparator {
+ static class HandwrittenLexicoComparator implements
BytesUtils.ByteArrayComparator {
@Override
public int compare(byte[] buffer1, byte[] buffer2) {
return compare(buffer1, 0, buffer1.length, buffer2, 0,
buffer2.length);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index 99d7b22dedc..921cc13196f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -18,6 +18,7 @@ package
org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -123,7 +124,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft,
KRight, VRight>
//Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes,
ValueAndTimestamp<SubscriptionWrapper<KLeft>>> prefixScanResults =
- subscriptionStore.range(prefixBytes,
Bytes.increment(prefixBytes))) {
+ subscriptionStore.range(prefixBytes,
BytesUtils.increment(prefixBytes))) {
while (prefixScanResults.hasNext()) {
final KeyValue<Bytes,
ValueAndTimestamp<SubscriptionWrapper<KLeft>>> next = prefixScanResults.next();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 4f78d49d060..3d6efc243a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStore;
@@ -426,7 +427,7 @@ public class CachingKeyValueStore
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator =
wrapped().prefixScan(prefix, prefixKeySerializer);
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null,
prefix));
- final Bytes to = Bytes.increment(from);
+ final Bytes to = BytesUtils.increment(from);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().range(cacheName, from, to, false);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, true);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index 11b4c11b511..a5f91a62db6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
@@ -267,7 +268,7 @@ class DualColumnFamilyAccessor implements
RocksDBStore.ColumnFamilyAccessor {
// RocksDB's JNI interface does not expose getters/setters that allow
the
// comparator to be pluggable, and the default is lexicographic, so
it's
// safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
+ private final Comparator<byte[]> comparator =
BytesUtils.BYTES_LEXICO_COMPARATOR;
private final String storeName;
private final RocksIterator iterNewFormat;
@@ -397,7 +398,7 @@ class DualColumnFamilyAccessor implements
RocksDBStore.ColumnFamilyAccessor {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
+ private final Comparator<byte[]> comparator =
BytesUtils.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 51744ad9ced..f74d8e6387a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
@@ -173,7 +174,7 @@ public class InMemoryKeyValueStore implements
KeyValueStore<Bytes, byte[]> {
public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes,
byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null,
prefix));
- final Bytes to = Bytes.increment(from);
+ final Bytes to = BytesUtils.increment(from);
return new InMemoryKeyValueIterator(map.subMap(from, true, to,
false).keySet(), true);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 50df0589209..1e5e102c0a1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -89,7 +90,7 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null,
prefix));
- final Bytes to = Bytes.increment(from);
+ final Bytes to = BytesUtils.increment(from);
final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
index 6c85518101e..a8f7c6fcf79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.rocksdb.RocksIterator;
@@ -27,7 +28,7 @@ class RocksDBRangeIterator extends RocksDbIterator {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
+ private final Comparator<byte[]> comparator =
BytesUtils.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index debc2078c62..f664da65e98 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -536,7 +537,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
// RocksDB's deleteRange() does not support a null upper bound so in
the event
// of overflow from increment(), the operation cannot be performed and
an
// IndexOutOfBoundsException will be thrown.
- cfAccessor.deleteRange(dbAccessor, keyFrom.get(),
Bytes.increment(keyTo).get());
+ cfAccessor.deleteRange(dbAccessor, keyFrom.get(),
BytesUtils.increment(keyTo).get());
}
@Override
@@ -1038,7 +1039,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
}
/**
- * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned
instead of throwing
+ * Same as {@link BytesUtils#increment(Bytes)} but {@code null} is
returned instead of throwing
* {@code IndexOutOfBoundsException} in the event of overflow.
*
* @param input bytes to increment
@@ -1047,7 +1048,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
*/
static Bytes incrementWithoutOverflow(final Bytes input) {
try {
- return Bytes.increment(input);
+ return BytesUtils.increment(input);
} catch (final IndexOutOfBoundsException e) {
return null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
index 68a40b436e7..1d3cbea4a9f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
import java.nio.ByteBuffer;
@@ -77,7 +78,7 @@ class SegmentedCacheFunction implements CacheFunction {
if (segmentCompare == 0) {
final byte[] cacheKeyBytes = cacheKey.get();
final byte[] storeKeyBytes = storeKey.get();
- return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+ return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(
cacheKeyBytes, SEGMENT_ID_BYTES, cacheKeyBytes.length -
SEGMENT_ID_BYTES,
storeKeyBytes, 0, storeKeyBytes.length
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 02de04a5247..05d64e31ed6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -220,7 +221,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
@Test
public void shouldGetRecordsWithPrefixKey() {
store.put(hi, there);
- store.put(Bytes.increment(hi), world);
+ store.put(BytesUtils.increment(hi), world);
final List<Bytes> keys = new ArrayList<>();
final List<Bytes> values = new ArrayList<>();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
index ad52daaa8d4..8ef4046794d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -124,7 +125,7 @@ public class RocksDBRangeIteratorTest {
@Test
public void
shouldReturnAllKeysWhenLastKeyIsGreaterThanLargestKeyInStateStoreInForwardDirection()
{
- final Bytes toBytes = Bytes.increment(key4Bytes);
+ final Bytes toBytes = BytesUtils.increment(key4Bytes);
final RocksIterator rocksIterator = mock(RocksIterator.class);
doNothing().when(rocksIterator).seek(key1Bytes.get());
when(rocksIterator.isValid())
@@ -340,7 +341,7 @@ public class RocksDBRangeIteratorTest {
@Test
public void
shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInReverseDirection() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
- final Bytes toBytes = Bytes.increment(key4Bytes);
+ final Bytes toBytes = BytesUtils.increment(key4Bytes);
doNothing().when(rocksIterator).seekForPrev(toBytes.get());
when(rocksIterator.isValid())
.thenReturn(true)