This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9f1179 MINOR: clean up window store interface to avoid confusion (#5359)
b9f1179 is described below
commit b9f11796944056a5b3c7440033d587d19290b3c3
Author: John Roesler <[email protected]>
AuthorDate: Sat Aug 4 15:57:18 2018 -0500
MINOR: clean up window store interface to avoid confusion (#5359)
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>
---
.../apache/kafka/streams/kstream/internals/KStreamImpl.java | 13 ++-----------
.../kafka/streams/kstream/internals/KStreamJoinWindow.java | 13 +++----------
.../main/java/org/apache/kafka/streams/state/Stores.java | 10 +++++++++-
.../java/org/apache/kafka/streams/state/WindowStore.java | 11 ++++++++---
.../kafka/streams/state/internals/CachingWindowStore.java | 4 ++--
.../state/internals/ChangeLoggingWindowBytesStore.java | 6 +++---
.../kafka/streams/state/internals/MeteredWindowStore.java | 4 ++--
.../kafka/streams/state/internals/RocksDBWindowStore.java | 4 ++--
8 files changed, 31 insertions(+), 34 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2b37f24..becb03d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -871,22 +871,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
createWindowedStateStore(windows, joined.keySerde(),
joined.otherValueSerde(), joinOtherName + "-store");
-
- final KStreamJoinWindow<K1, V1> thisWindowedStream = new
KStreamJoinWindow<>(
- thisWindowStore.name(),
- windows.beforeMs + windows.afterMs + 1,
- windows.maintainMs()
- );
+ final KStreamJoinWindow<K1, V1> thisWindowedStream = new
KStreamJoinWindow<>(thisWindowStore.name());
final ProcessorParameters thisWindowStreamProcessorParams = new
ProcessorParameters(thisWindowedStream, thisWindowStreamName);
final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new
ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
builder.addGraphNode(thisStreamsGraphNode,
thisWindowedStreamsNode);
- final KStreamJoinWindow<K1, V2> otherWindowedStream = new
KStreamJoinWindow<>(
- otherWindowStore.name(),
- windows.beforeMs + windows.afterMs + 1,
- windows.maintainMs()
- );
+ final KStreamJoinWindow<K1, V2> otherWindowedStream = new
KStreamJoinWindow<>(otherWindowStore.name());
final ProcessorParameters otherWindowStreamProcessorParams = new
ProcessorParameters(otherWindowedStream, otherWindowStreamName);
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new
ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 895dab4..34756d4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,15 +26,8 @@ class KStreamJoinWindow<K, V> implements
ProcessorSupplier<K, V> {
private final String windowName;
- /**
- * @throws TopologyException if retention period of the join window is
less than expected
- */
- KStreamJoinWindow(final String windowName, final long windowSizeMs, final
long retentionPeriodMs) {
+ KStreamJoinWindow(final String windowName) {
this.windowName = windowName;
-
- if (windowSizeMs > retentionPeriodMs)
- throw new TopologyException("The retention period of the join
window "
- + windowName + " must be no smaller than its window
size.");
}
@Override
@@ -61,7 +53,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K,
V> {
// since it will never be considered for join operations
if (key != null) {
context().forward(key, value);
- window.put(key, value);
+ // Every record basically starts a new window. We're using a
window store mostly for the retention.
+ window.put(key, value, context().timestamp());
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 03eaa07..3bda28d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -144,7 +144,10 @@ public class Stores {
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store
(cannot be negative)
* @param numSegments number of db segments (cannot be zero or
negative)
- * @param windowSize size of the windows (cannot be negative)
+ * @param windowSize size of the windows that are stored
(cannot be negative). Note: the window size
+ * is not stored with the records, so this
value is used to compute the keys that
+ * the store returns. No effort is made to
validate this parameter, so you must be
+ * careful to set it the same as the windowed
keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String,
long, long, boolean, long)} instead
@@ -211,6 +214,11 @@ public class Stores {
if (segmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero
or negative");
}
+ if (windowSize > retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the
window store "
+ + name + " must be no
smaller than its window size. Got size=["
+ + windowSize + "],
retention=[" + retentionPeriod + "]");
+ }
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod,
segmentInterval, windowSize, retainDuplicates);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ee4c19a..1685123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K,
V> {
/**
- * Put a key-value pair with the current record time as the timestamp
- * into the corresponding window
+ * Use the current record timestamp as the {@code windowStartTimestamp} and
+ * delegate to {@link WindowStore#put(Object, Object, long)}.
+ *
+ * It's highly recommended to use {@link WindowStore#put(Object, Object,
long)} instead, as the record timestamp
+ * is unlikely to be the correct windowStartTimestamp in general.
+ *
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as
deletes
@@ -40,7 +44,8 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* Put a key-value pair with the given timestamp into the corresponding
window
* @param key The key to associate the value to
* @param value The value; can be null
+ * @param windowStartTimestamp The timestamp of the beginning of the
window to put the key/value into
* @throws NullPointerException If null is used for key.
*/
- void put(K key, V value, long timestamp);
+ void put(K key, V value, long windowStartTimestamp);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 4347811..688e889 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -144,12 +144,12 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
}
@Override
- public synchronized void put(final Bytes key, final byte[] value, final
long timestamp) {
+ public synchronized void put(final Bytes key, final byte[] value, final
long windowStartTimestamp) {
// since this function may not access the underlying inner store, we
need to validate
// if store is open outside as well.
validateStoreOpen();
- final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key,
timestamp, 0);
+ final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key,
windowStartTimestamp, 0);
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index aa9cbe6..785aacd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends
WrappedStateStore.AbstractStateStore
}
@Override
- public void put(final Bytes key, final byte[] value, final long timestamp)
{
- bytesStore.put(key, value, timestamp);
- changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key,
timestamp, maybeUpdateSeqnumForDups()), value);
+ public void put(final Bytes key, final byte[] value, final long
windowStartTimestamp) {
+ bytesStore.put(key, value, windowStartTimestamp);
+ changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key,
windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 5a27ed4..5162eac 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -108,10 +108,10 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public void put(final K key, final V value, final long timestamp) {
+ public void put(final K key, final V value, final long
windowStartTimestamp) {
final long startNs = time.nanoseconds();
try {
- inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+ inner.put(keyBytes(key), serdes.rawValue(value),
windowStartTimestamp);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
throw new ProcessorStateException(message, e);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 4c0e01f..d7bb523 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -70,10 +70,10 @@ public class RocksDBWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public void put(final K key, final V value, final long timestamp) {
+ public void put(final K key, final V value, final long
windowStartTimestamp) {
maybeUpdateSeqnumForDups();
- bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp,
seqnum, serdes), serdes.rawValue(value));
+ bytesStore.put(WindowKeySchema.toStoreKeyBinary(key,
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
}
@Override