This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4b46bb49047 KAFKA-12562: Fix KafkaStreams#store old references in
comments (#13774)
4b46bb49047 is described below
commit 4b46bb4904775c614b2ef4402b3d9777d968d706
Author: Milind Mantri <[email protected]>
AuthorDate: Thu Jun 1 12:46:00 2023 +0530
KAFKA-12562: Fix KafkaStreams#store old references in comments (#13774)
Following method was deprecated in 2.5 and was removed in 3.0.0.
// KafkaStreams.java
public <T> T store(final String storeName, final QueryableStoreType<T>
queryableStoreType);
However, many comments reference the removed method which can be confusing
in generated JavaDocs. The code in java doc comments has been changed to
reflect the new method, store(final StoreQueryParameters<T>
storeQueryParameters).
Also, minor changes to variable names in java doc to be context specific.
Reviewer: Bruno Cadonna <[email protected]>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 9 ++++---
.../kafka/streams/kstream/CogroupedKStream.java | 12 ++++++---
.../kafka/streams/kstream/KGroupedStream.java | 18 ++++++++-----
.../kafka/streams/kstream/KGroupedTable.java | 18 ++++++++-----
.../org/apache/kafka/streams/kstream/KTable.java | 15 +++++++----
.../kstream/SessionWindowedCogroupedKStream.java | 6 +++--
.../streams/kstream/SessionWindowedKStream.java | 30 +++++++++++++---------
.../kstream/TimeWindowedCogroupedKStream.java | 7 ++---
.../kafka/streams/kstream/TimeWindowedKStream.java | 18 ++++++++-----
9 files changed, 86 insertions(+), 47 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index bed61271e9c..63d9b6fc525 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -243,7 +243,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -432,7 +433,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre>
@@ -476,7 +478,8 @@ public class StreamsBuilder {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key);
* }</pre>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index b0f1deca1cb..ccb003ea755 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -86,7 +86,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -136,7 +137,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -187,7 +189,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -240,7 +243,8 @@ public interface CogroupedKStream<K, VOut> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VOut>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VOut>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 513d94dae65..486377addd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -120,7 +120,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be
the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -166,7 +167,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* String queryableStoreName = "storeName"; // the store name should be
the name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -270,7 +272,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -334,7 +337,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -439,7 +443,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VR>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -498,7 +503,8 @@ public interface KGroupedStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
* String queryableStoreName = "storeName" // the store name should be the
name of the store as defined by the Materialized instance
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
ValueAndTimestamp<VR>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(storeQueryParams);
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 5733aef319c..1eb9be3a02c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -59,7 +59,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -102,7 +103,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -230,7 +232,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>
timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -303,7 +306,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>
timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must
be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -441,7 +445,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VR>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key
must be local (application state is shared over all running Kafka Streams
instances)
* }</pre>
@@ -525,7 +530,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VR>> timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key
must be local (application state is shared over all running Kafka Streams
instances)
* }</pre>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index cf7af720abe..e530769be4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -54,7 +54,8 @@ import java.util.function.Function;
* streams.start()
* ...
* final String queryableStoreName = table.queryableStoreName(); //
returns null if KTable is not queryable
- * ReadOnlyKeyValueStore view = streams.store(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * final StoreQueryParameters<ReadOnlyKeyValueStore<K,
ValueAndTimestamp<V>>> storeQueryParams =
StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view =
streams.store(storeQueryParams);
* view.get(key);
*}</pre>
*<p>
@@ -135,7 +136,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -174,7 +176,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -260,7 +263,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -298,7 +302,8 @@ public interface KTable<K, V> {
* {@link KafkaStreams#store(StoreQueryParameters)
KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
- * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedKeyValueStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedKeyValueStore());
+ * ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore =
streams.store(storeQueryParams);
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be
local (application state is shared over all running Kafka Streams instances)
* }</pre>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index eeeb3e1a036..633372f788a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -174,7 +174,8 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>sessionStore());
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(storeQueryParams);
*
* String key = "some-word";
* long fromTime = ...;
@@ -234,7 +235,8 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type
double
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String, Long> sessionStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>sessionStore());
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(storeQueryParams);
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
* }</pre>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index bd8e52a3cfa..3dc22e682f1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -134,9 +134,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>ReadOnlySessionStore<String, Long>);
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
@@ -180,9 +181,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>ReadOnlySessionStore<String, Long>);
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
@@ -338,9 +340,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type
double
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String, Long> sessionStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>sessionStore());
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
@@ -399,9 +402,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type
double
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String, Long> sessionStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>sessionStore());
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
@@ -557,9 +561,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>ReadOnlySessionStore<String, Long>);
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
@@ -617,9 +622,10 @@ public interface SessionWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // compute sum
* Sting queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlySessionStore<String,Long> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<String,
Long>ReadOnlySessionStore<String, Long>);
+ * StoreQueryParameters<ReadOnlySessionStore<String, Long>>
storeQueryParams =
StoreQueryParameters.fromNameAndType(QueryableStoreTypes.sessionStore());
+ * ReadOnlySessionStore<String,Long> sessionStore =
streams.store(storeQueryParams);
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
localWindowStore.fetch(key); // key must be local (application state is shared
over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows =
sessionStore.fetch(key); // key must be local (application state is shared over
all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using
{@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka
Streams application.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index a46da057a08..582f7f57b77 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -164,8 +164,8 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedWindowStore());
- *
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(storeQueryParams);
* K key = "some-word";
* long fromTime = ...;
* long toTime = ...;
@@ -221,7 +221,8 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index a6c7c1f4b50..fc16bd54460 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -134,7 +134,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
@@ -183,7 +184,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<Long>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
@@ -334,7 +336,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VR>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
@@ -395,7 +398,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<VR>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<VR>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
@@ -555,7 +559,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;
@@ -618,7 +623,8 @@ public interface TimeWindowedKStream<K, V> {
* <pre>{@code
* KafkaStreams streams = ... // counting words
* Store queryableStoreName = ... // the queryableStoreName should be the
name of the store as defined by the Materialized instance
- * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(queryableStoreName, QueryableStoreTypes.<K,
ValueAndTimestamp<V>>timestampedWindowStore());
+ * StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName,
QueryableStoreTypes.timestampedWindowStore());
+ * ReadOnlyWindowStore<K, ValueAndTimestamp<V>> localWindowStore =
streams.store(storeQueryParams);
*
* K key = "some-word";
* long fromTime = ...;