This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ababc4261b [9/N][Emit final] Emit final for session window aggregations (#12204) ababc4261b is described below commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Jun 29 09:22:37 2022 -0700 [9/N][Emit final] Emit final for session window aggregations (#12204) * Add a new API for session windows to range query session window by end time (KIP related). * Augment session window aggregator with emit strategy. * Minor: consolidated some dup classes. * Test: unit test on session window aggregator. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../streams/kstream/SessionWindowedKStream.java | 5 +- .../kafka/streams/kstream/TimeWindowedKStream.java | 1 + ...bstractKStreamTimeWindowAggregateProcessor.java | 11 +- .../internals/CogroupedStreamAggregateBuilder.java | 2 + .../internals/KStreamSessionWindowAggregate.java | 272 ++++++++++++++++----- .../kstream/internals/KStreamWindowAggregate.java | 7 - .../kstream/internals/SessionTupleForwarder.java | 56 ----- .../internals/SessionWindowedKStreamImpl.java | 29 ++- .../kstream/internals/TimeWindowedKStreamImpl.java | 55 +++-- .../internals/TimestampedTupleForwarder.java | 3 +- .../internals/AbstractReadWriteDecorator.java | 6 + .../apache/kafka/streams/state/SessionStore.java | 13 + ...tractRocksDBTimeOrderedSegmentedBytesStore.java | 6 +- .../internals/ChangeLoggingSessionBytesStore.java | 12 +- .../state/internals/InMemorySessionStore.java | 21 +- .../state/internals/MeteredSessionStore.java | 12 + .../state/internals/PrefixedSessionKeySchemas.java | 13 +- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 33 ++- .../internals/RocksDBTimeOrderedSessionStore.java | 7 + .../streams/state/internals/SegmentIterator.java | 2 +- .../state/internals/SegmentedBytesStore.java | 4 +- .../streams/state/internals/SessionKeySchema.java | 2 +- ...KStreamSessionWindowAggregateProcessorTest.java | 219 ++++++++++++----- .../internals/KStreamWindowAggregateTest.java | 2 +- .../internals/SessionTupleForwarderTest.java | 108 -------- .../internals/SessionWindowedKStreamImplTest.java | 171 +++++++++---- .../internals/TimeWindowedKStreamImplTest.java | 2 +- .../internals/graph/GraphGraceSearchUtilTest.java | 8 +- 28 files changed, 676 insertions(+), 406 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index c561b62abf..fe897515a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -39,7 +39,7 @@ import java.time.Duration; * materialized view) that can be queried using the name provided in the {@link Materialized} instance. * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. - * New events are added to sessions until their grace period ends (see {@link SessionWindows#grace(Duration)}). + * New events are added to sessions until their grace period ends (see {@link SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}). * <p> * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via * {@link KGroupedStream#windowedBy(SessionWindows)}. @@ -643,4 +643,7 @@ public interface SessionWindowedKStream<K, V> { KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized); + + // TODO: add javadoc + SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java index 46ebd267f9..3f36838f20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java @@ -649,5 +649,6 @@ public interface TimeWindowedKStream<K, V> { final Named named, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized); + // TODO: add javadoc TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java index d39dad1f79..a081a280ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java @@ -73,11 +73,10 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); final String threadId = Thread.currentThread().getName(); + final String processorName = internalProcessorContext.currentNode().name(); droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); - emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), - internalProcessorContext.currentNode().name(), metrics); - emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), - internalProcessorContext.currentNode().name(), metrics); + emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics); + emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics); windowStore = context.getStateStore(storeName); if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { @@ -175,7 +174,7 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg observedStreamTime = Math.max(observedStreamTime, timestamp); } - private boolean shouldEmitFinal(final long closeTime) { + private boolean shouldEmitFinal(final long windowCloseTime) { if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { return false; } @@ -192,7 +191,7 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg timeTracker.advanceNextTimeToEmit(); // Only EMIT if the window close time does progress - return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < closeTime; + return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime; } private void fetchAndEmit(final Record<KIn, VIn> record, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 17dd413c45..3adc8beec8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -100,6 +100,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>( windows, storeBuilder.name(), + EmitStrategy.onWindowUpdate(), initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); @@ -138,6 +139,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>( sessionWindows, storeBuilder.name(), + EmitStrategy.onWindowUpdate(), initializer, kGroupedStream.getValue(), sessionMerger); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index eff7ac327a..f8252358b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -18,8 +18,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.SessionWindows; @@ -29,6 +32,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -39,6 +43,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; +import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> { @@ -50,16 +57,19 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg private final Initializer<VAgg> initializer; private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private final Merger<? super KIn, VAgg> sessionMerger; + private final EmitStrategy emitStrategy; private boolean sendOldValues = false; public KStreamSessionWindowAggregate(final SessionWindows windows, - final String storeName, - final Initializer<VAgg> initializer, - final Aggregator<? super KIn, ? super VIn, VAgg> aggregator, - final Merger<? super KIn, VAgg> sessionMerger) { + final String storeName, + final EmitStrategy emitStrategy, + final Initializer<VAgg> initializer, + final Aggregator<? super KIn, ? super VIn, VAgg> aggregator, + final Merger<? super KIn, VAgg> sessionMerger) { this.windows = windows; this.storeName = storeName; + this.emitStrategy = emitStrategy; this.initializer = initializer; this.aggregator = aggregator; this.sessionMerger = sessionMerger; @@ -83,24 +93,50 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> { private SessionStore<KIn, VAgg> store; - private SessionTupleForwarder<KIn, VAgg> tupleForwarder; + private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; + private Sensor emittedRecordsSensor; + private Sensor emitFinalLatencySensor; + private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext; + + private final Time time = Time.SYSTEM; + protected final KStreamImplJoin.TimeTracker timeTracker = new KStreamImplJoin.TimeTracker(); @Override public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); + internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); final String threadId = Thread.currentThread().getName(); - droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), - metrics); + final String processorName = internalProcessorContext.currentNode().name(); + droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); + emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics); + emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics); store = context.getStateStore(storeName); - tupleForwarder = new SessionTupleForwarder<>( - store, - context, - new SessionCacheFlushListener<>(context), - sendOldValues - ); + + if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + // Restore last emit close time for ON_WINDOW_CLOSE strategy + final Long lastEmitWindowCloseTime = internalProcessorContext.processorMetadataForKey(storeName); + if (lastEmitWindowCloseTime != null) { + this.lastEmitWindowCloseTime = lastEmitWindowCloseTime; + } + final long emitInterval = StreamsConfig.InternalConfig.getLong( + context.appConfigs(), + EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, + 1000L + ); + timeTracker.setEmitInterval(emitInterval); + + tupleForwarder = new TimestampedTupleForwarder<>(context, sendOldValues); + } else { + tupleForwarder = new TimestampedTupleForwarder<>( + store, + context, + new SessionCacheFlushListener<>(context), + sendOldValues); + } } @Override @@ -108,25 +144,13 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg // if the key is null, we do not need proceed aggregating // the record with the table if (record.key() == null) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record due to null key. " - + "topic=[{}] partition=[{}] offset=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() - ); - } else { - LOG.warn( - "Skipping record due to null key. Topic, partition, and offset not known." - ); - } - droppedRecordsSensor.record(); + logSkippedRecordForNullKey(); return; } final long timestamp = record.timestamp(); observedStreamTime = Math.max(observedStreamTime, timestamp); - final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); + final long windowCloseTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); @@ -148,55 +172,174 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg } } - if (mergedWindow.end() < closeTime) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record for expired window. " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), - timestamp, - mergedWindow.start(), mergedWindow.end(), - closeTime, - observedStreamTime - ); - } else { - LOG.warn( - "Skipping record for expired window. Topic, partition, and offset not known. " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - timestamp, - mergedWindow.start(), mergedWindow.end(), - closeTime, - observedStreamTime - ); - } - droppedRecordsSensor.record(); + if (mergedWindow.end() < windowCloseTime) { + logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow); } else { if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue<Windowed<KIn>, VAgg> session : merged) { store.remove(session.key); - tupleForwarder.maybeForward( - record.withKey(session.key) - .withValue(new Change<>(null, session.value))); + + maybeForwardUpdate(session.key, session.value, null); } } agg = aggregator.apply(record.key(), record.value(), agg); final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow); store.put(sessionKey, agg); + + maybeForwardUpdate(sessionKey, null, agg); + } + + maybeForwardFinalResult(record, windowCloseTime); + } + + private void maybeForwardUpdate(final Windowed<KIn> windowedkey, + final VAgg oldAgg, + final VAgg newAgg) { + if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + return; + } + + // Update the sent record timestamp to the window end time if possible + final long newTimestamp = windowedkey.window().end(); + tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp)); + } + + // TODO: consolidate SessionWindow with TimeWindow to merge common functions + private void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) { + if (shouldEmitFinal(windowCloseTime)) { + final long emitRangeUpperBound = emitRangeUpperBound(windowCloseTime); + + // if the upper bound is smaller than 0, then there's no window closed ever; + // and we can skip range fetching + if (emitRangeUpperBound >= 0) { + final long emitRangeLowerBound = emitRangeLowerBound(); + + if (shouldRangeFetch(emitRangeLowerBound, emitRangeUpperBound)) { + fetchAndEmit(record, windowCloseTime, emitRangeLowerBound, emitRangeUpperBound); + } + } + } + } + + private boolean shouldEmitFinal(final long windowCloseTime) { + if (emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + return false; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return false; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered every time + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Only EMIT if the window close time does progress + return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime; + } + + private long emitRangeLowerBound() { + return Math.max(0L, lastEmitWindowCloseTime); + } + + private long emitRangeUpperBound(final long windowCloseTime) { + // Session window's start and end timestamps are inclusive, so + // we should minus 1 for the inclusive closed window-end upper bound + return windowCloseTime - 1; + } + + private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) { + // since a session window could be a single point (i.e. [t, t]), + // we need to range fetch and emit even if the upper and lower bound are the same + return emitRangeUpperBound >= emitRangeLowerBound; + } + + private void fetchAndEmit(final Record<KIn, VIn> record, + final long windowCloseTime, + final long emitRangeLowerBound, + final long emitRangeUpperBound) { + final long startMs = time.milliseconds(); + + // Only time ordered (indexed) session store should have implemented + // this function, otherwise a not-supported exception would throw + final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = store + .findSessions(emitRangeLowerBound, emitRangeUpperBound); + + int emittedCount = 0; + while (windowToEmit.hasNext()) { + emittedCount++; + final KeyValue<Windowed<KIn>, VAgg> kv = windowToEmit.next(); + tupleForwarder.maybeForward( - record.withKey(sessionKey) - .withValue(new Change<>(agg, null))); + record.withKey(kv.key) + .withValue(new Change<>(kv.value, null)) + // set the timestamp as the window end timestamp + .withTimestamp(kv.key.window().end()) + .withHeaders(record.headers())); } + emittedRecordsSensor.record(emittedCount); + emitFinalLatencySensor.record(time.milliseconds() - startMs); + + lastEmitWindowCloseTime = windowCloseTime; + internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime); + } + + private void logSkippedRecordForNullKey() { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record due to null key. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + LOG.warn( + "Skipping record due to null key. Topic, partition, and offset not known." + ); + } + droppedRecordsSensor.record(); + } + + private void logSkippedRecordForExpiredWindow(final long timestamp, + final long windowExpire, + final SessionWindow window) { + final String windowString = "[" + window.start() + "," + window.end() + "]"; + + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn("Skipping record for expired window. " + + "topic=[{}] " + + "partition=[{}] " + + "offset=[{}] " + + "timestamp=[{}] " + + "window={} " + + "expiration=[{}] " + + "streamTime=[{}]", + recordMetadata.topic(), + recordMetadata.partition(), + recordMetadata.offset(), + timestamp, + windowString, + windowExpire, + observedStreamTime + ); + } else { + LOG.warn("Skipping record for expired window. Topic, partition, and offset not known. " + + "timestamp=[{}] " + + "window={} " + + "expiration=[{}] " + + "streamTime=[{}]", + timestamp, + windowString, + windowExpire, + observedStreamTime + ); + } + droppedRecordsSensor.record(); } } @@ -237,5 +380,4 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg key.window().end()); } } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 81687cb9e0..561524f87e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -49,13 +49,6 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements private boolean sendOldValues = false; - public KStreamWindowAggregate(final Windows<W> windows, - final String storeName, - final Initializer<VAgg> initializer, - final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { - this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator); - } - public KStreamWindowAggregate(final Windows<W> windows, final String storeName, final EmitStrategy emitStrategy, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java deleted file mode 100644 index e1c302f875..0000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.internals.CacheFlushListener; -import org.apache.kafka.streams.state.internals.WrappedStateStore; - -/** - * This class is used to determine if a processor should forward values to child nodes. - * Forwarding by this class only occurs when caching is not enabled. If caching is enabled, - * forwarding occurs in the flush listener when the cached store flushes. - * - * @param <K> - * @param <V> - */ -class SessionTupleForwarder<K, V> { - private final ProcessorContext<Windowed<K>, Change<V>> context; - private final boolean sendOldValues; - private final boolean cachingEnabled; - - @SuppressWarnings("unchecked") - SessionTupleForwarder(final StateStore store, - final ProcessorContext<Windowed<K>, Change<V>> context, - final CacheFlushListener<Windowed<K>, V> flushListener, - final boolean sendOldValues) { - this.context = context; - this.sendOldValues = sendOldValues; - cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); - } - - public void maybeForward(final Record<Windowed<K>, Change<V>> record) { - if (!cachingEnabled) { - context.forward( - record.withValue(new Change<>(record.value().newValue, sendOldValues ? record.value().oldValue : null)) - .withTimestamp(record.key() != null ? record.key().window().end() : record.timestamp())); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index f18c6ef568..c3b05cb118 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier; import java.time.Duration; import java.util.Objects; @@ -48,6 +50,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; + private EmitStrategy emitStrategy = EmitStrategy.onWindowUpdate(); + SessionWindowedKStreamImpl(final SessionWindows windows, final InternalStreamsBuilder builder, final Set<String> subTopologySourceNodes, @@ -90,6 +94,12 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple return doCount(named, materialized); } + @Override + public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) { + this.emitStrategy = emitStrategy; + return this; + } + private KTable<Windowed<K>, Long> doCount(final Named named, final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) { final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal = @@ -109,6 +119,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple new KStreamSessionWindowAggregate<>( windows, materializedInternal.storeName(), + emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger), @@ -157,6 +168,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple new KStreamSessionWindowAggregate<>( windows, materializedInternal.storeName(), + emitStrategy, aggregateBuilder.reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator) @@ -214,6 +226,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple new KStreamSessionWindowAggregate<>( windows, materializedInternal.storeName(), + emitStrategy, initializer, aggregator, sessionMerger), @@ -246,10 +259,15 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple ); break; case ROCKS_DB: - supplier = Stores.persistentSessionStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod) - ); + supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ? + new RocksDbTimeOrderedSessionBytesStoreSupplier( + materialized.storeName(), + retentionPeriod, + true) : + Stores.persistentSessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod) + ); break; default: throw new IllegalStateException("Unknown store type: " + materialized.storeType()); @@ -268,7 +286,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple builder.withLoggingDisabled(); } - if (materialized.cachingEnabled()) { + // do not enable cache if the emit final strategy is used + if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); } return builder; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 1f2c732878..c07b783978 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -110,14 +110,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return aggregateBuilder.build( - new NamedInternal(aggregateName), - materialize(materializedInternal), - new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), - materializedInternal.queryableStoreName(), - materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, - materializedInternal.valueSerde()); - - + new NamedInternal(aggregateName), + materialize(materializedInternal), + new KStreamWindowAggregate<>( + windows, + materializedInternal.storeName(), + emitStrategy, + aggregateBuilder.countInitializer, + aggregateBuilder.countAggregator), + materializedInternal.queryableStoreName(), + materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, + materializedInternal.valueSerde()); } @Override @@ -158,14 +161,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return aggregateBuilder.build( - new NamedInternal(aggregateName), - materialize(materializedInternal), - new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator), - materializedInternal.queryableStoreName(), - materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, - materializedInternal.valueSerde()); - - + new NamedInternal(aggregateName), + materialize(materializedInternal), + new KStreamWindowAggregate<>( + windows, + materializedInternal.storeName(), + emitStrategy, + initializer, + aggregator), + materializedInternal.queryableStoreName(), + materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, + materializedInternal.valueSerde()); } @Override @@ -205,12 +211,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); return aggregateBuilder.build( - new NamedInternal(reduceName), - materialize(materializedInternal), - new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), - materializedInternal.queryableStoreName(), - materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, - materializedInternal.valueSerde()); + new NamedInternal(reduceName), + materialize(materializedInternal), + new KStreamWindowAggregate<>( + windows, + materializedInternal.storeName(), + emitStrategy, + aggregateBuilder.reduceInitializer, + aggregatorForReducer(reducer)), + materializedInternal.queryableStoreName(), + materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, + materializedInternal.valueSerde()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java index 34acbd99bd..bc686ada72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.state.internals.CacheFlushListener; import org.apache.kafka.streams.state.internals.WrappedStateStore; /** @@ -38,7 +39,7 @@ class TimestampedTupleForwarder<K, V> { @SuppressWarnings({"unchecked", "rawtypes"}) TimestampedTupleForwarder(final StateStore store, final ProcessorContext<K, Change<V>> context, - final TimestampedCacheFlushListener<K, V> flushListener, + final CacheFlushListener<K, ?> flushListener, final boolean sendOldValues) { this.context = (InternalProcessorContext<K, Change<V>>) context; this.sendOldValues = sendOldValues; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index aff099af5f..3c7f70ea07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -259,6 +259,12 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime); + } + @Override public void remove(final Windowed<K> sessionKey) { wrapped().remove(sessionKey); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index cbc1cc5b96..76a4317394 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -39,6 +39,19 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail */ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> { + /** + * Return all the session window entries that ends between the specified range (both ends are inclusive). + * This function would be used to retrieve all closed and immutable windows. + * + * @param earliestSessionEndTime earliest session end time to search from, inclusive + * @param latestSessionEndTime latest session end time to search to, inclusive + */ + default KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of SessionStore."); + } + @Override default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java index f7216412f0..0398f0ca06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java @@ -142,8 +142,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst final long to, final boolean forward) { if (indexKeySchema.isPresent()) { - final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, - forward); + final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward); final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from); final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to); @@ -156,8 +155,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst forward)); } - final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, - forward); + final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward); final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from); final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index ff387ef38e..fd32798801 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -31,9 +31,9 @@ import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils * Simple wrapper around a {@link SessionStore} to support writing * updates to a changelog */ -class ChangeLoggingSessionBytesStore - extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]> - implements SessionStore<Bytes, byte[]> { +public class ChangeLoggingSessionBytesStore + extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]> + implements SessionStore<Bytes, byte[]> { private InternalProcessorContext context; @@ -95,6 +95,12 @@ class ChangeLoggingSessionBytesStore return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime); + } + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { return wrapped().backwardFetch(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 97984dd156..579abc3678 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -202,25 +202,36 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { @Override public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessionEndTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet - if (latestSessionStartTime > observedStreamTime - retentionPeriod) { - final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime); + if (sessionEndTime > observedStreamTime - retentionPeriod) { + final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime); if (keyMap != null) { final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key); if (startTimeMap != null) { - return startTimeMap.get(earliestSessionEndTime); + return startTimeMap.get(sessionStartTime); } } } return null; } + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + removeExpiredSegments(); + + final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> endTimSubMap + = endTimeMap.subMap(earliestSessionEndTime, true, latestSessionEndTime, true); + + return registerNewIterator(null, null, Long.MAX_VALUE, endTimSubMap.entrySet().iterator(), true); + } + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index ad75e35e7a..bc4f2169b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -346,6 +346,18 @@ public class MeteredSessionStore<K, V> time); } + @Override + public KeyValueIterator<Windowed<K>, V> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new MeteredWindowedKeyValueIterator<>( + wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime), + fetchSensor, + streamsMetrics, + serdes::keyFrom, + serdes::valueFrom, + time); + } + @Override public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom, final K keyTo, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java index c98ae83390..2ac25277ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java @@ -77,10 +77,8 @@ public class PrefixedSessionKeySchemas { } /** - * * @param key the key in the range * @param to the latest start time - * @return */ @Override public Bytes upperRangeFixedSize(final Bytes key, final long to) { @@ -88,10 +86,8 @@ public class PrefixedSessionKeySchemas { } /** - * * @param key the key in the range * @param from the earliest end timestamp in the range - * @return */ @Override public Bytes lowerRangeFixedSize(final Bytes key, final long from) { @@ -105,7 +101,10 @@ public class PrefixedSessionKeySchemas { @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, - final Bytes binaryKeyTo, final long from, final long to, final boolean forward) { + final Bytes binaryKeyTo, + final long from, + final long to, + final boolean forward) { return iterator -> { while (iterator.hasNext()) { final Bytes bytes = iterator.peekNextKey(); @@ -204,7 +203,9 @@ public class PrefixedSessionKeySchemas { final long endTime) { buf.putLong(endTime); buf.putLong(startTime); - buf.put(key.get()); + if (key != null) { + buf.put(key.get()); + } } public static Bytes toBinary(final Bytes key, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java index 4265150eb9..172d321881 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -38,7 +39,7 @@ import org.rocksdb.WriteBatch; */ public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { - private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator { + private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator { SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) { super(indexIterator); } @@ -71,6 +72,36 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksD )); } + public KeyValueIterator<Bytes, byte[]> fetchSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + final List<KeyValueSegment> searchSpace = segments.segments(earliestSessionEndTime, latestSessionEndTime, true); + + // here we want [0, latestSE, FF] as the upper bound to cover any possible keys, + // but since we can only get upper bound based on timestamps, we use a slight larger upper bound as [0, latestSE+1] + final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(null, earliestSessionEndTime); + final Bytes binaryTo = baseKeySchema.lowerRangeFixedSize(null, latestSessionEndTime + 1); + + return new SegmentIterator<>( + searchSpace.iterator(), + iterator -> { + while (iterator.hasNext()) { + final Bytes bytes = iterator.peekNextKey(); + + final Windowed<Bytes> windowedKey = TimeFirstSessionKeySchema.from(bytes); + final long endTime = windowedKey.window().end(); + + if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime) { + return true; + } + iterator.next(); + } + return false; + }, + binaryFrom, + binaryTo, + true); + } + public void remove(final Windowed<Bytes> key) { remove(TimeFirstSessionKeySchema.toBinary(key)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java index 5b72163757..deb6028ef6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java @@ -61,6 +61,13 @@ public class RocksDBTimeOrderedSessionStore ); } + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchSessions(earliestSessionEndTime, latestSessionEndTime); + return new WrappedSessionStoreIterator(bytesIterator, TimeFirstSessionKeySchema::from); + } + @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 6191c49888..9aabc787c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -91,7 +91,7 @@ class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte try { hasNext = hasNextCondition.hasNext(currentIterator); } catch (final InvalidStateStoreException e) { - //already closed so ignore + // already closed so ignore } return hasNext; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 80b5a91ffa..1ef6a932f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore { /** * Gets all the key-value pairs that belong to the windows within in the given time range. * - * @param from the beginning of the time slot from which to search - * @param to the end of the time slot from which to search + * @param from the beginning of the time slot from which to search (inclusive) + * @param to the end of the time slot from which to search (inclusive) * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} * @throws InvalidStateStoreException if the store is not initialized * @throws NullPointerException if null is used for any key diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 505bbddc80..f21e47fd87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -35,7 +35,7 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema { private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE]; public static int keyByteLength(final Bytes key) { - return key.get().length + 2 * TIMESTAMP_SIZE; + return (key == null ? 0 : key.get().length) + 2 * TIMESTAMP_SIZE; } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 21c6e6af12..fc993b63a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -22,11 +22,11 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.SessionWindows; @@ -40,9 +40,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; @@ -51,13 +53,18 @@ import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; @@ -69,29 +76,39 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - +@RunWith(Parameterized.class) public class KStreamSessionWindowAggregateProcessorTest { private static final long GAP_MS = 5 * 60 * 1000L; private static final String STORE_NAME = "session-store"; + private final MockTime time = new MockTime(); + private final Metrics metrics = new Metrics(); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time); private final String threadId = Thread.currentThread().getName(); private final Initializer<Long> initializer = () -> 0L; private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1; private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; - private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = - new KStreamSessionWindowAggregate<>( - SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)), - STORE_NAME, - initializer, - aggregator, - sessionMerger); - private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>(); - private final Processor<String, String, Windowed<String>, Change<Long>> processor = sessionAggregator.get(); - private SessionStore<String, Long> sessionStore; + private InternalMockProcessorContext<Windowed<String>, Change<Long>> context; - private final Metrics metrics = new Metrics(); + private KStreamSessionWindowAggregate<String, String, Long> sessionAggregator; + private Processor<String, String, Windowed<String>, Change<Long>> processor; + private SessionStore<String, Long> sessionStore; + + @Parameterized.Parameter + public EmitStrategy.StrategyType type; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return asList(new Object[][] { + {EmitStrategy.StrategyType.ON_WINDOW_UPDATE}, + {EmitStrategy.StrategyType.ON_WINDOW_CLOSE} + }); + } + + private EmitStrategy emitStrategy; + private boolean emitFinal; @Before public void setup() { @@ -99,23 +116,44 @@ public class KStreamSessionWindowAggregateProcessorTest { } private void setup(final boolean enableCache) { - final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + // Always process + final Properties prop = StreamsTestUtils.getStreamsConfig(); + prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); + final StreamsConfig config = new StreamsConfig(prop); + context = new InternalMockProcessorContext<Windowed<String>, Change<Long>>( TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), streamsMetrics, - new StreamsConfig(StreamsTestUtils.getStreamsConfig()), + config, MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 100000, streamsMetrics), - Time.SYSTEM + time ) { @Override public <K extends Windowed<String>, V extends Change<Long>> void forward(final Record<K, V> record) { results.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp())); } }; + + + emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE); + emitStrategy = EmitStrategy.StrategyType.forType(type); + + sessionAggregator = new KStreamSessionWindowAggregate<>( + SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)), + STORE_NAME, + emitStrategy, + initializer, + aggregator, + sessionMerger); + + if (processor != null) { + processor.close(); + } + processor = sessionAggregator.get(); + // Set initial timestamp for CachingSessionStore to prepare entry from as default // InternalMockProcessorContext#timestamp returns -1. context.setTime(0L); @@ -126,14 +164,14 @@ public class KStreamSessionWindowAggregateProcessorTest { } private void initStore(final boolean enableCaching) { - final StoreBuilder<SessionStore<String, Long>> storeBuilder = - Stores.sessionStoreBuilder( - Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)), - Serdes.String(), - Serdes.Long()) + final SessionBytesStoreSupplier supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ? + new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS * 3, true) : + Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)); + + final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long()) .withLoggingDisabled(); - if (enableCaching) { + if (enableCaching && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { storeBuilder.withCachingEnabled(); } @@ -147,6 +185,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @After public void closeStore() { sessionStore.close(); + processor.close(); } @Test @@ -198,35 +237,51 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() { final String sessionId = "mel"; - long time = 0; - processor.process(new Record<>(sessionId, "first", time)); - final long time1 = time += GAP_MS + 1; - processor.process(new Record<>(sessionId, "second", time1)); - processor.process(new Record<>(sessionId, "second", time1)); - final long time2 = time += GAP_MS + 1; - processor.process(new Record<>(sessionId, "third", time2)); - processor.process(new Record<>(sessionId, "third", time2)); - processor.process(new Record<>(sessionId, "third", time2)); + long now = 0; + processor.process(new Record<>(sessionId, "first", now)); + now += GAP_MS + 1; + processor.process(new Record<>(sessionId, "second", now)); + processor.process(new Record<>(sessionId, "second", now)); + now += GAP_MS + 1; + processor.process(new Record<>(sessionId, "third", now)); + processor.process(new Record<>(sessionId, "third", now)); + processor.process(new Record<>(sessionId, "third", now)); sessionStore.flush(); - assertEquals( - Arrays.asList( - new KeyValueTimestamp<>( - new Windowed<>(sessionId, new SessionWindow(0, 0)), - new Change<>(1L, null), - 0L), - new KeyValueTimestamp<>( - new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), - new Change<>(2L, null), - GAP_MS + 1), - new KeyValueTimestamp<>( - new Windowed<>(sessionId, new SessionWindow(time, time)), - new Change<>(3L, null), - time) - ), - results - ); + if (emitFinal) { + assertEquals( + Arrays.asList( + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), + new Change<>(2L, null), + GAP_MS + 1) + ), + results + ); + } else { + assertEquals( + Arrays.asList( + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), + new Change<>(2L, null), + GAP_MS + 1), + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(now, now)), + new Change<>(3L, null), + now) + ), + results + ); + } } @Test @@ -264,8 +319,8 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionStore.flush(); - assertEquals( - Arrays.asList( + if (emitFinal) { + assertEquals(Arrays.asList( new KeyValueTimestamp<>( new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null), @@ -281,22 +336,44 @@ public class KStreamSessionWindowAggregateProcessorTest { new KeyValueTimestamp<>( new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null), - GAP_MS / 2), - new KeyValueTimestamp<>( - new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), - new Change<>(1L, null), - GAP_MS + 1), - new KeyValueTimestamp<>( - new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), - new Change<>(2L, null), - GAP_MS + 1 + GAP_MS / 2), - new KeyValueTimestamp<>(new Windowed<>( - "c", - new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null), - GAP_MS + 1 + GAP_MS / 2) - ), - results - ); + GAP_MS / 2) + ), + results); + } else { + assertEquals( + Arrays.asList( + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("b", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("c", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), + new Change<>(2L, null), + GAP_MS / 2), + new KeyValueTimestamp<>( + new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), + new Change<>(1L, null), + GAP_MS + 1), + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), + new Change<>(2L, null), + GAP_MS + 1 + GAP_MS / 2), + new KeyValueTimestamp<>(new Windowed<>( + "c", + new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null), + GAP_MS + 1 + GAP_MS / 2) + ), + results + ); + } } @Test @@ -314,6 +391,9 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() { + if (emitFinal) + return; + initStore(false); processor.init(context); @@ -342,6 +422,9 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldImmediatelyForwardRemovedSessionsWhenMerging() { + if (emitFinal) + return; + initStore(false); processor.init(context); @@ -399,6 +482,7 @@ public class KStreamSessionWindowAggregateProcessorTest { final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)), STORE_NAME, + EmitStrategy.onWindowUpdate(), initializer, aggregator, sessionMerger @@ -464,6 +548,7 @@ public class KStreamSessionWindowAggregateProcessorTest { final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)), STORE_NAME, + EmitStrategy.onWindowUpdate(), initializer, aggregator, sessionMerger diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 484ad1f059..8af320ae70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -99,7 +99,7 @@ public class KStreamWindowAggregateTest { @Parameter(1) public boolean withCache; - public EmitStrategy emitStrategy; + private EmitStrategy emitStrategy; private boolean emitFinal; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java deleted file mode 100644 index 60b37bb052..0000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.internals.WrappedStateStore; -import org.junit.Test; - -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; - -public class SessionTupleForwarderTest { - - @Test - public void shouldSetFlushListenerOnWrappedStateStore() { - setFlushListener(true); - setFlushListener(false); - } - - private void setFlushListener(final boolean sendOldValues) { - final WrappedStateStore<StateStore, Windowed<Object>, Object> store = mock(WrappedStateStore.class); - final SessionCacheFlushListener<Object, Object> flushListener = mock(SessionCacheFlushListener.class); - - expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); - replay(store); - - new SessionTupleForwarder<>(store, null, flushListener, sendOldValues); - - verify(store); - } - - @Test - public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() { - shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false); - shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true); - } - - private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) { - final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext<Windowed<String>, Change<String>> context = mock( - ProcessorContext.class); - - expect(store.setFlushListener(null, sendOldValued)).andReturn(false); - if (sendOldValued) { - context.forward( - new Record<>( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", "oldValue"), - 42L)); - } else { - context.forward( - new Record<>( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", null), - 42L)); - } - expectLastCall(); - replay(store, context); - - new SessionTupleForwarder<>(store, context, null, sendOldValued) - .maybeForward( - new Record<>( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", "oldValue"), - 42L)); - - verify(store, context); - } - - @Test - public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() { - final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext<Windowed<String>, Change<String>> context = mock(ProcessorContext.class); - - expect(store.setFlushListener(null, false)).andReturn(true); - replay(store, context); - - new SessionTupleForwarder<>(store, context, null, false) - .maybeForward( - new Record<>( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", "oldValue"), - 42L)); - - verify(store, context); - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index a77dcdb0a2..41876581b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; @@ -34,8 +36,11 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore; +import org.apache.kafka.streams.state.internals.MeteredSessionStore; +import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockInitializer; @@ -43,29 +48,58 @@ import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +@RunWith(Parameterized.class) public class SessionWindowedKStreamImplTest { private static final String TOPIC = "input"; private final StreamsBuilder builder = new StreamsBuilder(); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final Merger<String, String> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + "+" + aggTwo; + private SessionWindowedKStream<String, String> stream; + @Parameterized.Parameter + public EmitStrategy.StrategyType type; + + private boolean emitFinal; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() { + return asList(new Object[][] { + {EmitStrategy.StrategyType.ON_WINDOW_UPDATE}, + {EmitStrategy.StrategyType.ON_WINDOW_CLOSE} + }); + } + @Before public void before() { + final EmitStrategy emitStrategy = EmitStrategy.StrategyType.forType(type); + emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE); + + // Set interval to 0 so that it always tries to emit + props.setProperty(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, "0"); + final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500))); + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500))) + .emitStrategy(emitStrategy); } @Test @@ -89,19 +123,30 @@ public class SessionWindowedKStreamImplTest { processData(driver); } - final Map<Windowed<String>, ValueAndTimestamp<Long>> result = - supplier.theCapturedProcessor().lastValueAndTimestampPerKey(); - - assertThat(result.size(), equalTo(3)); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(10L, 15L))), - equalTo(ValueAndTimestamp.make(2L, 15L))); - assertThat( - result.get(new Windowed<>("2", new SessionWindow(599L, 600L))), - equalTo(ValueAndTimestamp.make(2L, 600L))); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(600L, 600L))), - equalTo(ValueAndTimestamp.make(1L, 600L))); + final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed = + supplier.theCapturedProcessor().processed(); + + if (emitFinal) { + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), 2L, 15L) + ), + processed + ); + } else { + assertEquals( + asList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), 1L, 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), 2L, 15L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), 1L, 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), 1L, 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), 2L, 600L) + ), + processed + ); + } } @Test @@ -115,19 +160,30 @@ public class SessionWindowedKStreamImplTest { processData(driver); } - final Map<Windowed<String>, ValueAndTimestamp<String>> result = - supplier.theCapturedProcessor().lastValueAndTimestampPerKey(); - - assertThat(result.size(), equalTo(3)); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(10, 15))), - equalTo(ValueAndTimestamp.make("1+2", 15L))); - assertThat( - result.get(new Windowed<>("2", new SessionWindow(599L, 600))), - equalTo(ValueAndTimestamp.make("1+2", 600L))); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(600, 600))), - equalTo(ValueAndTimestamp.make("3", 600L))); + final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = + supplier.theCapturedProcessor().processed(); + + if (emitFinal) { + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "1+2", 15L) + ), + processed + ); + } else { + assertEquals( + asList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), "1", 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "1+2", 15L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), "3", 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), "1", 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), "1+2", 600L) + ), + processed + ); + } } @Test @@ -143,19 +199,30 @@ public class SessionWindowedKStreamImplTest { processData(driver); } - final Map<Windowed<String>, ValueAndTimestamp<String>> result = - supplier.theCapturedProcessor().lastValueAndTimestampPerKey(); - - assertThat(result.size(), equalTo(3)); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(10, 15))), - equalTo(ValueAndTimestamp.make("0+0+1+2", 15L))); - assertThat( - result.get(new Windowed<>("2", new SessionWindow(599, 600))), - equalTo(ValueAndTimestamp.make("0+0+1+2", 600L))); - assertThat( - result.get(new Windowed<>("1", new SessionWindow(600, 600))), - equalTo(ValueAndTimestamp.make("0+3", 600L))); + final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = + supplier.theCapturedProcessor().processed(); + + if (emitFinal) { + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "0+0+1+2", 15L) + ), + processed + ); + } else { + assertEquals( + asList( + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), "0+1", 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 10L)), null, 10L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(10L, 15L)), "0+0+1+2", 15L), + new KeyValueTimestamp<>(new Windowed<>("1", new SessionWindow(600L, 600L)), "0+3", 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), "0+1", 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(600L, 600L)), null, 600L), + new KeyValueTimestamp<>(new Windowed<>("2", new SessionWindow(599L, 600L)), "0+0+1+2", 600L) + ), + processed + ); + } } @Test @@ -292,6 +359,26 @@ public class SessionWindowedKStreamImplTest { assertThrows(NullPointerException.class, () -> stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null)); } + @Test + public void shouldNotEnableCachingWithEmitFinal() { + if (!emitFinal) + return; + + stream.aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + sessionMerger, + Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String())); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final SessionStore<String, String> store = driver.getSessionStore("aggregated"); + final WrappedStateStore changeLogging = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + assertThat(store, instanceOf(MeteredSessionStore.class)); + assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class)); + assertThat(changeLogging.wrapped(), instanceOf(RocksDBTimeOrderedSessionStore.class)); + } + } + private void processData(final TopologyTestDriver driver) { final TestInputTopic<String, String> inputTopic = driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 82017317c8..5ac43ac808 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -85,7 +85,7 @@ public class TimeWindowedKStreamImplTest { private boolean emitFinal; @Parameterized.Parameters(name = "{0}_cache:{1}") - public static Collection<Object[]> getKeySchema() { + public static Collection<Object[]> data() { return asList(new Object[][] { {StrategyType.ON_WINDOW_UPDATE, true}, {StrategyType.ON_WINDOW_UPDATE, false}, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index f8a7073dab..1de78a8b85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; @@ -86,6 +87,7 @@ public class GraphGraceSearchUtilTest { new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( windows, "asdf", + EmitStrategy.onWindowUpdate(), null, null ), @@ -108,6 +110,7 @@ public class GraphGraceSearchUtilTest { new KStreamSessionWindowAggregate<String, Long, Integer>( windows, "asdf", + EmitStrategy.onWindowUpdate(), null, null, null @@ -127,7 +130,7 @@ public class GraphGraceSearchUtilTest { final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( "asdf", new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( - windows, "asdf", null, null, null + windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, null ), "asdf"), (StoreBuilder<?>) null ); @@ -167,6 +170,7 @@ public class GraphGraceSearchUtilTest { new KStreamSessionWindowAggregate<String, Long, Integer>( windows, "asdf", + EmitStrategy.onWindowUpdate(), null, null, null @@ -194,6 +198,7 @@ public class GraphGraceSearchUtilTest { new KStreamSessionWindowAggregate<String, Long, Integer>( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)), "asdf", + EmitStrategy.onWindowUpdate(), null, null, null @@ -209,6 +214,7 @@ public class GraphGraceSearchUtilTest { new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)), "asdf", + EmitStrategy.onWindowUpdate(), null, null ),