This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc720d33a03 MINOR: remove get prefix for internal state methods 
(#17053)
fc720d33a03 is described below

commit fc720d33a03e5b40a31b82c4baddb8a23b6e8f9d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sat Aug 31 05:02:06 2024 -0700

    MINOR: remove get prefix for internal state methods (#17053)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kstream/internals/KStreamKStreamJoin.java      |  6 +--
 .../internals/KStreamKStreamJoinLeftSide.java      |  4 +-
 .../internals/KStreamKStreamJoinRightSide.java     |  4 +-
 .../internals/AbstractReadWriteDecorator.java      |  2 +-
 .../processor/internals/DefaultStateUpdater.java   |  4 +-
 .../internals/GlobalProcessorContextImpl.java      |  6 +--
 .../processor/internals/ProcessorContextImpl.java  |  4 +-
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |  4 +-
 .../AbstractRocksDBSegmentedBytesStore.java        |  6 +--
 .../streams/state/internals/AbstractSegments.java  |  2 +-
 .../streams/state/internals/LeftOrRightValue.java  |  4 +-
 .../internals/LeftOrRightValueSerializer.java      |  8 ++--
 .../state/internals/LogicalKeyValueSegment.java    | 14 +++----
 .../state/internals/LogicalKeyValueSegments.java   |  2 +-
 .../state/internals/LogicalSegmentIterator.java    |  6 +--
 .../state/internals/MeteredKeyValueStore.java      |  5 +--
 .../state/internals/MeteredSessionStore.java       |  2 +-
 .../internals/MeteredTimestampedKeyValueStore.java |  9 ++--
 .../internals/MeteredVersionedKeyValueStore.java   |  2 +-
 .../state/internals/MeteredWindowStore.java        |  5 +--
 .../streams/state/internals/RocksDBStore.java      |  2 +-
 .../state/internals/RocksDBVersionedStore.java     | 48 +++++++++++-----------
 .../RocksDBVersionedStoreRestoreWriteBuffer.java   | 10 ++---
 ...RocksDBVersionedStoreSegmentValueFormatter.java |  8 ++--
 .../kafka/streams/state/internals/Segments.java    |  2 +-
 .../streams/state/internals/StoreQueryUtils.java   |  4 +-
 .../state/internals/TimestampedKeyAndJoinSide.java |  4 +-
 .../TimestampedKeyAndJoinSideSerializer.java       |  4 +-
 .../internals/StreamsMetadataStateTest.java        |  1 -
 .../state/internals/KeyValueSegmentsTest.java      | 14 +++----
 .../internals/LogicalKeyValueSegmentsTest.java     | 10 ++---
 ...sDBVersionedStoreSegmentValueFormatterTest.java |  4 +-
 .../state/internals/TimestampedSegmentsTest.java   | 14 +++----
 33 files changed, 110 insertions(+), 114 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 745f7038257..9ba8f316271 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -207,7 +207,7 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
                 while (it.hasNext()) {
                     final KeyValue<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
                     final TimestampedKeyAndJoinSide<K> 
timestampedKeyAndJoinSide = nextKeyValue.key;
-                    sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+                    sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.timestamp();
                     if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
                         // if windows are open for both joinSides we can break 
since there are no more candidates to emit
                         break;
@@ -250,8 +250,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
         private void forwardNonJoinedOuterRecords(final Record<K, VThis> 
record,
                                                   final 
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
                                                   final 
LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
-            final K key = timestampedKeyAndJoinSide.getKey();
-            final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
+            final K key = timestampedKeyAndJoinSide.key();
+            final long timestamp = timestampedKeyAndJoinSide.timestamp();
             final VThis thisValue = getThisValue(leftOrRightValue);
             final VOther otherValue = getOtherValue(leftOrRightValue);
             final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index 3b4ee5b3393..df03e39b8c5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -60,12 +60,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> 
extends KStreamKStreamJ
 
         @Override
         public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getLeftValue();
+            return leftOrRightValue.leftValue();
         }
 
         @Override
         public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getRightValue();
+            return leftOrRightValue.rightValue();
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index e4bcfe4e105..ec29d5f12b8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -59,12 +59,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> 
extends KStreamKStream
 
         @Override
         public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getRightValue();
+            return leftOrRightValue.rightValue();
         }
 
         @Override
         public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getLeftValue();
+            return leftOrRightValue.leftValue();
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 11433af42cf..353c26ec253 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -53,7 +53,7 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
 
-    static StateStore readWriteStore(final StateStore store) {
+    static StateStore wrapWithReadWriteStore(final StateStore store) {
         if (store instanceof TimestampedKeyValueStore) {
             return new 
TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) 
store);
         } else if (store instanceof VersionedKeyValueStore) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 2e4bb85db7c..addef5a9f15 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -200,7 +200,7 @@ public class DefaultStateUpdater implements StateUpdater {
         private void performActionsOnTasks() {
             tasksAndActionsLock.lock();
             try {
-                for (final TaskAndAction taskAndAction : getTasksAndActions()) 
{
+                for (final TaskAndAction taskAndAction : tasksAndActions()) {
                     final Action action = taskAndAction.action();
                     switch (action) {
                         case ADD:
@@ -458,7 +458,7 @@ public class DefaultStateUpdater implements StateUpdater {
             changelogReader.clear();
         }
 
-        private List<TaskAndAction> getTasksAndActions() {
+        private List<TaskAndAction> tasksAndActions() {
             final List<TaskAndAction> tasksAndActionsToProcess = new 
ArrayList<>(tasksAndActions);
             tasksAndActions.clear();
             return tasksAndActionsToProcess;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 74148cd21b1..828ae3a0a79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
 
 import java.time.Duration;
 
-import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
+import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
 
 public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object, Object> {
 
@@ -60,7 +60,7 @@ public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object,
     @Override
     public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.globalStore(name);
-        return (S) readWriteStore(store);
+        return (S) wrapWithReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
@@ -156,4 +156,4 @@ public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object,
     public void registerCacheFlushListener(final String namespace, final 
DirtyEntryFlushListener listener) {
         cache.addDirtyEntryFlushListener(namespace, listener);
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 91ee78b1246..dc946ba3b6c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -47,7 +47,7 @@ import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTEN
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
-import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
+import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext<Object, 
Object> implements RecordCollector.Supplier {
     // the below are null for standby tasks
@@ -182,7 +182,7 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext<Object, Objec
         }
 
         final StateStore store = stateManager.store(name);
-        return (S) readWriteStore(store);
+        return (S) wrapWithReadWriteStore(store);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f2d789be2b1..e55d8452fae 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -113,7 +113,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     public void remove(final Bytes rawBaseKey) {
         final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
         observedStreamTime = Math.max(observedStreamTime, timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -227,7 +227,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
             }
         }
 
-        final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
+        final S segment = segments.segmentForTimestamp(timestampFromRawKey);
         if (segment == null) {
             return null;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 9aee1de871e..f5b4366ae98 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -239,7 +239,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
     public void remove(final Bytes key) {
         final long timestamp = keySchema.segmentTimestamp(key);
         observedStreamTime = Math.max(observedStreamTime, timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -249,7 +249,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
     @Override
     public void remove(final Bytes key, final long timestamp) {
         final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, 
timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment != null) {
             segment.deleteRange(keyBytes, keyBytes);
         }
@@ -281,7 +281,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
                     key.toString(), timestampFromKey, observedStreamTime - 
retentionPeriod + 1);
             return null;
         }
-        final S segment = segments.getSegmentForTimestamp(timestampFromKey);
+        final S segment = segments.segmentForTimestamp(timestampFromKey);
         if (segment == null) {
             return null;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 24020e9d2c5..5611fe99d24 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -75,7 +75,7 @@ abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
     }
 
     @Override
-    public S getSegmentForTimestamp(final long timestamp) {
+    public S segmentForTimestamp(final long timestamp) {
         return segments.get(segmentId(timestamp));
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
index 5f5e0ab7dd7..869a524bee2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
@@ -63,11 +63,11 @@ public class LeftOrRightValue<V1, V2> {
         return new LeftOrRightValue<>(null, rightValue);
     }
 
-    public V1 getLeftValue() {
+    public V1 leftValue() {
         return leftValue;
     }
 
-    public V2 getRightValue() {
+    public V2 rightValue() {
         return rightValue;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
index 8f3c47d14d1..1c64c29fd5a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
@@ -65,9 +65,9 @@ public class LeftOrRightValueSerializer<V1, V2> implements 
WrappingNullableSeria
             return null;
         }
 
-        final byte[] rawValue = (data.getLeftValue() != null)
-            ? leftSerializer.serialize(topic, data.getLeftValue())
-            : rightSerializer.serialize(topic, data.getRightValue());
+        final byte[] rawValue = (data.leftValue() != null)
+            ? leftSerializer.serialize(topic, data.leftValue())
+            : rightSerializer.serialize(topic, data.rightValue());
 
         if (rawValue == null) {
             return null;
@@ -75,7 +75,7 @@ public class LeftOrRightValueSerializer<V1, V2> implements 
WrappingNullableSeria
 
         return ByteBuffer
             .allocate(1 + rawValue.length)
-            .put((byte) (data.getLeftValue() != null ? 1 : 0))
+            .put((byte) (data.leftValue() != null ? 1 : 0))
             .put(rawValue)
             .array();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 77e788d879f..6b9cd747de2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -89,7 +89,7 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
                 + "an entire store is closed, via the close() method rather 
than destroy().");
         }
 
-        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+        final Bytes keyPrefix = prefixKeyFormatter.prefix();
 
         // this deleteRange() call deletes all entries with the given prefix, 
because the
         // deleteRange() implementation calls Bytes.increment() in order to 
make keyTo inclusive
@@ -192,8 +192,8 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
         }
     }
 
-    public Snapshot getSnapshot() {
-        return physicalStore.getSnapshot();
+    public Snapshot snapshot() {
+        return physicalStore.snapshot();
     }
 
     public void releaseSnapshot(final Snapshot snapshot) {
@@ -204,14 +204,14 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes 
from, final Bytes to) {
         // from bound is inclusive. if the provided bound is null, replace 
with prefix
         final Bytes fromBound = from == null
-            ? prefixKeyFormatter.getPrefix()
+            ? prefixKeyFormatter.prefix()
             : prefixKeyFormatter.addPrefix(from);
         // to bound is inclusive. if the provided bound is null, replace with 
the next prefix.
         // this requires potentially filtering out the element corresponding 
to the next prefix
         // with empty bytes from the returned iterator. this filtering is 
accomplished by
         // passing the prefix filter into 
StrippedPrefixKeyValueIteratorAdapter().
         final Bytes toBound = to == null
-            ? incrementWithoutOverflow(prefixKeyFormatter.getPrefix())
+            ? incrementWithoutOverflow(prefixKeyFormatter.prefix())
             : prefixKeyFormatter.addPrefix(to);
         final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.range(
             fromBound,
@@ -226,7 +226,7 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> all() {
         final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = 
physicalStore.prefixScan(
-            prefixKeyFormatter.getPrefix(),
+            prefixKeyFormatter.prefix(),
             new BytesSerializer(),
             openIterators);
         return new StrippedPrefixKeyValueIteratorAdapter(
@@ -288,7 +288,7 @@ class LogicalKeyValueSegment implements 
Comparable<LogicalKeyValueSegment>, Segm
             return rawKey;
         }
 
-        Bytes getPrefix() {
+        Bytes prefix() {
             return Bytes.wrap(prefix);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 85985f9d373..bcbeb4689b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -29,7 +29,7 @@ import java.util.Map;
  * Regular segments with {@code segmentId >= 0} expire according to the 
specified
  * retention period. "Reserved" segments with {@code segmentId < 0} do not 
expire
  * and are completely separate from regular segments in that methods such as
- * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, 
ProcessorContext)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, 
ProcessorContext)},
  * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
  * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
  * only return regular segments and not reserved segments. The methods {@link 
#flush()}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
index 3acdeac2273..d31d9942af6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
@@ -95,16 +95,16 @@ public class LogicalSegmentIterator implements 
VersionedRecordIterator {
                 // fact all use the same physical RocksDB under-the-hood.
                 this.snapshotOwner = segment;
                 // take a RocksDB snapshot to return the segments content at 
the query time (in order to guarantee consistency)
-                this.snapshot = snapshotOwner.getSnapshot();
+                this.snapshot = snapshotOwner.snapshot();
             }
 
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.timestamp(rawSegmentValue);
                     if (recordTimestamp <= toTime) {
                         // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
+                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.value(rawSegmentValue),
 recordTimestamp));
                     }
                 } else {
                     // this segment contains records with the specified key 
and time range
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index ba3829cda56..6816910758e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -61,7 +61,6 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
 
 /**
  * A Metered {@link KeyValueStore} wrapper that is used for recording 
operation metrics, and hence its
@@ -263,7 +262,7 @@ public class MeteredKeyValueStore<K, V>
             final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
                 iterator,
                 getSensor,
-                getDeserializeValue(serdes, wrapped())
+                StoreQueryUtils.deserializeValue(serdes, wrapped())
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
                 InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -289,7 +288,7 @@ public class MeteredKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult =
             wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], V> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final Function<byte[], V> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final V value = deserializer.apply(rawResult.getResult());
             final QueryResult<V> typedQueryResult =
                 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 376d74e7b0d..ad20af61bda 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -455,7 +455,7 @@ public class MeteredSessionStore<K, V>
                         iteratorDurationSensor,
                         streamsMetrics,
                         serdes::keyFrom,
-                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+                        StoreQueryUtils.deserializeValue(serdes, wrapped()),
                         time,
                         numOpenIterators,
                         openIterators
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 61e6533fb8d..6f90ef56d86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -44,7 +44,6 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
 
 /**
  * A Metered {@link TimestampedKeyValueStore} wrapper that is used for 
recording operation metrics, and hence its
@@ -186,7 +185,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult =
                 wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
             final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
                     
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
@@ -224,7 +223,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
             final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = 
(KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
                     iterator,
                     getSensor,
-                    getDeserializeValue(serdes, wrapped()),
+                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
                     false
             );
             final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
@@ -251,7 +250,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult =
                 wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
             final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
             final QueryResult<V> typedQueryResult =
@@ -290,7 +289,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
             final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreIterator(
                 iterator,
                 getSensor,
-                getDeserializeValue(serdes, wrapped()),
+                StoreQueryUtils.deserializeValue(serdes, wrapped()),
                 true
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index b331bb6ac8f..acdc3796646 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -267,7 +267,7 @@ public class MeteredVersionedKeyValueStore<K, V>
                             rawResult.getResult(),
                             iteratorDurationSensor,
                             time,
-                            
StoreQueryUtils.getDeserializeValue(plainValueSerdes),
+                            StoreQueryUtils.deserializeValue(plainValueSerdes),
                             numOpenIterators,
                             openIterators
                         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index b2a84ce002d..05d423a985c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -56,7 +56,6 @@ import java.util.concurrent.atomic.LongAdder;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
 
 public class MeteredWindowStore<K, V>
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
@@ -417,7 +416,7 @@ public class MeteredWindowStore<K, V>
                         iteratorDurationSensor,
                         streamsMetrics,
                         serdes::keyFrom,
-                        getDeserializeValue(serdes, wrapped()),
+                        StoreQueryUtils.deserializeValue(serdes, wrapped()),
                         time,
                         numOpenIterators,
                         openIterators
@@ -469,7 +468,7 @@ public class MeteredWindowStore<K, V>
                     fetchSensor,
                     iteratorDurationSensor,
                     streamsMetrics,
-                    getDeserializeValue(serdes, wrapped()),
+                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
                     time,
                     numOpenIterators,
                     openIterators
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b643fe1c7a7..52c193c86e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -365,7 +365,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         }
     }
 
-    public Snapshot getSnapshot() {
+    public Snapshot snapshot() {
         return db.getSnapshot();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index 21c56a6328f..eaaed6f30e3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -191,8 +191,8 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
         if (rawLatestValueAndTimestamp != null) {
             return new VersionedRecord<>(
-                LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
-                LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
+                LatestValueFormatter.value(rawLatestValueAndTimestamp),
+                LatestValueFormatter.timestamp(rawLatestValueAndTimestamp)
             );
         } else {
             return null;
@@ -210,12 +210,12 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
             // still be returned (i.e., the latest record version per key 
never expires).
             final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
             if (rawLatestValueAndTimestamp != null) {
-                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                final long latestTimestamp = 
LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
                 if (latestTimestamp <= asOfTimestamp) {
                     // latest value satisfies timestamp bound
                     return new VersionedRecord<>(
-                            
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
-                            latestTimestamp
+                        LatestValueFormatter.value(rawLatestValueAndTimestamp),
+                        latestTimestamp
                     );
                 }
             }
@@ -230,9 +230,9 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         // first check the latest value store
         final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
         if (rawLatestValueAndTimestamp != null) {
-            final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+            final long latestTimestamp = 
LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
             if (latestTimestamp <= asOfTimestamp) {
-                return new 
VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), 
latestTimestamp);
+                return new 
VersionedRecord<>(LatestValueFormatter.value(rawLatestValueAndTimestamp), 
latestTimestamp);
             }
         }
 
@@ -241,14 +241,14 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         for (final LogicalKeyValueSegment segment : segments) {
             final byte[] rawSegmentValue = segment.get(key);
             if (rawSegmentValue != null) {
-                final long nextTs = 
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+                final long nextTs = 
RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
                 if (nextTs <= asOfTimestamp) {
                     // this segment contains no data for the queried 
timestamp, so earlier segments
                     // cannot either
                     return null;
                 }
 
-                if 
(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue) > 
asOfTimestamp) {
+                if 
(RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue) > 
asOfTimestamp) {
                     // the segment only contains data for after the queried 
timestamp. skip and
                     // continue the search to earlier segments. as an 
optimization, this code
                     // could be updated to skip forward to the segment 
containing the minTimestamp
@@ -474,7 +474,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         /**
          * @return the contents of the latest value store, for the given key
          */
-        byte[] getLatestValue(Bytes key);
+        byte[] latestValue(Bytes key);
 
         /**
          * Puts the provided key and value into the latest value store.
@@ -496,7 +496,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
          * timestamp bound, in reverse order by segment id (and time), i.e., 
such that
          * the most recent segment is first
          */
-        List<T> getReverseSegments(long timestampFrom);
+        List<T> reversedSegments(long timestampFrom);
 
         /**
          * @return the segment id associated with the provided timestamp
@@ -510,7 +510,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
     class RocksDBVersionedStoreClient implements 
VersionedStoreClient<LogicalKeyValueSegment> {
 
         @Override
-        public byte[] getLatestValue(final Bytes key) {
+        public byte[] latestValue(final Bytes key) {
             return latestValueStore.get(key);
         }
 
@@ -530,7 +530,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         }
 
         @Override
-        public List<LogicalKeyValueSegment> getReverseSegments(final long 
timestampFrom) {
+        public List<LogicalKeyValueSegment> reversedSegments(final long 
timestampFrom) {
             return segmentStores.segments(timestampFrom, Long.MAX_VALUE, 
false);
         }
 
@@ -668,9 +668,9 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         // that the segment should be inserted into the latest value store.
         long foundTs = SENTINEL_TIMESTAMP;
 
-        final byte[] rawLatestValueAndTimestamp = 
versionedStoreClient.getLatestValue(key);
+        final byte[] rawLatestValueAndTimestamp = 
versionedStoreClient.latestValue(key);
         if (rawLatestValueAndTimestamp != null) {
-            final long latestValueStoreTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+            final long latestValueStoreTimestamp = 
LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
             if (timestamp >= latestValueStoreTimestamp) {
                 // new record belongs in the latest value store
                 if (timestamp > latestValueStoreTimestamp) {
@@ -692,7 +692,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
                     // is expired.) so, there is nothing to do for this step 
if `segment == null`,
                     // but we do still update the latest value store with the 
new record below.
                     if (segment != null) {
-                        final byte[] rawValueToMove = 
LatestValueFormatter.getValue(rawLatestValueAndTimestamp);
+                        final byte[] rawValueToMove = 
LatestValueFormatter.value(rawLatestValueAndTimestamp);
                         final byte[] rawSegmentValue = segment.get(key);
                         if (rawSegmentValue == null) {
                             segment.put(
@@ -734,11 +734,11 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         // initialize with current foundTs value
         long foundTs = prevFoundTs;
 
-        final List<T> segments = 
versionedStoreClient.getReverseSegments(timestamp);
+        final List<T> segments = 
versionedStoreClient.reversedSegments(timestamp);
         for (final T segment : segments) {
             final byte[] rawSegmentValue = segment.get(key);
             if (rawSegmentValue != null) {
-                final long foundNextTs = 
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+                final long foundNextTs = 
RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
                 if (foundNextTs <= timestamp) {
                     // this segment (and all earlier segments) does not 
contain records affected by
                     // this put. insert into the segment specified by foundTs 
(i.e., the next
@@ -746,7 +746,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
                     return new PutStatus(false, foundTs);
                 }
 
-                final long foundMinTs = 
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);
+                final long foundMinTs = 
RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue);
                 if (foundMinTs <= timestamp) {
                     // the record being inserted belongs in this segment.
                     // insert and conclude the procedure.
@@ -906,7 +906,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
                     );
                 } else {
                     // insert as latest, since foundTs = sentinel means 
nothing later exists
-                    if 
(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) 
== timestamp) {
+                    if 
(RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue) == 
timestamp) {
                         // next timestamp equal to put() timestamp already 
represents a tombstone,
                         // so no additional insertion is needed in this case
                         return foundTs;
@@ -914,7 +914,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
                     final SegmentValue segmentValue
                             = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
                     segmentValue.insertAsLatest(
-                            
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue),
+                            
RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue),
                             timestamp,
                             null
                     );
@@ -948,7 +948,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
                                 .serialize()
                 );
             } else {
-                final long foundNextTs = 
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+                final long foundNextTs = 
RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
                 if (foundNextTs <= timestamp) {
                     // insert as latest. this case is possible if the found 
segment is "degenerate"
                     // (cf RocksDBVersionedStoreSegmentValueFormatter.java for 
details) as older
@@ -980,7 +980,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
          * @return the timestamp, from the latest value store value bytes 
(representing value
          * and timestamp)
          */
-        static long getTimestamp(final byte[] rawLatestValueAndTimestamp) {
+        static long timestamp(final byte[] rawLatestValueAndTimestamp) {
             return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong();
         }
 
@@ -988,7 +988,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
          * @return the actual record value, from the latest value store value 
bytes (representing
          * value and timestamp)
          */
-        static byte[] getValue(final byte[] rawLatestValueAndTimestamp) {
+        static byte[] value(final byte[] rawLatestValueAndTimestamp) {
             final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length 
- TIMESTAMP_SIZE];
             System.arraycopy(rawLatestValueAndTimestamp, TIMESTAMP_SIZE, 
rawValue, 0, rawValue.length);
             return rawValue;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
index 1cd37b6ab90..bd82465ec49 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -90,7 +90,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         // flush segments first, as this is consistent with the store always 
writing to
         // older segments/stores before later ones
         try (final WriteBatch segmentsBatch = new WriteBatch()) {
-            final List<WriteBufferSegmentWithDbFallback> allSegments = 
restoreClient.getReverseSegments(Long.MIN_VALUE);
+            final List<WriteBufferSegmentWithDbFallback> allSegments = 
restoreClient.reversedSegments(Long.MIN_VALUE);
             if (allSegments.size() > 0) {
                 // collect entries into write batch
                 for (final WriteBufferSegmentWithDbFallback bufferSegment : 
allSegments) {
@@ -186,12 +186,12 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
     private class RocksDBVersionedStoreRestoreClient implements 
VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
 
         @Override
-        public byte[] getLatestValue(final Bytes key) {
+        public byte[] latestValue(final Bytes key) {
             final Optional<byte[]> bufferValue = 
latestValueWriteBuffer.get(key);
             if (bufferValue != null) {
                 return bufferValue.orElse(null);
             }
-            return dbClient.getLatestValue(key);
+            return dbClient.latestValue(key);
         }
 
         @Override
@@ -221,13 +221,13 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         }
 
         @Override
-        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final 
long timestampFrom) {
+        public List<WriteBufferSegmentWithDbFallback> reversedSegments(final 
long timestampFrom) {
             // head and not tail because the map is sorted in reverse order
             final long segmentFrom = segmentIdForTimestamp(timestampFrom);
             final List<WriteBufferSegmentWithDbFallback> bufferSegments =
                 new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, 
true).values());
 
-            final List<LogicalKeyValueSegment> dbSegments = 
dbClient.getReverseSegments(timestampFrom);
+            final List<LogicalKeyValueSegment> dbSegments = 
dbClient.reversedSegments(timestampFrom);
 
             // merge segments from db with segments from write buffer
             final List<WriteBufferSegmentWithDbFallback> allSegments = new 
ArrayList<>();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index 0b22ada12f6..9b60bd4f063 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -101,14 +101,14 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
     /**
      * @return the validTo timestamp of the latest record in the provided 
segment
      */
-    static long getNextTimestamp(final byte[] segmentValue) {
+    static long nextTimestamp(final byte[] segmentValue) {
         return ByteBuffer.wrap(segmentValue).getLong(0);
     }
 
     /**
      * @return the (validFrom) timestamp of the earliest record in the 
provided segment.
      */
-    static long getMinTimestamp(final byte[] segmentValue) {
+    static long minTimestamp(final byte[] segmentValue) {
         return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
     }
 
@@ -271,9 +271,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
             this.segmentValue = segmentValue;
             this.nextTimestamp =
-                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+                
RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue);
             this.minTimestamp =
-                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+                
RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue);
             this.isDegenerate = nextTimestamp == minTimestamp;
             resetDeserHelpers();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 7e50b984521..a3ef2426c3c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -26,7 +26,7 @@ interface Segments<S extends Segment> {
 
     String segmentName(final long segmentId);
 
-    S getSegmentForTimestamp(final long timestamp);
+    S segmentForTimestamp(final long timestamp);
 
     S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext 
context, final long streamTime);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index fa2081ad25b..bc7a52a4814 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -410,7 +410,7 @@ public final class StoreQueryUtils {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    public static <V> Function<byte[], V> getDeserializeValue(final 
StateSerdes<?, V> serdes, final StateStore wrapped) {
+    public static <V> Function<byte[], V> deserializeValue(final 
StateSerdes<?, V> serdes, final StateStore wrapped) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isAdapter(wrapped);
         final Deserializer<V> deserializer;
@@ -435,7 +435,7 @@ public final class StoreQueryUtils {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
getDeserializeValue(final StateSerdes<?, V> serdes) {
+    public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
deserializeValue(final StateSerdes<?, V> serdes) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final Deserializer<V> deserializer = valueSerde.deserializer();
         return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() 
? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), 
rawVersionedRecord.value()),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
index 3b799e81539..127835438df 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
@@ -65,11 +65,11 @@ public class TimestampedKeyAndJoinSide<K> {
         return leftSide;
     }
 
-    public K getKey() {
+    public K key() {
         return key;
     }
 
-    public long getTimestamp() {
+    public long timestamp() {
         return timestamp;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
index 801c417e1ed..1c4e3675058 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
@@ -56,8 +56,8 @@ public class TimestampedKeyAndJoinSideSerializer<K> 
implements WrappingNullableS
     @Override
     public byte[] serialize(final String topic, final 
TimestampedKeyAndJoinSide<K> data) {
         final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
-        final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
-        final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.getTimestamp());
+        final byte[] keyBytes = keySerializer.serialize(topic, data.key());
+        final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.timestamp());
 
         return ByteBuffer
             .allocate(timestampBytes.length + 1 + keyBytes.length)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 437d4683eef..0e25187e788 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -292,7 +292,6 @@ public class StreamsMetadataStateTest {
 
         final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, 
mkSet(hostOne), 2);
 
-
         final KeyQueryMetadata actual = 
metadataState.keyQueryMetadataForKey("merged-table",  "the-key",
             (topic, key, value, numPartitions) -> 
Optional.of(Collections.singleton(2)));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 93d00fb306c..9e083ebbaf2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -133,7 +133,7 @@ public class KeyValueSegmentsTest {
     public void shouldGetSegmentForTimestamp() {
         final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
         segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.getSegmentForTimestamp(0L));
+        assertEquals(segment, segments.segmentForTimestamp(0L));
     }
 
     @Test
@@ -169,11 +169,11 @@ public class KeyValueSegmentsTest {
         segments = new KeyValueSegments("test",  METRICS_SCOPE, 4, 1);
         segments.openExisting(context, -1L);
 
-        assertTrue(segments.getSegmentForTimestamp(0).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(1).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(2).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(3).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(4).isOpen());
+        assertTrue(segments.segmentForTimestamp(0).isOpen());
+        assertTrue(segments.segmentForTimestamp(1).isOpen());
+        assertTrue(segments.segmentForTimestamp(2).isOpen());
+        assertTrue(segments.segmentForTimestamp(3).isOpen());
+        assertTrue(segments.segmentForTimestamp(4).isOpen());
     }
 
     @Test
@@ -342,7 +342,7 @@ public class KeyValueSegmentsTest {
     public void shouldClearSegmentsOnClose() {
         segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.close();
-        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+        assertThat(segments.segmentForTimestamp(0), is(nullValue()));
     }
 
     private void verifyCorrectSegments(final long first, final int 
numSegments) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 74a44d31ab9..8e5a27b1218 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -167,10 +167,10 @@ public class LogicalKeyValueSegmentsTest {
         final LogicalKeyValueSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, 0L);
         final LogicalKeyValueSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
 
-        assertEquals(segment1, segments.getSegmentForTimestamp(0L));
-        assertEquals(segment1, 
segments.getSegmentForTimestamp(SEGMENT_INTERVAL - 1));
-        assertEquals(segment2, 
segments.getSegmentForTimestamp(SEGMENT_INTERVAL));
-        assertEquals(segment2, segments.getSegmentForTimestamp(2 * 
SEGMENT_INTERVAL - 1));
+        assertEquals(segment1, segments.segmentForTimestamp(0L));
+        assertEquals(segment1, segments.segmentForTimestamp(SEGMENT_INTERVAL - 
1));
+        assertEquals(segment2, segments.segmentForTimestamp(SEGMENT_INTERVAL));
+        assertEquals(segment2, segments.segmentForTimestamp(2 * 
SEGMENT_INTERVAL - 1));
     }
 
     @Test
@@ -226,7 +226,7 @@ public class LogicalKeyValueSegmentsTest {
 
         segments.close();
 
-        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+        assertThat(segments.segmentForTimestamp(0), is(nullValue()));
         assertThat(segments.getReservedSegment(-1), is(nullValue()));
         // verify iterators closed as well
         assertThrows(InvalidStateStoreException.class, all1::hasNext);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 1507b2f5958..7a4f0937ee5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -270,8 +270,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest 
{
     public void shouldGetTimestamps(final TestCase testCase) {
         final byte[] segmentValue = 
buildSegmentWithInsertLatest(testCase).serialize();
 
-        
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue),
 equalTo(testCase.nextTimestamp));
-        
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue),
 equalTo(testCase.minTimestamp));
+        
assertThat(RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue),
 equalTo(testCase.nextTimestamp));
+        
assertThat(RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue),
 equalTo(testCase.minTimestamp));
     }
 
     @ParameterizedTest
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index b4601ba7c6e..63a70acb11c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -134,7 +134,7 @@ public class TimestampedSegmentsTest {
     public void shouldGetSegmentForTimestamp() {
         final TimestampedSegment segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.getSegmentForTimestamp(0L));
+        assertEquals(segment, segments.segmentForTimestamp(0L));
     }
 
     @Test
@@ -170,11 +170,11 @@ public class TimestampedSegmentsTest {
         segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1);
         segments.openExisting(context, -1L);
 
-        assertTrue(segments.getSegmentForTimestamp(0).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(1).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(2).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(3).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(4).isOpen());
+        assertTrue(segments.segmentForTimestamp(0).isOpen());
+        assertTrue(segments.segmentForTimestamp(1).isOpen());
+        assertTrue(segments.segmentForTimestamp(2).isOpen());
+        assertTrue(segments.segmentForTimestamp(3).isOpen());
+        assertTrue(segments.segmentForTimestamp(4).isOpen());
     }
 
     @Test
@@ -343,7 +343,7 @@ public class TimestampedSegmentsTest {
     public void shouldClearSegmentsOnClose() {
         segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.close();
-        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+        assertThat(segments.segmentForTimestamp(0), is(nullValue()));
     }
 
     private void verifyCorrectSegments(final long first, final int 
numSegments) {

Reply via email to