This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24202c0d9b5 KAFKA-17939 Make Bytes part of the public API (KIP-1247) 
(#21601)
24202c0d9b5 is described below

commit 24202c0d9b5ed9af2efe81e832a48eebdc3cb7c5
Author: Siddhartha Devineni <[email protected]>
AuthorDate: Thu Mar 12 02:23:02 2026 +0100

    KAFKA-17939 Make Bytes part of the public API (KIP-1247) (#21601)
    
    ### Summary
    This PR implements
    
    
[KIP-1247](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1247%3A+Make+Bytes+utils+class+part+of+the+public+API)
    to officially make the `Bytes` class part of Kafka's public API.
    
    ### Changes
    - Add `Bytes` to Javadoc generation in `build.gradle`
    - Deprecate helper methods that are not part of the intended public API:
      - `increment()`
      - `BYTES_LEXICO_COMPARATOR`
      - `ByteArrayComparator`
    - Create `org.apache.kafka.common.utils.internals.BytesUtils` with
    duplicates of deprecated helpers
    - Update all internal Kafka code to use `BytesUtils` instead of
    deprecated methods
    - Enhance Javadoc documentation for core `Bytes` methods
    
    ### Compatibility
    Deprecated methods will remain functional until version 5.0 to maintain
    backward compatibility for any external users who may be relying on
    them.
    
    ### Testing
    - All existing `BytesTest` tests pass
    - Verified Javadoc generation includes `Bytes` class
    - Verified all internal references compile correctly
    
    Reviewers: Sean Quah <[email protected]>, Kuan-Po Tseng
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |  1 +
 .../java/org/apache/kafka/common/utils/Bytes.java  | 83 ++++++++++---------
 .../kafka/common/utils/internals/BytesUtils.java   | 93 ++++++++++++++++++++++
 .../apache/kafka/common/utils/package-info.java    | 14 +++-
 .../org/apache/kafka/common/utils/BytesTest.java   | 10 ++-
 gradle/spotbugs-exclude.xml                        | 11 +++
 .../kafka/jmh/util/BytesCompareBenchmark.java      |  6 +-
 .../ForeignTableJoinProcessorSupplier.java         |  3 +-
 .../state/internals/CachingKeyValueStore.java      |  3 +-
 .../state/internals/DualColumnFamilyAccessor.java  |  5 +-
 .../state/internals/InMemoryKeyValueStore.java     |  3 +-
 .../state/internals/MemoryNavigableLRUCache.java   |  3 +-
 .../state/internals/RocksDBRangeIterator.java      |  3 +-
 .../streams/state/internals/RocksDBStore.java      |  7 +-
 .../state/internals/SegmentedCacheFunction.java    |  3 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  3 +-
 .../state/internals/RocksDBRangeIteratorTest.java  |  5 +-
 17 files changed, 190 insertions(+), 66 deletions(-)

diff --git a/build.gradle b/build.gradle
index ca356ba66f3..d3b30113950 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2043,6 +2043,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/security/scram/*"
     include "**/org/apache/kafka/common/security/token/delegation/*"
     include "**/org/apache/kafka/common/serialization/*"
+    include "**/org/apache/kafka/common/utils/Bytes.java"
     include "**/org/apache/kafka/server/authorizer/*"
     include "**/org/apache/kafka/server/policy/*"
     include "**/org/apache/kafka/server/quota/*"
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 67195fcf131..5791deec3d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -16,12 +16,22 @@
  */
 package org.apache.kafka.common.utils;
 
+import org.apache.kafka.common.utils.internals.BytesUtils;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Objects;
 
 /**
- * Utility class that handles immutable byte arrays.
+ * An immutable wrapper for a byte array.
+ *
+ * <p>This class provides a convenient way to work with byte arrays in Kafka 
APIs,
+ * particularly in Kafka Streams state stores and serialization. It implements
+ * {@link Comparable} to enable ordering of byte arrays.
+ *
+ * <p>The class caches the hashCode for improved performance when used as keys
+ * in hash-based data structures.
  */
 public class Bytes implements Comparable<Bytes> {
 
@@ -34,6 +44,14 @@ public class Bytes implements Comparable<Bytes> {
     // cache the hash code for the string, default to 0
     private int hashCode;
 
+    /**
+     * Creates a Bytes instance wrapping the given byte array.
+     *
+     * <p>The provided array becomes the backing storage for the object.
+     *
+     * @param bytes the byte array to wrap, or null
+     * @return a new Bytes instance, or null if the input is null
+     */
     public static Bytes wrap(byte[] bytes) {
         if (bytes == null)
             return null;
@@ -44,9 +62,10 @@ public class Bytes implements Comparable<Bytes> {
      * Create a Bytes using the byte array.
      *
      * @param bytes This array becomes the backing storage for the object.
+     * @throws NullPointerException if bytes is null
      */
     public Bytes(byte[] bytes) {
-        this.bytes = bytes;
+        this.bytes = Objects.requireNonNull(bytes, "bytes cannot be null");
 
         // initialize hash code to 0
         hashCode = 0;
@@ -94,7 +113,7 @@ public class Bytes implements Comparable<Bytes> {
 
     @Override
     public int compareTo(Bytes that) {
-        return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+        return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(this.bytes, 
that.bytes);
     }
 
     @Override
@@ -141,62 +160,40 @@ public class Bytes implements Comparable<Bytes> {
     }
 
     /**
-     * Increment the underlying byte array by adding 1. Throws an 
IndexOutOfBoundsException if incrementing would cause
-     * the underlying input byte array to overflow.
+     * Increment the underlying byte array by adding 1.
      *
      * @param input - The byte array to increment
      * @return A new copy of the incremented byte array.
+     * @throws IndexOutOfBoundsException if incrementing causes the underlying 
input byte array to overflow.
+     * @deprecated This method is not part of the public API and will be 
removed in version 5.0.
+     *             Internal Kafka code should use {@link 
org.apache.kafka.common.utils.internals.BytesUtils#increment(Bytes)} instead.
      */
+    @Deprecated(since = "4.3", forRemoval = true)
     public static Bytes increment(Bytes input) throws 
IndexOutOfBoundsException {
-        byte[] inputArr = input.get();
-        byte[] ret = new byte[inputArr.length];
-        int carry = 1;
-        for (int i = inputArr.length - 1; i >= 0; i--) {
-            if (inputArr[i] == (byte) 0xFF && carry == 1) {
-                ret[i] = (byte) 0x00;
-            } else {
-                ret[i] = (byte) (inputArr[i] + carry);
-                carry = 0;
-            }
-        }
-        if (carry == 0) {
-            return wrap(ret);
-        } else {
-            throw new IndexOutOfBoundsException();
-        }
+        return BytesUtils.increment(input);
     }
 
     /**
-     * A byte array comparator based on lexicograpic ordering.
+     * A byte array comparator based on lexicographic ordering.
+     * @deprecated This field is not part of the public API and will be 
removed in version 5.0.
+     *             Internal Kafka code should use {@link 
org.apache.kafka.common.utils.internals.BytesUtils#BYTES_LEXICO_COMPARATOR} 
instead.
      */
+    @Deprecated(since = "4.3", forRemoval = true)
     public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new 
LexicographicByteArrayComparator();
 
+    /**
+     * A byte array comparator interface.
+     *
+     * @deprecated This interface is not part of the public API and will be 
removed in version 5.0.
+     *             Internal Kafka code should use {@link 
org.apache.kafka.common.utils.internals.BytesUtils.ByteArrayComparator} instead.
+     */
+    @Deprecated(since = "4.3", forRemoval = true)
     public interface ByteArrayComparator extends Comparator<byte[]>, 
Serializable {
 
         int compare(final byte[] buffer1, int offset1, int length1,
                     final byte[] buffer2, int offset2, int length2);
     }
 
-    private static class LexicographicByteArrayComparator implements 
ByteArrayComparator {
-
-        @Override
-        public int compare(byte[] buffer1, byte[] buffer2) {
-            return compare(buffer1, 0, buffer1.length, buffer2, 0, 
buffer2.length);
-        }
-
-        public int compare(final byte[] buffer1, int offset1, int length1,
-                           final byte[] buffer2, int offset2, int length2) {
-
-            // short circuit equal case
-            if (buffer1 == buffer2 &&
-                    offset1 == offset2 &&
-                    length1 == length2) {
-                return 0;
-            }
-
-            int end1 = offset1 + length1;
-            int end2 = offset2 + length2;
-            return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, 
offset2, end2);
-        }
+    private static class LexicographicByteArrayComparator extends 
BytesUtils.LexicographicByteArrayComparator implements ByteArrayComparator {
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java
new file mode 100644
index 00000000000..4823905aa4b
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Internal utility class for Bytes-related operations.
+ * This class is for internal Kafka use only and is not part of the public API.
+ */
+public final class BytesUtils {
+
+    private BytesUtils() {
+        // Utility class, prevent instantiation
+    }
+
+    /**
+     * Increment the underlying byte array by adding 1.
+     *
+     * @param input - The byte array to increment
+     * @return A new copy of the incremented byte array
+     * @throws IndexOutOfBoundsException if incrementing causes the underlying 
input byte array to overflow
+     */
+    public static Bytes increment(Bytes input) throws 
IndexOutOfBoundsException {
+        byte[] inputArr = input.get();
+        byte[] ret = new byte[inputArr.length];
+        int carry = 1;
+        for (int i = inputArr.length - 1; i >= 0; i--) {
+            if (inputArr[i] == (byte) 0xFF && carry == 1) {
+                ret[i] = (byte) 0x00;
+            } else {
+                ret[i] = (byte) (inputArr[i] + carry);
+                carry = 0;
+            }
+        }
+        if (carry == 0) {
+            return Bytes.wrap(ret);
+        } else {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    /**
+     * A byte array comparator based on lexicographic ordering.
+     */
+    public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new 
LexicographicByteArrayComparator();
+
+    public interface ByteArrayComparator extends Comparator<byte[]> {
+
+        int compare(final byte[] buffer1, int offset1, int length1,
+                    final byte[] buffer2, int offset2, int length2);
+    }
+
+    public static class LexicographicByteArrayComparator implements 
ByteArrayComparator {
+
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, 
buffer2.length);
+        }
+
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                    offset1 == offset2 &&
+                    length1 == length2) {
+                return 0;
+            }
+
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, 
offset2, end2);
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/package-info.java 
b/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
index 36479d3402a..5883b329313 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/package-info.java
@@ -15,7 +15,17 @@
  * limitations under the License.
  */
 /**
- * Provides common utilities for Kafka server and clients.
- * <strong>This package is not a supported Kafka API; the implementation may 
change without warning between minor or patch releases.</strong>
+ * Provides utilities for Kafka server and clients.
+ *
+ * <p>This package contains the public API class {@link 
org.apache.kafka.common.utils.Bytes}, which was made
+ * part of the public API via KIP-1247.
+ *
+ * <p>Other classes in this package, including {@link 
org.apache.kafka.common.utils.Time} and {@link 
org.apache.kafka.common.utils.Timer},
+ * are currently exposed through Kafka APIs but are not yet officially 
designated
+ * as public API. A future KIP will address their public API status and design.
+ *
+ * <p>The remaining classes in this package are internal utilities and not 
part of
+ * the supported Kafka API; their implementation may change without warning
+ * between releases.
  */
 package org.apache.kafka.common.utils;
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
index 2c8b591c8db..c68527243cd 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.utils;
 
+import org.apache.kafka.common.utils.internals.BytesUtils;
+
 import org.junit.jupiter.api.Test;
 
 import java.nio.charset.StandardCharsets;
@@ -34,14 +36,14 @@ public class BytesTest {
     public void testIncrement() {
         byte[] input = new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xFF};
         byte[] expected = new byte[]{(byte) 0xAB, (byte) 0xCE, (byte) 0x00};
-        Bytes output = Bytes.increment(Bytes.wrap(input));
+        Bytes output = BytesUtils.increment(Bytes.wrap(input));
         assertArrayEquals(output.get(), expected);
     }
 
     @Test
     public void testIncrementUpperBoundary() {
         byte[] input = new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
-        assertThrows(IndexOutOfBoundsException.class, () -> 
Bytes.increment(Bytes.wrap(input)));
+        assertThrows(IndexOutOfBoundsException.class, () -> 
BytesUtils.increment(Bytes.wrap(input)));
     }
 
     @Test
@@ -64,7 +66,7 @@ public class BytesTest {
         map.put(key5, val);
 
         Bytes prefix = key1;
-        Bytes prefixEnd = Bytes.increment(prefix);
+        Bytes prefixEnd = BytesUtils.increment(prefix);
 
         Comparator<? super Bytes> comparator = map.comparator();
         final int result = comparator == null ? prefix.compareTo(prefixEnd) : 
comparator.compare(prefix, prefixEnd);
@@ -112,7 +114,7 @@ public class BytesTest {
     }
 
     private int cmp(String l, String r) {
-        return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+        return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(
             l.getBytes(StandardCharsets.UTF_8),
             r.getBytes(StandardCharsets.UTF_8));
     }
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index d8ee0dfa4a2..455c9814c78 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -674,4 +674,15 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
     </Match>
 
+    <!-- KIP-1247: Comparators not intended for serialization -->
+    <Match>
+        <Or>
+            <!-- Internal comparator is not intended for serialization -->
+            <Class 
name="org.apache.kafka.common.utils.internals.BytesUtils$LexicographicByteArrayComparator"/>
+            <!-- Benchmark comparator is not intended for serialization -->
+            <Class 
name="org.apache.kafka.jmh.util.BytesCompareBenchmark$HandwrittenLexicoComparator"/>
+        </Or>
+        <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+    </Match>
+
 </FindBugsFilter>
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
index c994560f69e..23889576ca1 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.jmh.util;
 
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
@@ -45,7 +45,7 @@ public class BytesCompareBenchmark {
 
     private byte[][] tv;
     private TreeMap<byte[], Integer> oldMap = new TreeMap<>(new 
HandwrittenLexicoComparator());
-    private TreeMap<byte[], Integer> newMap = new 
TreeMap<>(Bytes.BYTES_LEXICO_COMPARATOR);
+    private TreeMap<byte[], Integer> newMap = new 
TreeMap<>(BytesUtils.BYTES_LEXICO_COMPARATOR);
 
     @Setup
     public void setup() {
@@ -74,7 +74,7 @@ public class BytesCompareBenchmark {
         }
     }
 
-    static class HandwrittenLexicoComparator implements 
Bytes.ByteArrayComparator {
+    static class HandwrittenLexicoComparator implements 
BytesUtils.ByteArrayComparator {
         @Override
         public int compare(byte[] buffer1, byte[] buffer2) {
             return compare(buffer1, 0, buffer1.length, buffer2, 0, 
buffer2.length);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index 99d7b22dedc..921cc13196f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -18,6 +18,7 @@ package 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -123,7 +124,7 @@ public class ForeignTableJoinProcessorSupplier<KLeft, 
KRight, VRight>
 
             //Perform the prefixScan and propagate the results
             try (final KeyValueIterator<Bytes, 
ValueAndTimestamp<SubscriptionWrapper<KLeft>>> prefixScanResults =
-                     subscriptionStore.range(prefixBytes, 
Bytes.increment(prefixBytes))) {
+                     subscriptionStore.range(prefixBytes, 
BytesUtils.increment(prefixBytes))) {
 
                 while (prefixScanResults.hasNext()) {
                     final KeyValue<Bytes, 
ValueAndTimestamp<SubscriptionWrapper<KLeft>>> next = prefixScanResults.next();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 4f78d49d060..3d6efc243a0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.StateStore;
@@ -426,7 +427,7 @@ public class CachingKeyValueStore
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().prefixScan(prefix, prefixKeySerializer);
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
-        final Bytes to = Bytes.increment(from);
+        final Bytes to = BytesUtils.increment(from);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().range(cacheName, from, to, false);
         return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, true);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index 11b4c11b511..a5f91a62db6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
@@ -267,7 +268,7 @@ class DualColumnFamilyAccessor implements 
RocksDBStore.ColumnFamilyAccessor {
         // RocksDB's JNI interface does not expose getters/setters that allow 
the
         // comparator to be pluggable, and the default is lexicographic, so 
it's
         // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
+        private final Comparator<byte[]> comparator = 
BytesUtils.BYTES_LEXICO_COMPARATOR;
 
         private final String storeName;
         private final RocksIterator iterNewFormat;
@@ -397,7 +398,7 @@ class DualColumnFamilyAccessor implements 
RocksDBStore.ColumnFamilyAccessor {
     // RocksDB's JNI interface does not expose getters/setters that allow the
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
+        private final Comparator<byte[]> comparator = 
BytesUtils.BYTES_LEXICO_COMPARATOR;
         private final byte[] rawLastKey;
         private final boolean forward;
         private final boolean toInclusive;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 51744ad9ced..f74d8e6387a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
@@ -173,7 +174,7 @@ public class InMemoryKeyValueStore implements 
KeyValueStore<Bytes, byte[]> {
     public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, 
byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
 
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
-        final Bytes to = Bytes.increment(from);
+        final Bytes to = BytesUtils.increment(from);
 
         return new InMemoryKeyValueIterator(map.subMap(from, true, to, 
false).keySet(), true);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 50df0589209..1e5e102c0a1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -89,7 +90,7 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
 
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
-        final Bytes to = Bytes.increment(from);
+        final Bytes to = BytesUtils.increment(from);
 
         final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
index 6c85518101e..a8f7c6fcf79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 
 import org.rocksdb.RocksIterator;
@@ -27,7 +28,7 @@ class RocksDBRangeIterator extends RocksDbIterator {
     // RocksDB's JNI interface does not expose getters/setters that allow the
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
-    private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
+    private final Comparator<byte[]> comparator = 
BytesUtils.BYTES_LEXICO_COMPARATOR;
     private final byte[] rawLastKey;
     private final boolean forward;
     private final boolean toInclusive;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index debc2078c62..f664da65e98 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -536,7 +537,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         // RocksDB's deleteRange() does not support a null upper bound so in 
the event
         // of overflow from increment(), the operation cannot be performed and 
an
         // IndexOutOfBoundsException will be thrown.
-        cfAccessor.deleteRange(dbAccessor, keyFrom.get(), 
Bytes.increment(keyTo).get());
+        cfAccessor.deleteRange(dbAccessor, keyFrom.get(), 
BytesUtils.increment(keyTo).get());
     }
 
     @Override
@@ -1038,7 +1039,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     }
 
     /**
-     * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned 
instead of throwing
+     * Same as {@link BytesUtils#increment(Bytes)} but {@code null} is 
returned instead of throwing
      * {@code IndexOutOfBoundsException} in the event of overflow.
      *
      * @param input bytes to increment
@@ -1047,7 +1048,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
      */
     static Bytes incrementWithoutOverflow(final Bytes input) {
         try {
-            return Bytes.increment(input);
+            return BytesUtils.increment(input);
         } catch (final IndexOutOfBoundsException e) {
             return null;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
index 68a40b436e7..1d3cbea4a9f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
 
 import java.nio.ByteBuffer;
@@ -77,7 +78,7 @@ class SegmentedCacheFunction implements CacheFunction {
         if (segmentCompare == 0) {
             final byte[] cacheKeyBytes = cacheKey.get();
             final byte[] storeKeyBytes = storeKey.get();
-            return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+            return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(
                 cacheKeyBytes, SEGMENT_ID_BYTES, cacheKeyBytes.length - 
SEGMENT_ID_BYTES,
                 storeKeyBytes, 0, storeKeyBytes.length
             );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 02de04a5247..05d64e31ed6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -220,7 +221,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     @Test
     public void shouldGetRecordsWithPrefixKey() {
         store.put(hi, there);
-        store.put(Bytes.increment(hi), world);
+        store.put(BytesUtils.increment(hi), world);
 
         final List<Bytes> keys = new ArrayList<>();
         final List<Bytes> values = new ArrayList<>();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
index ad52daaa8d4..8ef4046794d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.internals.BytesUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -124,7 +125,7 @@ public class RocksDBRangeIteratorTest {
 
     @Test
     public void 
shouldReturnAllKeysWhenLastKeyIsGreaterThanLargestKeyInStateStoreInForwardDirection()
 {
-        final Bytes toBytes = Bytes.increment(key4Bytes);
+        final Bytes toBytes = BytesUtils.increment(key4Bytes);
         final RocksIterator rocksIterator = mock(RocksIterator.class);
         doNothing().when(rocksIterator).seek(key1Bytes.get());
         when(rocksIterator.isValid())
@@ -340,7 +341,7 @@ public class RocksDBRangeIteratorTest {
     @Test
     public void 
shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInReverseDirection() {
         final RocksIterator rocksIterator = mock(RocksIterator.class);
-        final Bytes toBytes = Bytes.increment(key4Bytes);
+        final Bytes toBytes = BytesUtils.increment(key4Bytes);
         doNothing().when(rocksIterator).seekForPrev(toBytes.get());
         when(rocksIterator.isValid())
             .thenReturn(true)

Reply via email to