This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc720d33a03 MINOR: remove get prefix for internal state methods (#17053)
fc720d33a03 is described below
commit fc720d33a03e5b40a31b82c4baddb8a23b6e8f9d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sat Aug 31 05:02:06 2024 -0700
MINOR: remove get prefix for internal state methods (#17053)
Reviewers: Chia-Ping Tsai <[email protected]>
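
For illustration, the patch applies Kafka's internal accessor convention of dropping the JavaBeans-style "get" prefix; a minimal sketch with a hypothetical class (not taken from this patch):

    // hypothetical example of the rename convention applied throughout this commit
    public class Example {
        private final long timestamp;

        public Example(final long timestamp) {
            this.timestamp = timestamp;
        }

        // before: public long getTimestamp() { return timestamp; }
        // after (Kafka accessor style): no "get" prefix, same behavior
        public long timestamp() {
            return timestamp;
        }
    }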
---
.../kstream/internals/KStreamKStreamJoin.java | 6 +--
.../internals/KStreamKStreamJoinLeftSide.java | 4 +-
.../internals/KStreamKStreamJoinRightSide.java | 4 +-
.../internals/AbstractReadWriteDecorator.java | 2 +-
.../processor/internals/DefaultStateUpdater.java | 4 +-
.../internals/GlobalProcessorContextImpl.java | 6 +--
.../processor/internals/ProcessorContextImpl.java | 4 +-
...stractDualSchemaRocksDBSegmentedBytesStore.java | 4 +-
.../AbstractRocksDBSegmentedBytesStore.java | 6 +--
.../streams/state/internals/AbstractSegments.java | 2 +-
.../streams/state/internals/LeftOrRightValue.java | 4 +-
.../internals/LeftOrRightValueSerializer.java | 8 ++--
.../state/internals/LogicalKeyValueSegment.java | 14 +++----
.../state/internals/LogicalKeyValueSegments.java | 2 +-
.../state/internals/LogicalSegmentIterator.java | 6 +--
.../state/internals/MeteredKeyValueStore.java | 5 +--
.../state/internals/MeteredSessionStore.java | 2 +-
.../internals/MeteredTimestampedKeyValueStore.java | 9 ++--
.../internals/MeteredVersionedKeyValueStore.java | 2 +-
.../state/internals/MeteredWindowStore.java | 5 +--
.../streams/state/internals/RocksDBStore.java | 2 +-
.../state/internals/RocksDBVersionedStore.java | 48 +++++++++++-----------
.../RocksDBVersionedStoreRestoreWriteBuffer.java | 10 ++---
...RocksDBVersionedStoreSegmentValueFormatter.java | 8 ++--
.../kafka/streams/state/internals/Segments.java | 2 +-
.../streams/state/internals/StoreQueryUtils.java | 4 +-
.../state/internals/TimestampedKeyAndJoinSide.java | 4 +-
.../TimestampedKeyAndJoinSideSerializer.java | 4 +-
.../internals/StreamsMetadataStateTest.java | 1 -
.../state/internals/KeyValueSegmentsTest.java | 14 +++----
.../internals/LogicalKeyValueSegmentsTest.java | 10 ++---
...sDBVersionedStoreSegmentValueFormatterTest.java | 4 +-
.../state/internals/TimestampedSegmentsTest.java | 14 +++----
33 files changed, 110 insertions(+), 114 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 745f7038257..9ba8f316271 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -207,7 +207,7 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
         while (it.hasNext()) {
             final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
             final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
-            sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp();
+            sharedTimeTracker.minTime = timestampedKeyAndJoinSide.timestamp();
             if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
                 // if windows are open for both joinSides we can break since there are no more candidates to emit
                 break;
@@ -250,8 +250,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
     private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
                                               final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
                                               final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
-        final K key = timestampedKeyAndJoinSide.getKey();
-        final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
+        final K key = timestampedKeyAndJoinSide.key();
+        final long timestamp = timestampedKeyAndJoinSide.timestamp();
         final VThis thisValue = getThisValue(leftOrRightValue);
         final VOther otherValue = getOtherValue(leftOrRightValue);
         final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index 3b4ee5b3393..df03e39b8c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -60,12 +60,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends KStreamKStreamJ

         @Override
         public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getLeftValue();
+            return leftOrRightValue.leftValue();
         }

         @Override
         public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getRightValue();
+            return leftOrRightValue.rightValue();
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index e4bcfe4e105..ec29d5f12b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -59,12 +59,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> extends KStreamKStream

         @Override
         public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getRightValue();
+            return leftOrRightValue.rightValue();
         }

         @Override
         public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
-            return leftOrRightValue.getLeftValue();
+            return leftOrRightValue.leftValue();
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 11433af42cf..353c26ec253 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -53,7 +53,7 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }

-    static StateStore readWriteStore(final StateStore store) {
+    static StateStore wrapWithReadWriteStore(final StateStore store) {
         if (store instanceof TimestampedKeyValueStore) {
             return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
         } else if (store instanceof VersionedKeyValueStore) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 2e4bb85db7c..addef5a9f15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -200,7 +200,7 @@ public class DefaultStateUpdater implements StateUpdater {
         private void performActionsOnTasks() {
             tasksAndActionsLock.lock();
             try {
-                for (final TaskAndAction taskAndAction : getTasksAndActions()) {
+                for (final TaskAndAction taskAndAction : tasksAndActions()) {
                     final Action action = taskAndAction.action();
                     switch (action) {
                         case ADD:
@@ -458,7 +458,7 @@ public class DefaultStateUpdater implements StateUpdater {
             changelogReader.clear();
         }

-        private List<TaskAndAction> getTasksAndActions() {
+        private List<TaskAndAction> tasksAndActions() {
            final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
            tasksAndActions.clear();
            return tasksAndActionsToProcess;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 74148cd21b1..828ae3a0a79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe

 import java.time.Duration;

-import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;

 public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {
@@ -60,7 +60,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object,
     @Override
     public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.globalStore(name);
-        return (S) readWriteStore(store);
+        return (S) wrapWithReadWriteStore(store);
     }

     @SuppressWarnings("unchecked")
@@ -156,4 +156,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object,
     public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
         cache.addDirtyEntryFlushListener(namespace, listener);
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 91ee78b1246..dc946ba3b6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -47,7 +47,7 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTEN
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
-import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;

 public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {

     // the below are null for standby tasks
@@ -182,7 +182,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext<Object, Objec
         }

         final StateStore store = stateManager.store(name);
-        return (S) readWriteStore(store);
+        return (S) wrapWithReadWriteStore(store);
     }

     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f2d789be2b1..e55d8452fae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -113,7 +113,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     public void remove(final Bytes rawBaseKey) {
         final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
         observedStreamTime = Math.max(observedStreamTime, timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -227,7 +227,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
             }
         }

-        final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
+        final S segment = segments.segmentForTimestamp(timestampFromRawKey);
         if (segment == null) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 9aee1de871e..f5b4366ae98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -239,7 +239,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     public void remove(final Bytes key) {
         final long timestamp = keySchema.segmentTimestamp(key);
         observedStreamTime = Math.max(observedStreamTime, timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -249,7 +249,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     @Override
     public void remove(final Bytes key, final long timestamp) {
         final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
-        final S segment = segments.getSegmentForTimestamp(timestamp);
+        final S segment = segments.segmentForTimestamp(timestamp);
         if (segment != null) {
             segment.deleteRange(keyBytes, keyBytes);
         }
@@ -281,7 +281,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
                 key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
             return null;
         }
-        final S segment = segments.getSegmentForTimestamp(timestampFromKey);
+        final S segment = segments.segmentForTimestamp(timestampFromKey);
         if (segment == null) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 24020e9d2c5..5611fe99d24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -75,7 +75,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
     }

     @Override
-    public S getSegmentForTimestamp(final long timestamp) {
+    public S segmentForTimestamp(final long timestamp) {
         return segments.get(segmentId(timestamp));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
index 5f5e0ab7dd7..869a524bee2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
@@ -63,11 +63,11 @@ public class LeftOrRightValue<V1, V2> {
         return new LeftOrRightValue<>(null, rightValue);
     }

-    public V1 getLeftValue() {
+    public V1 leftValue() {
         return leftValue;
     }

-    public V2 getRightValue() {
+    public V2 rightValue() {
         return rightValue;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
index 8f3c47d14d1..1c64c29fd5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
@@ -65,9 +65,9 @@ public class LeftOrRightValueSerializer<V1, V2> implements WrappingNullableSeria
             return null;
         }

-        final byte[] rawValue = (data.getLeftValue() != null)
-            ? leftSerializer.serialize(topic, data.getLeftValue())
-            : rightSerializer.serialize(topic, data.getRightValue());
+        final byte[] rawValue = (data.leftValue() != null)
+            ? leftSerializer.serialize(topic, data.leftValue())
+            : rightSerializer.serialize(topic, data.rightValue());

         if (rawValue == null) {
             return null;
@@ -75,7 +75,7 @@ public class LeftOrRightValueSerializer<V1, V2> implements WrappingNullableSeria

         return ByteBuffer
             .allocate(1 + rawValue.length)
-            .put((byte) (data.getLeftValue() != null ? 1 : 0))
+            .put((byte) (data.leftValue() != null ? 1 : 0))
             .put(rawValue)
             .array();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index 77e788d879f..6b9cd747de2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -89,7 +89,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
                 + "an entire store is closed, via the close() method rather than destroy().");
         }

-        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+        final Bytes keyPrefix = prefixKeyFormatter.prefix();

         // this deleteRange() call deletes all entries with the given prefix, because the
         // deleteRange() implementation calls Bytes.increment() in order to make keyTo inclusive
@@ -192,8 +192,8 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
         }
     }

-    public Snapshot getSnapshot() {
-        return physicalStore.getSnapshot();
+    public Snapshot snapshot() {
+        return physicalStore.snapshot();
     }

     public void releaseSnapshot(final Snapshot snapshot) {
@@ -204,14 +204,14 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
         // from bound is inclusive. if the provided bound is null, replace with prefix
         final Bytes fromBound = from == null
-            ? prefixKeyFormatter.getPrefix()
+            ? prefixKeyFormatter.prefix()
             : prefixKeyFormatter.addPrefix(from);
         // to bound is inclusive. if the provided bound is null, replace with the next prefix.
         // this requires potentially filtering out the element corresponding to the next prefix
         // with empty bytes from the returned iterator. this filtering is accomplished by
         // passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter().
         final Bytes toBound = to == null
-            ? incrementWithoutOverflow(prefixKeyFormatter.getPrefix())
+            ? incrementWithoutOverflow(prefixKeyFormatter.prefix())
             : prefixKeyFormatter.addPrefix(to);
         final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
             fromBound,
@@ -226,7 +226,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> all() {
         final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
-            prefixKeyFormatter.getPrefix(),
+            prefixKeyFormatter.prefix(),
             new BytesSerializer(),
             openIterators);
         return new StrippedPrefixKeyValueIteratorAdapter(
@@ -288,7 +288,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
             return rawKey;
         }

-        Bytes getPrefix() {
+        Bytes prefix() {
             return Bytes.wrap(prefix);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 85985f9d373..bcbeb4689b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -29,7 +29,7 @@ import java.util.Map;
  * Regular segments with {@code segmentId >= 0} expire according to the specified
  * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
  * and are completely separate from regular segments in that methods such as
- * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
  * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
  * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
  * only return regular segments and not reserved segments. The methods {@link #flush()}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
index 3acdeac2273..d31d9942af6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
@@ -95,16 +95,16 @@ public class LogicalSegmentIterator implements VersionedRecordIterator {
                 // fact all use the same physical RocksDB under-the-hood.
                 this.snapshotOwner = segment;
                 // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
-                this.snapshot = snapshotOwner.getSnapshot();
+                this.snapshot = snapshotOwner.snapshot();
             }

             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+                    final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.timestamp(rawSegmentValue);
                     if (recordTimestamp <= toTime) {
                         // latest value satisfies timestamp bound
-                        queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
+                        queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.value(rawSegmentValue), recordTimestamp));
                     }
                 } else {
                     // this segment contains records with the specified key and time range
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index ba3829cda56..6816910758e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -61,7 +61,6 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;

 /**
  * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
@@ -263,7 +262,7 @@ public class MeteredKeyValueStore<K, V>
         final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
             iterator,
             getSensor,
-            getDeserializeValue(serdes, wrapped())
+            StoreQueryUtils.deserializeValue(serdes, wrapped())
         );
         final QueryResult<KeyValueIterator<K, V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -289,7 +288,7 @@ public class MeteredKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
+            final Function<byte[], V> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
             final V value = deserializer.apply(rawResult.getResult());
             final QueryResult<V> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 376d74e7b0d..ad20af61bda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -455,7 +455,7 @@ public class MeteredSessionStore<K, V>
             iteratorDurationSensor,
             streamsMetrics,
             serdes::keyFrom,
-            StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+            StoreQueryUtils.deserializeValue(serdes, wrapped()),
             time,
             numOpenIterators,
             openIterators
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 61e6533fb8d..6f90ef56d86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -44,7 +44,6 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;

 /**
  * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
@@ -186,7 +185,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
             final QueryResult<ValueAndTimestamp<V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
@@ -224,7 +223,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator(
             iterator,
             getSensor,
-            getDeserializeValue(serdes, wrapped()),
+            StoreQueryUtils.deserializeValue(serdes, wrapped()),
             false
         );
         final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult =
@@ -251,7 +250,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
         if (rawResult.isSuccess()) {
-            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
             final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
             final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value();
             final QueryResult<V> typedQueryResult =
@@ -290,7 +289,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator(
             iterator,
             getSensor,
-            getDeserializeValue(serdes, wrapped()),
+            StoreQueryUtils.deserializeValue(serdes, wrapped()),
             true
         );
         final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index b331bb6ac8f..acdc3796646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -267,7 +267,7 @@ public class MeteredVersionedKeyValueStore<K, V>
                     rawResult.getResult(),
                     iteratorDurationSensor,
                     time,
-                    StoreQueryUtils.getDeserializeValue(plainValueSerdes),
+                    StoreQueryUtils.deserializeValue(plainValueSerdes),
                     numOpenIterators,
                     openIterators
                 );
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index b2a84ce002d..05d423a985c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -56,7 +56,6 @@ import java.util.concurrent.atomic.LongAdder;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;

 public class MeteredWindowStore<K, V>
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
@@ -417,7 +416,7 @@ public class MeteredWindowStore<K, V>
             iteratorDurationSensor,
             streamsMetrics,
             serdes::keyFrom,
-            getDeserializeValue(serdes, wrapped()),
+            StoreQueryUtils.deserializeValue(serdes, wrapped()),
             time,
             numOpenIterators,
             openIterators
@@ -469,7 +468,7 @@ public class MeteredWindowStore<K, V>
             fetchSensor,
             iteratorDurationSensor,
             streamsMetrics,
-            getDeserializeValue(serdes, wrapped()),
+            StoreQueryUtils.deserializeValue(serdes, wrapped()),
             time,
             numOpenIterators,
             openIterators
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b643fe1c7a7..52c193c86e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -365,7 +365,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         }
     }

-    public Snapshot getSnapshot() {
+    public Snapshot snapshot() {
         return db.getSnapshot();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index 21c56a6328f..eaaed6f30e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -191,8 +191,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
         if (rawLatestValueAndTimestamp != null) {
             return new VersionedRecord<>(
-                LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
-                LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp)
+                LatestValueFormatter.value(rawLatestValueAndTimestamp),
+                LatestValueFormatter.timestamp(rawLatestValueAndTimestamp)
             );
         } else {
             return null;
@@ -210,12 +210,12 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
             // still be returned (i.e., the latest record version per key never expires).
             final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
             if (rawLatestValueAndTimestamp != null) {
-                final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
                 if (latestTimestamp <= asOfTimestamp) {
                     // latest value satisfies timestamp bound
                     return new VersionedRecord<>(
-                        LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
-                        latestTimestamp
+                        LatestValueFormatter.value(rawLatestValueAndTimestamp),
+                        latestTimestamp
                     );
                 }
             }
@@ -230,9 +230,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         // first check the latest value store
         final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
         if (rawLatestValueAndTimestamp != null) {
-            final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+            final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
             if (latestTimestamp <= asOfTimestamp) {
-                return new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), latestTimestamp);
+                return new VersionedRecord<>(LatestValueFormatter.value(rawLatestValueAndTimestamp), latestTimestamp);
             }
         }
@@ -241,14 +241,14 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         for (final LogicalKeyValueSegment segment : segments) {
             final byte[] rawSegmentValue = segment.get(key);
             if (rawSegmentValue != null) {
-                final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+                final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
                 if (nextTs <= asOfTimestamp) {
                     // this segment contains no data for the queried timestamp, so earlier segments
                     // cannot either
                     return null;
                 }

-                if (RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue) > asOfTimestamp) {
+                if (RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue) > asOfTimestamp) {
                     // the segment only contains data for after the queried timestamp. skip and
                     // continue the search to earlier segments. as an optimization, this code
                     // could be updated to skip forward to the segment containing the minTimestamp
@@ -474,7 +474,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         /**
         * @return the contents of the latest value store, for the given key
         */
-        byte[] getLatestValue(Bytes key);
+        byte[] latestValue(Bytes key);

         /**
         * Puts the provided key and value into the latest value store.
@@ -496,7 +496,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         * timestamp bound, in reverse order by segment id (and time), i.e., such that
         * the most recent segment is first
         */
-        List<T> getReverseSegments(long timestampFrom);
+        List<T> reversedSegments(long timestampFrom);

         /**
         * @return the segment id associated with the provided timestamp
@@ -510,7 +510,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {

         @Override
-        public byte[] getLatestValue(final Bytes key) {
+        public byte[] latestValue(final Bytes key) {
             return latestValueStore.get(key);
         }
@@ -530,7 +530,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         }

         @Override
-        public List<LogicalKeyValueSegment> getReverseSegments(final long timestampFrom) {
+        public List<LogicalKeyValueSegment> reversedSegments(final long timestampFrom) {
             return segmentStores.segments(timestampFrom, Long.MAX_VALUE, false);
         }
@@ -668,9 +668,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         // that the segment should be inserted into the latest value store.
         long foundTs = SENTINEL_TIMESTAMP;

-        final byte[] rawLatestValueAndTimestamp = versionedStoreClient.getLatestValue(key);
+        final byte[] rawLatestValueAndTimestamp = versionedStoreClient.latestValue(key);
         if (rawLatestValueAndTimestamp != null) {
-            final long latestValueStoreTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+            final long latestValueStoreTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
             if (timestamp >= latestValueStoreTimestamp) {
                 // new record belongs in the latest value store
                 if (timestamp > latestValueStoreTimestamp) {
@@ -692,7 +692,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
                     // is expired.) so, there is nothing to do for this step if `segment == null`,
                     // but we do still update the latest value store with the new record below.
                     if (segment != null) {
-                        final byte[] rawValueToMove = LatestValueFormatter.getValue(rawLatestValueAndTimestamp);
+                        final byte[] rawValueToMove = LatestValueFormatter.value(rawLatestValueAndTimestamp);
                         final byte[] rawSegmentValue = segment.get(key);
                         if (rawSegmentValue == null) {
                             segment.put(
@@ -734,11 +734,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         // initialize with current foundTs value
         long foundTs = prevFoundTs;

-        final List<T> segments = versionedStoreClient.getReverseSegments(timestamp);
+        final List<T> segments = versionedStoreClient.reversedSegments(timestamp);
         for (final T segment : segments) {
             final byte[] rawSegmentValue = segment.get(key);
             if (rawSegmentValue != null) {
-                final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+                final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
                 if (foundNextTs <= timestamp) {
                     // this segment (and all earlier segments) does not contain records affected by
                     // this put. insert into the segment specified by foundTs (i.e., the next
@@ -746,7 +746,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
                     return new PutStatus(false, foundTs);
                 }

-                final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);
+                final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue);
                 if (foundMinTs <= timestamp) {
                     // the record being inserted belongs in this segment.
                     // insert and conclude the procedure.
@@ -906,7 +906,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
                 );
             } else {
                 // insert as latest, since foundTs = sentinel means nothing later exists
-                if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) == timestamp) {
+                if (RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue) == timestamp) {
                     // next timestamp equal to put() timestamp already represents a tombstone,
                     // so no additional insertion is needed in this case
                     return foundTs;
@@ -914,7 +914,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
                 final SegmentValue segmentValue
                     = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
                 segmentValue.insertAsLatest(
-                    RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue),
+                    RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue),
                     timestamp,
                     null
                 );
@@ -948,7 +948,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
                     .serialize()
             );
         } else {
-            final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
+            final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
             if (foundNextTs <= timestamp) {
                 // insert as latest. this case is possible if the found segment is "degenerate"
                 // (cf RocksDBVersionedStoreSegmentValueFormatter.java for details) as older
@@ -980,7 +980,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         * @return the timestamp, from the latest value store value bytes (representing value
         * and timestamp)
         */
-        static long getTimestamp(final byte[] rawLatestValueAndTimestamp) {
+        static long timestamp(final byte[] rawLatestValueAndTimestamp) {
            return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong();
        }
@@ -988,7 +988,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         * @return the actual record value, from the latest value store value bytes (representing
         * value and timestamp)
         */
-        static byte[] getValue(final byte[] rawLatestValueAndTimestamp) {
+        static byte[] value(final byte[] rawLatestValueAndTimestamp) {
             final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length - TIMESTAMP_SIZE];
             System.arraycopy(rawLatestValueAndTimestamp, TIMESTAMP_SIZE, rawValue, 0, rawValue.length);
             return rawValue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
index 1cd37b6ab90..bd82465ec49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -90,7 +90,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         // flush segments first, as this is consistent with the store always writing to
         // older segments/stores before later ones
         try (final WriteBatch segmentsBatch = new WriteBatch()) {
-            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.reversedSegments(Long.MIN_VALUE);
             if (allSegments.size() > 0) {
                 // collect entries into write batch
                 for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
@@ -186,12 +186,12 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
     private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {

         @Override
-        public byte[] getLatestValue(final Bytes key) {
+        public byte[] latestValue(final Bytes key) {
             final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
             if (bufferValue != null) {
                 return bufferValue.orElse(null);
             }
-            return dbClient.getLatestValue(key);
+            return dbClient.latestValue(key);
         }

         @Override
@@ -221,13 +221,13 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         }

         @Override
-        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+        public List<WriteBufferSegmentWithDbFallback> reversedSegments(final long timestampFrom) {
             // head and not tail because the map is sorted in reverse order
             final long segmentFrom = segmentIdForTimestamp(timestampFrom);
             final List<WriteBufferSegmentWithDbFallback> bufferSegments =
                 new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());

-            final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+            final List<LogicalKeyValueSegment> dbSegments = dbClient.reversedSegments(timestampFrom);

             // merge segments from db with segments from write buffer
             final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index 0b22ada12f6..9b60bd4f063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -101,14 +101,14 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
     /**
      * @return the validTo timestamp of the latest record in the provided segment
      */
-    static long getNextTimestamp(final byte[] segmentValue) {
+    static long nextTimestamp(final byte[] segmentValue) {
         return ByteBuffer.wrap(segmentValue).getLong(0);
     }

     /**
      * @return the (validFrom) timestamp of the earliest record in the provided segment.
      */
-    static long getMinTimestamp(final byte[] segmentValue) {
+    static long minTimestamp(final byte[] segmentValue) {
         return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
     }
@@ -271,9 +271,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
             this.segmentValue = segmentValue;
             this.nextTimestamp =
-                RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+                RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue);
             this.minTimestamp =
-                RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+                RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue);
             this.isDegenerate = nextTimestamp == minTimestamp;
             resetDeserHelpers();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 7e50b984521..a3ef2426c3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -26,7 +26,7 @@ interface Segments<S extends Segment> {

     String segmentName(final long segmentId);

-    S getSegmentForTimestamp(final long timestamp);
+    S segmentForTimestamp(final long timestamp);

     S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index fa2081ad25b..bc7a52a4814 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -410,7 +410,7 @@ public final class StoreQueryUtils {
     }

     @SuppressWarnings({"unchecked", "rawtypes"})
-    public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) {
+    public static <V> Function<byte[], V> deserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped);
         final Deserializer<V> deserializer;
@@ -435,7 +435,7 @@ public final class StoreQueryUtils {
     }

     @SuppressWarnings({"unchecked", "rawtypes"})
-    public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(final StateSerdes<?, V> serdes) {
+    public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue(final StateSerdes<?, V> serdes) {
         final Serde<V> valueSerde = serdes.valueSerde();
         final Deserializer<V> deserializer = valueSerde.deserializer();
         return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
             ? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
index 3b799e81539..127835438df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
@@ -65,11 +65,11 @@ public class TimestampedKeyAndJoinSide<K> {
         return leftSide;
     }

-    public K getKey() {
+    public K key() {
         return key;
     }

-    public long getTimestamp() {
+    public long timestamp() {
         return timestamp;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
index 801c417e1ed..1c4e3675058 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java
@@ -56,8 +56,8 @@ public class TimestampedKeyAndJoinSideSerializer<K> implements WrappingNullableS
     @Override
     public byte[] serialize(final String topic, final TimestampedKeyAndJoinSide<K> data) {
         final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
-        final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
-        final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp());
+        final byte[] keyBytes = keySerializer.serialize(topic, data.key());
+        final byte[] timestampBytes = timestampSerializer.serialize(topic, data.timestamp());

         return ByteBuffer
             .allocate(timestampBytes.length + 1 + keyBytes.length)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 437d4683eef..0e25187e788 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -292,7 +292,6 @@ public class StreamsMetadataStateTest {
         final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);

-
         final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key",
             (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 93d00fb306c..9e083ebbaf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -133,7 +133,7 @@ public class KeyValueSegmentsTest {
     public void shouldGetSegmentForTimestamp() {
         final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.getSegmentForTimestamp(0L));
+        assertEquals(segment, segments.segmentForTimestamp(0L));
     }

     @Test
@@ -169,11 +169,11 @@ public class KeyValueSegmentsTest {
         segments = new KeyValueSegments("test", METRICS_SCOPE, 4, 1);
         segments.openExisting(context, -1L);

-        assertTrue(segments.getSegmentForTimestamp(0).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(1).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(2).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(3).isOpen());
-        assertTrue(segments.getSegmentForTimestamp(4).isOpen());
+        assertTrue(segments.segmentForTimestamp(0).isOpen());
+        assertTrue(segments.segmentForTimestamp(1).isOpen());
+        assertTrue(segments.segmentForTimestamp(2).isOpen());
+        assertTrue(segments.segmentForTimestamp(3).isOpen());
+        assertTrue(segments.segmentForTimestamp(4).isOpen());
     }

     @Test
@@ -342,7 +342,7 @@ public class KeyValueSegmentsTest {
     public void shouldClearSegmentsOnClose() {
         segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.close();
-        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+        assertThat(segments.segmentForTimestamp(0), is(nullValue()));
     }

     private void verifyCorrectSegments(final long first, final int numSegments) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 74a44d31ab9..8e5a27b1218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -167,10 +167,10 @@ public class LogicalKeyValueSegmentsTest {
         final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0L);
         final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);

-        assertEquals(segment1, segments.getSegmentForTimestamp(0L));
-        assertEquals(segment1, segments.getSegmentForTimestamp(SEGMENT_INTERVAL - 1));
-        assertEquals(segment2, segments.getSegmentForTimestamp(SEGMENT_INTERVAL));
-        assertEquals(segment2, segments.getSegmentForTimestamp(2 * SEGMENT_INTERVAL - 1));
+        assertEquals(segment1, segments.segmentForTimestamp(0L));
+        assertEquals(segment1, segments.segmentForTimestamp(SEGMENT_INTERVAL - 1));
+        assertEquals(segment2, segments.segmentForTimestamp(SEGMENT_INTERVAL));
+        assertEquals(segment2, segments.segmentForTimestamp(2 * SEGMENT_INTERVAL - 1));
     }

     @Test
@@ -226,7 +226,7 @@ public class LogicalKeyValueSegmentsTest {
         segments.close();

-        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+        assertThat(segments.segmentForTimestamp(0), is(nullValue()));
         assertThat(segments.getReservedSegment(-1), is(nullValue()));

         // verify iterators closed as well
         assertThrows(InvalidStateStoreException.class, all1::hasNext);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 1507b2f5958..7a4f0937ee5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -270,8 +270,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
     public void shouldGetTimestamps(final TestCase testCase) {
         final byte[] segmentValue = buildSegmentWithInsertLatest(testCase).serialize();

-        assertThat(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue), equalTo(testCase.nextTimestamp));
-        assertThat(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue), equalTo(testCase.minTimestamp));
+        assertThat(RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue), equalTo(testCase.nextTimestamp));
+        assertThat(RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue), equalTo(testCase.minTimestamp));
     }

     @ParameterizedTest
@ParameterizedTest
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index b4601ba7c6e..63a70acb11c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -134,7 +134,7 @@ public class TimestampedSegmentsTest {
public void shouldGetSegmentForTimestamp() {
final TimestampedSegment segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.getSegmentForTimestamp(0L));
+ assertEquals(segment, segments.segmentForTimestamp(0L));
}
@Test
@@ -170,11 +170,11 @@ public class TimestampedSegmentsTest {
segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1);
segments.openExisting(context, -1L);
- assertTrue(segments.getSegmentForTimestamp(0).isOpen());
- assertTrue(segments.getSegmentForTimestamp(1).isOpen());
- assertTrue(segments.getSegmentForTimestamp(2).isOpen());
- assertTrue(segments.getSegmentForTimestamp(3).isOpen());
- assertTrue(segments.getSegmentForTimestamp(4).isOpen());
+ assertTrue(segments.segmentForTimestamp(0).isOpen());
+ assertTrue(segments.segmentForTimestamp(1).isOpen());
+ assertTrue(segments.segmentForTimestamp(2).isOpen());
+ assertTrue(segments.segmentForTimestamp(3).isOpen());
+ assertTrue(segments.segmentForTimestamp(4).isOpen());
}
@Test
@@ -343,7 +343,7 @@ public class TimestampedSegmentsTest {
public void shouldClearSegmentsOnClose() {
segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.close();
- assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+ assertThat(segments.segmentForTimestamp(0), is(nullValue()));
}
private void verifyCorrectSegments(final long first, final int
numSegments) {