This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9f1179  MINOR: clean up window store interface to avoid confusion (#5359)
b9f1179 is described below

commit b9f11796944056a5b3c7440033d587d19290b3c3
Author: John Roesler <[email protected]>
AuthorDate: Sat Aug 4 15:57:18 2018 -0500

    MINOR: clean up window store interface to avoid confusion (#5359)
    
    Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>
---
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 13 ++-----------
 .../kafka/streams/kstream/internals/KStreamJoinWindow.java  | 13 +++----------
 .../main/java/org/apache/kafka/streams/state/Stores.java    | 10 +++++++++-
 .../java/org/apache/kafka/streams/state/WindowStore.java    | 11 ++++++++---
 .../kafka/streams/state/internals/CachingWindowStore.java   |  4 ++--
 .../state/internals/ChangeLoggingWindowBytesStore.java      |  6 +++---
 .../kafka/streams/state/internals/MeteredWindowStore.java   |  4 ++--
 .../kafka/streams/state/internals/RocksDBWindowStore.java   |  4 ++--
 8 files changed, 31 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2b37f24..becb03d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -871,22 +871,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
                 createWindowedStateStore(windows, joined.keySerde(), 
joined.otherValueSerde(), joinOtherName + "-store");
 
-
-            final KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(
-                thisWindowStore.name(),
-                windows.beforeMs + windows.afterMs + 1,
-                windows.maintainMs()
-            );
+            final KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore.name());
 
             final ProcessorParameters thisWindowStreamProcessorParams = new 
ProcessorParameters(thisWindowedStream, thisWindowStreamName);
             final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new 
ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
             builder.addGraphNode(thisStreamsGraphNode, 
thisWindowedStreamsNode);
 
-            final KStreamJoinWindow<K1, V2> otherWindowedStream = new 
KStreamJoinWindow<>(
-                otherWindowStore.name(),
-                windows.beforeMs + windows.afterMs + 1,
-                windows.maintainMs()
-            );
+            final KStreamJoinWindow<K1, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore.name());
 
             final ProcessorParameters otherWindowStreamProcessorParams = new 
ProcessorParameters(otherWindowedStream, otherWindowStreamName);
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 895dab4..34756d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,15 +26,8 @@ class KStreamJoinWindow<K, V> implements 
ProcessorSupplier<K, V> {
 
     private final String windowName;
 
-    /**
-     * @throws TopologyException if retention period of the join window is 
less than expected
-     */
-    KStreamJoinWindow(final String windowName, final long windowSizeMs, final 
long retentionPeriodMs) {
+    KStreamJoinWindow(final String windowName) {
         this.windowName = windowName;
-
-        if (windowSizeMs > retentionPeriodMs)
-            throw new TopologyException("The retention period of the join 
window "
-                    + windowName + " must be no smaller than its window 
size.");
     }
 
     @Override
@@ -61,7 +53,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, 
V> {
             // since it will never be considered for join operations
             if (key != null) {
                 context().forward(key, value);
-                window.put(key, value);
+                // Every record basically starts a new window. We're using a 
window store mostly for the retention.
+                window.put(key, value, context().timestamp());
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 03eaa07..3bda28d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -144,7 +144,10 @@ public class Stores {
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store 
(cannot be negative)
      * @param numSegments           number of db segments (cannot be zero or 
negative)
-     * @param windowSize            size of the windows (cannot be negative)
+     * @param windowSize            size of the windows that are stored 
(cannot be negative). Note: the window size
+     *                              is not stored with the records, so this 
value is used to compute the keys that
+     *                              the store returns. No effort is made to 
validate this parameter, so you must be
+     *                              careful to set it the same as the windowed 
keys you're actually storing.
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, 
long, long, boolean, long)} instead
@@ -211,6 +214,11 @@ public class Stores {
         if (segmentInterval < 1L) {
             throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
         }
+        if (windowSize > retentionPeriod) {
+            throw new IllegalArgumentException("The retention period of the 
window store "
+                                                   + name + " must be no 
smaller than its window size. Got size=["
+                                                   + windowSize + "], 
retention=[" + retentionPeriod + "]");
+        }
 
         return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, 
segmentInterval, windowSize, retainDuplicates);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ee4c19a..1685123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
 public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, 
V> {
 
     /**
-     * Put a key-value pair with the current record time as the timestamp
-     * into the corresponding window
+     * Use the current record timestamp as the {@code windowStartTimestamp} and
+     * delegate to {@link WindowStore#put(Object, Object, long)}.
+     *
+     * It's highly recommended to use {@link WindowStore#put(Object, Object, 
long)} instead, as the record timestamp
+     * is unlikely to be the correct windowStartTimestamp in general.
+     *
      * @param key The key to associate the value to
      * @param value The value to update, it can be null;
      *              if the serialized bytes are also null it is interpreted as 
deletes
@@ -40,7 +44,8 @@ public interface WindowStore<K, V> extends StateStore, 
ReadOnlyWindowStore<K, V>
      * Put a key-value pair with the given timestamp into the corresponding 
window
      * @param key The key to associate the value to
      * @param value The value; can be null
+     * @param windowStartTimestamp The timestamp of the beginning of the 
window to put the key/value into
      * @throws NullPointerException If null is used for key.
      */
-    void put(K key, V value, long timestamp);
+    void put(K key, V value, long windowStartTimestamp);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 4347811..688e889 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -144,12 +144,12 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value, final 
long timestamp) {
+    public synchronized void put(final Bytes key, final byte[] value, final 
long windowStartTimestamp) {
         // since this function may not access the underlying inner store, we 
need to validate
         // if store is open outside as well.
         validateStoreOpen();
         
-        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, 
timestamp, 0);
+        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, 0);
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index aa9cbe6..785aacd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value, final long timestamp) 
{
-        bytesStore.put(key, value, timestamp);
-        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, 
timestamp, maybeUpdateSeqnumForDups()), value);
+    public void put(final Bytes key, final byte[] value, final long 
windowStartTimestamp) {
+        bytesStore.put(key, value, windowStartTimestamp);
+        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 5a27ed4..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -108,10 +108,10 @@ public class MeteredWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(final K key, final V value, final long timestamp) {
+    public void put(final K key, final V value, final long 
windowStartTimestamp) {
         final long startNs = time.nanoseconds();
         try {
-            inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+            inner.put(keyBytes(key), serdes.rawValue(value), 
windowStartTimestamp);
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
             throw new ProcessorStateException(message, e);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 4c0e01f..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -70,10 +70,10 @@ public class RocksDBWindowStore<K, V> extends 
WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(final K key, final V value, final long timestamp) {
+    public void put(final K key, final V value, final long 
windowStartTimestamp) {
         maybeUpdateSeqnumForDups();
 
-        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, 
seqnum, serdes), serdes.rawValue(value));
+        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, 
windowStartTimestamp, seqnum, serdes), serdes.rawValue(value));
     }
 
     @Override

Reply via email to