This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d44627da994 MINOR: remove get prefix for internal KeyValueStoreWrapper (#17065)
d44627da994 is described below
commit d44627da9949d84f9369ceae0c742f789661c1c1
Author: Chia Chuan Yu <[email protected]>
AuthorDate: Sun Sep 1 10:35:08 2024 +0800
MINOR: remove get prefix for internal KeyValueStoreWrapper (#17065)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/streams/kstream/internals/KStreamAggregate.java | 2 +-
.../org/apache/kafka/streams/kstream/internals/KStreamReduce.java | 2 +-
.../org/apache/kafka/streams/kstream/internals/KTableAggregate.java | 2 +-
.../java/org/apache/kafka/streams/kstream/internals/KTableFilter.java | 2 +-
.../kafka/streams/kstream/internals/KTableKTableJoinMerger.java | 2 +-
.../org/apache/kafka/streams/kstream/internals/KTableMapValues.java | 2 +-
.../java/org/apache/kafka/streams/kstream/internals/KTableReduce.java | 2 +-
.../java/org/apache/kafka/streams/kstream/internals/KTableSource.java | 2 +-
.../apache/kafka/streams/kstream/internals/KTableTransformValues.java | 2 +-
.../apache/kafka/streams/state/internals/KeyValueStoreWrapper.java | 2 +-
.../kafka/streams/state/internals/KeyValueStoreWrapperTest.java | 4 ++--
11 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index cadee61088a..3e906813d84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -79,7 +79,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
(StreamsMetricsImpl) context.metrics());
store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index e405d5740c9..15528f5d150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -75,7 +75,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K,
);
store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b54375765bf..f71143ff209 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -70,7 +70,7 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index f554cd99978..475ea85db94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -109,7 +109,7 @@ public class KTableFilter<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn,
if (queryableName != null) {
store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 000311f6586..9d381635586 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -110,7 +110,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
if (queryableName != null) {
store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 8bc0c6ed3e5..c26488c12a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -117,7 +117,7 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
if (queryableName != null) {
store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 4a9c18b6a4a..c577d30d984 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -64,7 +64,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> {
public void init(final ProcessorContext<K, Change<V>> context) {
store = new KeyValueStoreWrapper<>(context, storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 451ca6e7ab6..b29a4fa51f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -94,7 +94,7 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
if (queryableName != null) {
store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index ed7169e0501..f953471f9f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -107,7 +107,7 @@ class KTableTransformValues<K, V, VOut> implements KTableProcessorSupplier<K, V,
if (queryableName != null) {
store = new KeyValueStoreWrapper<>(context, queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
- store.getStore(),
+ store.store(),
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index 7c336296b76..43efc9159d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -108,7 +108,7 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
}
- public StateStore getStore() {
+ public StateStore store() {
return store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
index 3a5aee7d8e9..cb6e15be9fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
@@ -161,14 +161,14 @@ public class KeyValueStoreWrapperTest {
public void shouldGetTimestampedStore() {
givenWrapperWithTimestampedStore();
- assertThat(wrapper.getStore(), equalTo(timestampedStore));
+ assertThat(wrapper.store(), equalTo(timestampedStore));
}
@Test
public void shouldGetVersionedStore() {
givenWrapperWithVersionedStore();
- assertThat(wrapper.getStore(), equalTo(versionedStore));
+ assertThat(wrapper.store(), equalTo(versionedStore));
}
@Test