This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new babfb1778b1 KAFKA-14864: Close iterator in KStream windowed
aggregation emit on window close (#13470)
babfb1778b1 is described below
commit babfb1778b1fd57d86261adab72ee42bc04caa8b
Author: Victoria Xia <[email protected]>
AuthorDate: Tue Apr 4 00:29:40 2023 -0400
KAFKA-14864: Close iterator in KStream windowed aggregation emit on window
close (#13470)
Reviewers: Matthias J. Sax <[email protected]>
---
...bstractKStreamTimeWindowAggregateProcessor.java | 31 +++++++++++-----------
.../internals/KStreamSlidingWindowAggregate.java | 9 ++++---
2 files changed, 22 insertions(+), 18 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index a081a280baf..dfef0d4aaae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -200,22 +200,23 @@ public abstract class
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
final long emitRangeUpperBound) {
final long startMs = time.milliseconds();
- final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>>
windowToEmit = windowStore
- .fetchAll(emitRangeLowerBound, emitRangeUpperBound);
-
- int emittedCount = 0;
- while (windowToEmit.hasNext()) {
- emittedCount++;
- final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv =
windowToEmit.next();
-
- tupleForwarder.maybeForward(
- record.withKey(kv.key)
- .withValue(new Change<>(kv.value.value(), null))
- .withTimestamp(kv.value.timestamp())
- .withHeaders(record.headers()));
+ try (final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>>
windowToEmit
+ = windowStore.fetchAll(emitRangeLowerBound,
emitRangeUpperBound)) {
+
+ int emittedCount = 0;
+ while (windowToEmit.hasNext()) {
+ emittedCount++;
+ final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv =
windowToEmit.next();
+
+ tupleForwarder.maybeForward(
+ record.withKey(kv.key)
+ .withValue(new Change<>(kv.value.value(), null))
+ .withTimestamp(kv.value.timestamp())
+ .withHeaders(record.headers()));
+ }
+ emittedRecordsSensor.record(emittedCount);
+ emitFinalLatencySensor.record(time.milliseconds() - startMs);
}
- emittedRecordsSensor.record(emittedCount);
- emitFinalLatencySensor.record(time.milliseconds() - startMs);
lastEmitWindowCloseTime = windowCloseTime;
internalProcessorContext.addProcessorMetadataKeyValue(storeName,
windowCloseTime);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index aa0841a38f0..e75427d6b89 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
@@ -122,9 +123,11 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
if (reverseIteratorPossible == null) {
try {
- windowStore.backwardFetch(record.key(), 0L, 0L);
- reverseIteratorPossible = true;
- log.debug("Sliding Windows aggregate using a reverse
iterator");
+ try (final WindowStoreIterator<ValueAndTimestamp<VAgg>>
iterator
+ = windowStore.backwardFetch(record.key(), 0L,
0L)) {
+ reverseIteratorPossible = true;
+ log.debug("Sliding Windows aggregate using a reverse
iterator");
+ }
} catch (final UnsupportedOperationException e) {
reverseIteratorPossible = false;
log.debug("Sliding Windows aggregate using a forward
iterator");