This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ce83e5b KAFKA-10540: Migrate KStream aggregate operations (#11315) ce83e5b is described below commit ce83e5be6644884d5c727ed49982f45eca8c136f Author: Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> AuthorDate: Thu Sep 30 17:40:40 2021 +0100 KAFKA-10540: Migrate KStream aggregate operations (#11315) As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KStream aggregate/reduce operations. Reviewers: John Roesler <vvcep...@apache.org> --- .../internals/CogroupedStreamAggregateBuilder.java | 18 +- .../internals/GroupedStreamAggregateBuilder.java | 2 +- .../kstream/internals/KGroupedStreamImpl.java | 2 +- .../internals/KStreamAggProcessorSupplier.java | 7 +- .../kstream/internals/KStreamAggregate.java | 76 ++++--- .../streams/kstream/internals/KStreamReduce.java | 49 +++-- .../internals/KStreamSessionWindowAggregate.java | 152 +++++++------ .../internals/KStreamSlidingWindowAggregate.java | 236 ++++++++++++--------- .../kstream/internals/KStreamWindowAggregate.java | 131 +++++++----- .../streams/kstream/internals/KTableImpl.java | 2 +- .../internals/SessionCacheFlushListener.java | 8 +- .../kstream/internals/SessionTupleForwarder.java | 13 +- .../internals/graph/GraphGraceSearchUtil.java | 5 +- .../streams/processor/api/ContextualProcessor.java | 2 +- .../kstream/internals/KGroupedStreamImplTest.java | 4 +- ...KStreamSessionWindowAggregateProcessorTest.java | 142 ++++++------- .../KStreamSlidingWindowAggregateTest.java | 16 +- .../internals/KStreamWindowAggregateTest.java | 30 +-- .../internals/SessionCacheFlushListenerTest.java | 9 +- .../internals/SessionTupleForwarderTest.java | 31 +-- 20 files changed, 522 insertions(+), 413 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index ebaa954..5a04a2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; class CogroupedStreamAggregateBuilder<K, VOut> { @@ -60,7 +61,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { - final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor = + final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( @@ -94,8 +95,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> { boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { - final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor = - (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>( + final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = + (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>( windows, storeBuilder.name(), initializer, @@ -132,8 +133,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> { boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { - final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor = - (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSessionWindowAggregate<K, K, VOut>( + final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = + (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>( sessionWindows, storeBuilder.name(), initializer, @@ -170,8 +171,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> { boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { - final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor = - (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>( + final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = + (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>( slidingWindows, storeBuilder.name(), initializer, @@ -253,11 +254,10 @@ class CogroupedStreamAggregateBuilder<K, VOut> { builder); } - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName, final boolean stateCreated, final StoreBuilder<?> storeBuilder, - final org.apache.kafka.streams.processor.ProcessorSupplier<K, ?> kStreamAggregate) { + final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) { final StatefulProcessorNode<K, ?> statefulProcessorNode; if (!stateCreated) { statefulProcessorNode = diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 4896d97..dfcf63d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -68,7 +68,7 @@ class GroupedStreamAggregateBuilder<K, V> { <KR, VR> KTable<KR, VR> build(final NamedInternal functionName, final StoreBuilder<?> storeBuilder, - final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier, + final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, final String queryableStoreName, final Serde<KR> keySerde, final Serde<VR> valueSerde) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 1317996..d56caed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -233,7 +233,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS ); } - private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier, + private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, V, K, T> aggregateSupplier, final String functionName, final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { return aggregateBuilder.build( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java index fab61be..7d8ff94 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java @@ -16,10 +16,11 @@ */ package org.apache.kafka.streams.kstream.internals; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public interface KStreamAggProcessorSupplier<K, RK, V, T> extends org.apache.kafka.streams.processor.ProcessorSupplier<K, V> { +import org.apache.kafka.streams.processor.api.ProcessorSupplier; - KTableValueGetterSupplier<RK, T> view(); +public interface KStreamAggProcessorSupplier<KIn, VIn, KAgg, VAgg> extends ProcessorSupplier<KIn, VIn, KAgg, Change<VAgg>> { + + KTableValueGetterSupplier<KAgg, VAgg> view(); void enableSendingOldValues(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 0f83849..8e0a910 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -28,25 +33,26 @@ import org.slf4j.LoggerFactory; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> { +public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, KIn, VAgg> { + private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class); + private final String storeName; - private final Initializer<T> initializer; - private final Aggregator<? super K, ? super V, T> aggregator; + private final Initializer<VAgg> initializer; + private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private boolean sendOldValues = false; KStreamAggregate(final String storeName, - final Initializer<T> initializer, - final Aggregator<? super K, ? super V, T> aggregator) { + final Initializer<VAgg> initializer, + final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { this.storeName = storeName; this.initializer = initializer; this.aggregator = aggregator; } @Override - public org.apache.kafka.streams.processor.Processor<K, V> get() { + public Processor<KIn, VIn, KIn, Change<VAgg>> get() { return new KStreamAggregateProcessor(); } @@ -56,13 +62,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, } - private class KStreamAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { - private TimestampedKeyValueStore<K, T> store; + private class KStreamAggregateProcessor extends ContextualProcessor<KIn, VIn, KIn, Change<VAgg>> { + private TimestampedKeyValueStore<KIn, VAgg> store; private Sensor droppedRecordsSensor; - private TimestampedTupleForwarder<K, T> tupleForwarder; + private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder; @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final ProcessorContext<KIn, Change<VAgg>> context) { super.init(context); droppedRecordsSensor = droppedRecordsSensor( Thread.currentThread().getName(), @@ -77,43 +83,51 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, } @Override - public void process(final K key, final V value) { + public void process(final Record<KIn, VIn> record) { // If the key or value is null we don't need to proceed - if (key == null || value == null) { - LOG.warn( - "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", - key, value, context().topic(), context().partition(), context().offset() - ); + if (record.key() == null || record.value() == null) { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record due to null key or value. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + LOG.warn( + "Skipping record due to null key or value. Topic, partition, and offset not known." + ); + } droppedRecordsSensor.record(); return; } - final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key); - T oldAgg = getValueOrNull(oldAggAndTimestamp); + final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key()); + VAgg oldAgg = getValueOrNull(oldAggAndTimestamp); - final T newAgg; + final VAgg newAgg; final long newTimestamp; if (oldAgg == null) { oldAgg = initializer.apply(); - newTimestamp = context().timestamp(); + newTimestamp = record.timestamp(); } else { oldAgg = oldAggAndTimestamp.value(); - newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp()); + newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp()); } - newAgg = aggregator.apply(key, value, oldAgg); + newAgg = aggregator.apply(record.key(), record.value(), oldAgg); - store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp)); - tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp); + store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp)); + tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp); } } @Override - public KTableValueGetterSupplier<K, T> view() { - return new KTableValueGetterSupplier<K, T>() { + public KTableValueGetterSupplier<KIn, VAgg> view() { + return new KTableValueGetterSupplier<KIn, VAgg>() { - public KTableValueGetter<K, T> get() { + public KTableValueGetter<KIn, VAgg> get() { return new KStreamAggregateValueGetter(); } @@ -124,8 +138,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, }; } - private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> { - private TimestampedKeyValueStore<K, T> store; + private class KStreamAggregateValueGetter implements KTableValueGetter<KIn, VAgg> { + private TimestampedKeyValueStore<KIn, VAgg> store; @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { @@ -133,7 +147,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, } @Override - public ValueAndTimestamp<T> get(final K key) { + public ValueAndTimestamp<VAgg> get(final KIn key) { return store.get(key); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index d774217..080f9a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -18,6 +18,11 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -27,8 +32,8 @@ import org.slf4j.LoggerFactory; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> { +public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K, V> { + private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class); private final String storeName; @@ -42,7 +47,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, } @Override - public org.apache.kafka.streams.processor.Processor<K, V> get() { + public Processor<K, V, K, Change<V>> get() { return new KStreamReduceProcessor(); } @@ -52,13 +57,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, } - private class KStreamReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { + private class KStreamReduceProcessor extends ContextualProcessor<K, V, K, Change<V>> { private TimestampedKeyValueStore<K, V> store; private TimestampedTupleForwarder<K, V> tupleForwarder; private Sensor droppedRecordsSensor; @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final ProcessorContext<K, Change<V>> context) { super.init(context); droppedRecordsSensor = droppedRecordsSensor( Thread.currentThread().getName(), @@ -74,33 +79,41 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, } @Override - public void process(final K key, final V value) { + public void process(final Record<K, V> record) { // If the key or value is null we don't need to proceed - if (key == null || value == null) { - LOG.warn( - "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", - key, value, context().topic(), context().partition(), context().offset() - ); + if (record.key() == null || record.value() == null) { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record due to null key or value. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + LOG.warn( + "Skipping record due to null key. Topic, partition, and offset not known." + ); + } droppedRecordsSensor.record(); return; } - final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key); + final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(record.key()); final V oldAgg = getValueOrNull(oldAggAndTimestamp); final V newAgg; final long newTimestamp; if (oldAgg == null) { - newAgg = value; - newTimestamp = context().timestamp(); + newAgg = record.value(); + newTimestamp = record.timestamp(); } else { - newAgg = reducer.apply(oldAgg, value); - newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp()); + newAgg = reducer.apply(oldAgg, record.value()); + newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp()); } - store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp)); - tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp); + store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp)); + tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index f15997f..00b6959 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -24,6 +24,11 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -36,23 +41,23 @@ import java.util.List; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { +public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> { + private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class); private final String storeName; private final SessionWindows windows; - private final Initializer<Agg> initializer; - private final Aggregator<? super K, ? super V, Agg> aggregator; - private final Merger<? super K, Agg> sessionMerger; + private final Initializer<VAgg> initializer; + private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; + private final Merger<? super KIn, VAgg> sessionMerger; private boolean sendOldValues = false; public KStreamSessionWindowAggregate(final SessionWindows windows, - final String storeName, - final Initializer<Agg> initializer, - final Aggregator<? super K, ? super V, Agg> aggregator, - final Merger<? super K, Agg> sessionMerger) { + final String storeName, + final Initializer<VAgg> initializer, + final Aggregator<? super KIn, ? super VIn, VAgg> aggregator, + final Merger<? super KIn, VAgg> sessionMerger) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -61,7 +66,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public org.apache.kafka.streams.processor.Processor<K, V> get() { + public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { return new KStreamSessionWindowAggregateProcessor(); } @@ -74,92 +79,118 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce sendOldValues = true; } - private class KStreamSessionWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { + private class KStreamSessionWindowAggregateProcessor extends + ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> { - private SessionStore<K, Agg> store; - private SessionTupleForwarder<K, Agg> tupleForwarder; + private SessionStore<KIn, VAgg> store; + private SessionTupleForwarder<KIn, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); final String threadId = Thread.currentThread().getName(); - droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); + droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), + metrics); store = context.getStateStore(storeName); - tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues); + tupleForwarder = new SessionTupleForwarder<>( + store, + context, + new SessionCacheFlushListener<>(context), + sendOldValues + ); } @Override - public void process(final K key, final V value) { + public void process(final Record<KIn, VIn> record) { // if the key is null, we do not need proceed aggregating // the record with the table - if (key == null) { - LOG.warn( - "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", - value, context().topic(), context().partition(), context().offset() - ); + if (record.key() == null) { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record due to null key. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + LOG.warn( + "Skipping record due to null key. Topic, partition, and offset not known." + ); + } droppedRecordsSensor.record(); return; } - final long timestamp = context().timestamp(); + final long timestamp = record.timestamp(); observedStreamTime = Math.max(observedStreamTime, timestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); - final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>(); + final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); SessionWindow mergedWindow = newSessionWindow; - Agg agg = initializer.apply(); + VAgg agg = initializer.apply(); try ( - final KeyValueIterator<Windowed<K>, Agg> iterator = store.findSessions( - key, + final KeyValueIterator<Windowed<KIn>, VAgg> iterator = store.findSessions( + record.key(), timestamp - windows.inactivityGap(), timestamp + windows.inactivityGap() ) ) { while (iterator.hasNext()) { - final KeyValue<Windowed<K>, Agg> next = iterator.next(); + final KeyValue<Windowed<KIn>, VAgg> next = iterator.next(); merged.add(next); - agg = sessionMerger.apply(key, agg, next.value); + agg = sessionMerger.apply(record.key(), agg, next.value); mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window()); } } if (mergedWindow.end() < closeTime) { - LOG.warn( - "Skipping record for expired window. " + - "key=[{}] " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - key, - context().topic(), - context().partition(), - context().offset(), - timestamp, - mergedWindow.start(), - mergedWindow.end(), - closeTime, - observedStreamTime - ); + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + LOG.warn( + "Skipping record for expired window. " + + "topic=[{}] " + + "partition=[{}] " + + "offset=[{}] " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), + timestamp, + mergedWindow.start(), mergedWindow.end(), + closeTime, + observedStreamTime + ); + } else { + LOG.warn( + "Skipping record for expired window. Topic, partition, and offset not known. " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + timestamp, + mergedWindow.start(), mergedWindow.end(), + closeTime, + observedStreamTime + ); + } droppedRecordsSensor.record(); } else { if (!mergedWindow.equals(newSessionWindow)) { - for (final KeyValue<Windowed<K>, Agg> session : merged) { + for (final KeyValue<Windowed<KIn>, VAgg> session : merged) { store.remove(session.key); - tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null); + tupleForwarder.maybeForward(session.key, null, + sendOldValues ? session.value : null); } } - agg = aggregator.apply(key, value, agg); - final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow); + agg = aggregator.apply(record.key(), record.value(), agg); + final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow); store.put(sessionKey, agg); tupleForwarder.maybeForward(sessionKey, agg, null); } @@ -173,22 +204,23 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public KTableValueGetterSupplier<Windowed<K>, Agg> view() { - return new KTableValueGetterSupplier<Windowed<K>, Agg>() { + public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() { + return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() { @Override - public KTableValueGetter<Windowed<K>, Agg> get() { + public KTableValueGetter<Windowed<KIn>, VAgg> get() { return new KTableSessionWindowValueGetter(); } @Override public String[] storeNames() { - return new String[] {storeName}; + return new String[]{storeName}; } }; } - private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<K>, Agg> { - private SessionStore<K, Agg> store; + private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<KIn>, VAgg> { + + private SessionStore<KIn, VAgg> store; @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { @@ -196,7 +228,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public ValueAndTimestamp<Agg> get(final Windowed<K> key) { + public ValueAndTimestamp<VAgg> get(final Windowed<KIn> key) { return ValueAndTimestamp.make( store.fetchSession(key.key(), key.window().start(), key.window().end()), key.window().end()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java index db91bb3..ac4710e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java @@ -24,6 +24,11 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; @@ -37,21 +42,21 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { +public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> { + private final Logger log = LoggerFactory.getLogger(getClass()); private final String storeName; private final SlidingWindows windows; - private final Initializer<Agg> initializer; - private final Aggregator<? super K, ? super V, Agg> aggregator; + private final Initializer<VAgg> initializer; + private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private boolean sendOldValues = false; public KStreamSlidingWindowAggregate(final SlidingWindows windows, final String storeName, - final Initializer<Agg> initializer, - final Aggregator<? super K, ? super V, Agg> aggregator) { + final Initializer<VAgg> initializer, + final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -59,7 +64,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public org.apache.kafka.streams.processor.Processor<K, V> get() { + public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { return new KStreamSlidingWindowAggregateProcessor(); } @@ -72,17 +77,18 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce sendOldValues = true; } - private class KStreamSlidingWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { - private TimestampedWindowStore<K, Agg> windowStore; - private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private class KStreamSlidingWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> { + private TimestampedWindowStore<KIn, VAgg> windowStore; + private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; private Boolean reverseIteratorPossible = null; @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); - final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; + final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext = + (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); final String threadId = Thread.currentThread().getName(); droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); @@ -95,52 +101,71 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public void process(final K key, final V value) { - if (key == null || value == null) { - log.warn( - "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", - value, context().topic(), context().partition(), context().offset() - ); + public void process(final Record<KIn, VIn> record) { + if (record.key() == null || record.value() == null) { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + log.warn( + "Skipping record due to null key or value. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + log.warn( + "Skipping record due to null key or value. Topic, partition, and offset not known." + ); + } droppedRecordsSensor.record(); return; } - final long inputRecordTimestamp = context().timestamp(); + final long inputRecordTimestamp = record.timestamp(); observedStreamTime = Math.max(observedStreamTime, inputRecordTimestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs(); if (inputRecordTimestamp + 1L + windows.timeDifferenceMs() <= closeTime) { - log.warn( - "Skipping record for expired window. " + - "key=[{}] " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - key, - context().topic(), - context().partition(), - context().offset(), - context().timestamp(), - inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp, - closeTime, - observedStreamTime - ); + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + log.warn( + "Skipping record for expired window. " + + "topic=[{}] " + + "partition=[{}] " + + "offset=[{}] " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), + record.timestamp(), + inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp, + closeTime, + observedStreamTime + ); + } else { + log.warn( + "Skipping record for expired window. Topic, partition, and offset not known. " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + record.timestamp(), + inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp, + closeTime, + observedStreamTime + ); + } droppedRecordsSensor.record(); return; } if (inputRecordTimestamp < windows.timeDifferenceMs()) { - processEarly(key, value, inputRecordTimestamp, closeTime); + processEarly(record.key(), record.value(), inputRecordTimestamp, closeTime); return; } if (reverseIteratorPossible == null) { try { - windowStore.backwardFetch(key, 0L, 0L); + windowStore.backwardFetch(record.key(), 0L, 0L); reverseIteratorPossible = true; log.debug("Sliding Windows aggregate using a reverse iterator"); } catch (final UnsupportedOperationException e) { @@ -150,19 +175,19 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } if (reverseIteratorPossible) { - processReverse(key, value, inputRecordTimestamp, closeTime); + processReverse(record.key(), record.value(), inputRecordTimestamp, closeTime); } else { - processInOrder(key, value, inputRecordTimestamp, closeTime); + processInOrder(record.key(), record.value(), inputRecordTimestamp, closeTime); } } - public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) { + public void processInOrder(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) { final Set<Long> windowStartTimes = new HashSet<>(); // aggregate that will go in the current record’s left/right window (if needed) - ValueAndTimestamp<Agg> leftWinAgg = null; - ValueAndTimestamp<Agg> rightWinAgg = null; + ValueAndTimestamp<VAgg> leftWinAgg = null; + ValueAndTimestamp<VAgg> rightWinAgg = null; //if current record's left/right windows already exist boolean leftWinAlreadyCreated = false; @@ -171,7 +196,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce Long previousRecordTimestamp = null; try ( - final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.fetch( key, key, Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), @@ -179,7 +204,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { - final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next(); + final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next(); final long startTime = windowBeingProcessed.key.window().start(); windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); @@ -211,13 +236,13 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp); } - public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) { + public void processReverse(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) { final Set<Long> windowStartTimes = new HashSet<>(); // aggregate that will go in the current record’s left/right window (if needed) - ValueAndTimestamp<Agg> leftWinAgg = null; - ValueAndTimestamp<Agg> rightWinAgg = null; + ValueAndTimestamp<VAgg> leftWinAgg = null; + ValueAndTimestamp<VAgg> rightWinAgg = null; //if current record's left/right windows already exist boolean leftWinAlreadyCreated = false; @@ -226,7 +251,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce Long previousRecordTimestamp = null; try ( - final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( + final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.backwardFetch( key, key, Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), @@ -234,7 +259,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { - final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next(); + final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next(); final long startTime = windowBeingProcessed.key.window().start(); windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); @@ -275,7 +300,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce * windows with negative start times, which is not supported. Instead, we will put them into the [0, timeDifferenceMs] * window as a "workaround", and we will update or create their right windows as new records come in later */ - private void processEarly(final K key, final V value, final long inputRecordTimestamp, final long closeTime) { + private void processEarly(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) { if (inputRecordTimestamp < 0 || inputRecordTimestamp >= windows.timeDifferenceMs()) { log.error( "Early record for sliding windows must fall between fall between 0 <= inputRecordTimestamp. Timestamp {} does not fall between 0 <= {}", @@ -285,15 +310,15 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } // A window from [0, timeDifferenceMs] that holds all early records - KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; - ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> combinedWindow = null; + ValueAndTimestamp<VAgg> rightWinAgg = null; boolean rightWinAlreadyCreated = false; final Set<Long> windowStartTimes = new HashSet<>(); Long previousRecordTimestamp = null; try ( - final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.fetch( key, key, 0, @@ -301,7 +326,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { - final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next(); + final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next(); final long startTime = windowBeingProcessed.key.window().start(); windowStartTimes.add(startTime); final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp(); @@ -352,7 +377,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce if (combinedWindow == null) { final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); + final ValueAndTimestamp<VAgg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp); } else { @@ -362,13 +387,13 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } - private void createWindows(final K key, - final V value, + private void createWindows(final KIn key, + final VIn value, final long inputRecordTimestamp, final long closeTime, final Set<Long> windowStartTimes, - final ValueAndTimestamp<Agg> rightWinAgg, - final ValueAndTimestamp<Agg> leftWinAgg, + final ValueAndTimestamp<VAgg> rightWinAgg, + final ValueAndTimestamp<VAgg> leftWinAgg, final boolean leftWinAlreadyCreated, final boolean rightWinAlreadyCreated, final Long previousRecordTimestamp) { @@ -382,7 +407,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce //create left window for new record if (!leftWinAlreadyCreated) { - final ValueAndTimestamp<Agg> valueAndTime; + final ValueAndTimestamp<VAgg> valueAndTime; if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp); } else { @@ -399,8 +424,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } private void createCurrentRecordRightWindow(final long inputRecordTimestamp, - final ValueAndTimestamp<Agg> rightWinAgg, - final K key) { + final ValueAndTimestamp<VAgg> rightWinAgg, + final KIn key) { final TimeWindow window = new TimeWindow(inputRecordTimestamp + 1, inputRecordTimestamp + 1 + windows.timeDifferenceMs()); windowStore.put( key, @@ -415,11 +440,11 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce private void createPreviousRecordRightWindow(final long windowStart, final long inputRecordTimestamp, - final K key, - final V value, + final KIn key, + final VIn value, final long closeTime) { final TimeWindow window = new TimeWindow(windowStart, windowStart + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); + final ValueAndTimestamp<VAgg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp); } @@ -436,22 +461,22 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } // checks if the aggregate we found has records that fall into the current record's right window; if yes, the right window is not empty - private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long inputRecordTimestamp) { + private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> rightWinAgg, final long inputRecordTimestamp) { return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp; } private void updateWindowAndForward(final Window window, - final ValueAndTimestamp<Agg> valueAndTime, - final K key, - final V value, + final ValueAndTimestamp<VAgg> valueAndTime, + final KIn key, + final VIn value, final long closeTime, final long inputRecordTimestamp) { final long windowStart = window.start(); final long windowEnd = window.end(); if (windowEnd > closeTime) { //get aggregate from existing window - final Agg oldAgg = getValueOrNull(valueAndTime); - final Agg newAgg = aggregator.apply(key, value, oldAgg); + final VAgg oldAgg = getValueOrNull(valueAndTime); + final VAgg newAgg = aggregator.apply(key, value, oldAgg); final long newTimestamp = oldAgg == null ? inputRecordTimestamp : Math.max(inputRecordTimestamp, valueAndTime.timestamp()); windowStore.put( @@ -464,35 +489,46 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce sendOldValues ? oldAgg : null, newTimestamp); } else { - log.warn( - "Skipping record for expired window. " + - "key=[{}] " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - key, - context().topic(), - context().partition(), - context().offset(), - context().timestamp(), - windowStart, windowEnd, - closeTime, - observedStreamTime - ); + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + log.warn( + "Skipping record for expired window. " + + "topic=[{}] " + + "partition=[{}] " + + "offset=[{}] " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), + inputRecordTimestamp, + windowStart, windowEnd, + closeTime, + observedStreamTime + ); + } else { + log.warn( + "Skipping record for expired window. Topic, partition, and offset not known. " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + inputRecordTimestamp, + windowStart, windowEnd, + closeTime, + observedStreamTime + ); + } droppedRecordsSensor.record(); } } } @Override - public KTableValueGetterSupplier<Windowed<K>, Agg> view() { - return new KTableValueGetterSupplier<Windowed<K>, Agg>() { + public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() { + return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() { - public KTableValueGetter<Windowed<K>, Agg> get() { + public KTableValueGetter<Windowed<KIn>, VAgg> get() { return new KStreamWindowAggregateValueGetter(); } @@ -503,8 +539,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce }; } - private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> { - private TimestampedWindowStore<K, Agg> windowStore; + private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<KIn>, VAgg> { + private TimestampedWindowStore<KIn, VAgg> windowStore; @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { @@ -512,8 +548,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce } @Override - public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) { - final K key = windowedKey.key(); + public ValueAndTimestamp<VAgg> get(final Windowed<KIn> windowedKey) { + final KIn key = windowedKey.key(); return windowStore.fetch(key, windowedKey.window().start()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index aaf6a34..5730ae6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -23,6 +23,11 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -35,21 +40,21 @@ import java.util.Map; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { +public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> { + private final Logger log = LoggerFactory.getLogger(getClass()); private final String storeName; private final Windows<W> windows; - private final Initializer<Agg> initializer; - private final Aggregator<? super K, ? super V, Agg> aggregator; + private final Initializer<VAgg> initializer; + private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private boolean sendOldValues = false; public KStreamWindowAggregate(final Windows<W> windows, final String storeName, - final Initializer<Agg> initializer, - final Aggregator<? super K, ? super V, Agg> aggregator) { + final Initializer<VAgg> initializer, + final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -57,7 +62,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr } @Override - public org.apache.kafka.streams.processor.Processor<K, V> get() { + public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { return new KStreamWindowAggregateProcessor(); } @@ -71,16 +76,17 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr } - private class KStreamWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { - private TimestampedWindowStore<K, Agg> windowStore; - private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private class KStreamWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> { + private TimestampedWindowStore<KIn, VAgg> windowStore; + private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); - final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; + final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext = + (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); final String threadId = Thread.currentThread().getName(); droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); @@ -93,18 +99,26 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr } @Override - public void process(final K key, final V value) { - if (key == null) { - log.warn( - "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", - value, context().topic(), context().partition(), context().offset() - ); + public void process(final Record<KIn, VIn> record) { + if (record.key() == null) { + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + log.warn( + "Skipping record due to null key. " + + "topic=[{}] partition=[{}] offset=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() + ); + } else { + log.warn( + "Skipping record due to null key. Topic, partition, and offset not known." + ); + } droppedRecordsSensor.record(); return; } // first get the matching windows - final long timestamp = context().timestamp(); + final long timestamp = record.timestamp(); observedStreamTime = Math.max(observedStreamTime, timestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs(); @@ -115,48 +129,59 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr final Long windowStart = entry.getKey(); final long windowEnd = entry.getValue().end(); if (windowEnd > closeTime) { - final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart); - Agg oldAgg = getValueOrNull(oldAggAndTimestamp); + final ValueAndTimestamp<VAgg> oldAggAndTimestamp = windowStore.fetch(record.key(), windowStart); + VAgg oldAgg = getValueOrNull(oldAggAndTimestamp); - final Agg newAgg; + final VAgg newAgg; final long newTimestamp; if (oldAgg == null) { oldAgg = initializer.apply(); - newTimestamp = context().timestamp(); + newTimestamp = record.timestamp(); } else { - newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp()); + newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp()); } - newAgg = aggregator.apply(key, value, oldAgg); + newAgg = aggregator.apply(record.key(), record.value(), oldAgg); // update the store with the new value - windowStore.put(key, ValueAndTimestamp.make(newAgg, newTimestamp), windowStart); + windowStore.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp), windowStart); tupleForwarder.maybeForward( - new Windowed<>(key, entry.getValue()), + new Windowed<>(record.key(), entry.getValue()), newAgg, sendOldValues ? oldAgg : null, newTimestamp); } else { - log.warn( - "Skipping record for expired window. " + - "key=[{}] " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}) " + - "expiration=[{}] " + - "streamTime=[{}]", - key, - context().topic(), - context().partition(), - context().offset(), - context().timestamp(), - windowStart, windowEnd, - closeTime, - observedStreamTime - ); + if (context().recordMetadata().isPresent()) { + final RecordMetadata recordMetadata = context().recordMetadata().get(); + log.warn( + "Skipping record for expired window. " + + "topic=[{}] " + + "partition=[{}] " + + "offset=[{}] " + + "timestamp=[{}] " + + "window=[{},{}) " + + "expiration=[{}] " + + "streamTime=[{}]", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), + record.timestamp(), + windowStart, windowEnd, + closeTime, + observedStreamTime + ); + } else { + log.warn( + "Skipping record for expired window. Topic, partition, and offset not known. " + + "timestamp=[{}] " + + "window=[{},{}] " + + "expiration=[{}] " + + "streamTime=[{}]", + record.timestamp(), + windowStart, windowEnd, + closeTime, + observedStreamTime + ); + } droppedRecordsSensor.record(); } } @@ -164,10 +189,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr } @Override - public KTableValueGetterSupplier<Windowed<K>, Agg> view() { - return new KTableValueGetterSupplier<Windowed<K>, Agg>() { + public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() { + return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() { - public KTableValueGetter<Windowed<K>, Agg> get() { + public KTableValueGetter<Windowed<KIn>, VAgg> get() { return new KStreamWindowAggregateValueGetter(); } @@ -178,8 +203,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr }; } - private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> { - private TimestampedWindowStore<K, Agg> windowStore; + private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<KIn>, VAgg> { + private TimestampedWindowStore<KIn, VAgg> windowStore; @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { @@ -188,8 +213,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr @SuppressWarnings("unchecked") @Override - public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) { - final K key = windowedKey.key(); + public ValueAndTimestamp<VAgg> get(final Windowed<KIn> windowedKey) { + final KIn key = windowedKey.key(); final W window = (W) windowedKey.window(); return windowStore.fetch(key, window.start()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5339b6a..6f73044 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -832,7 +832,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< source.materialize(); return new KTableSourceValueGetterSupplier<>(source.queryableName()); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { - return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); + return ((KStreamAggProcessorSupplier<?, S, K, V>) processorSupplier).view(); } else if (processorSupplier instanceof KTableNewProcessorSupplier) { return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view(); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java index daa7c64..a2c95bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; @@ -30,8 +29,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window @SuppressWarnings("rawtypes") private final ProcessorNode myNode; - @SuppressWarnings("unchecked") - SessionCacheFlushListener(final ProcessorContext context) { + SessionCacheFlushListener(final ProcessorContext<Windowed<KOut>, Change<VOut>> context) { this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context; myNode = this.context.currentNode(); } @@ -44,7 +42,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end())); + context.forward(new Record<>(key, new Change<>(newValue, oldValue), key.window().end())); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java index bad255a..ac475a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.internals.CacheFlushListener; import org.apache.kafka.streams.state.internals.WrappedStateStore; @@ -32,13 +32,13 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; * @param <V> */ class SessionTupleForwarder<K, V> { - private final ProcessorContext context; + private final ProcessorContext<Windowed<K>, Change<V>> context; private final boolean sendOldValues; private final boolean cachingEnabled; @SuppressWarnings("unchecked") SessionTupleForwarder(final StateStore store, - final ProcessorContext context, + final ProcessorContext<Windowed<K>, Change<V>> context, final CacheFlushListener<Windowed<K>, V> flushListener, final boolean sendOldValues) { this.context = context; @@ -50,7 +50,10 @@ class SessionTupleForwarder<K, V> { final V newValue, final V oldValue) { if (!cachingEnabled) { - context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(key.window().end())); + context.forward(new Record<>( + key, + new Change<>(newValue, sendOldValues ? oldValue : null), + key.window().end())); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index 4b39219..66ffdc0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate; import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; public final class GraphGraceSearchUtil { private GraphGraceSearchUtil() {} @@ -69,10 +70,10 @@ public final class GraphGraceSearchUtil { return inheritedGrace; } + @SuppressWarnings("rawtypes") private static Long extractGracePeriod(final GraphNode node) { if (node instanceof StatefulProcessorNode) { - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. - final org.apache.kafka.streams.processor.ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().oldProcessorSupplier(); + final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier(); if (processorSupplier instanceof KStreamWindowAggregate) { final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; final Windows windows = kStreamWindowAggregate.windows(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java index d2522e3..96cc278 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java @@ -27,7 +27,7 @@ package org.apache.kafka.streams.processor.api; */ public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { - protected ProcessorContext<KOut, VOut> context; + private ProcessorContext<KOut, VOut> context; protected ContextualProcessor() {} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index eba39a7..333884e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -590,7 +590,7 @@ public class KGroupedStreamImplTest { assertThat( appender.getMessages(), - hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] " + hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] " + "offset=[6]") ); } @@ -640,7 +640,7 @@ public class KGroupedStreamImplTest { assertThat( appender.getMessages(), - hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] " + hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] " + "offset=[6]") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index a897d11..ab92b70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; -import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; import org.apache.kafka.streams.state.KeyValueIterator; @@ -70,29 +70,27 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamSessionWindowAggregateProcessorTest { private static final long GAP_MS = 5 * 60 * 1000L; private static final String STORE_NAME = "session-store"; private final String threadId = Thread.currentThread().getName(); - private final ToInternal toInternal = new ToInternal(); private final Initializer<Long> initializer = () -> 0L; private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1; private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = new KStreamSessionWindowAggregate<>( - SessionWindows.with(ofMillis(GAP_MS)), + SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)), STORE_NAME, initializer, aggregator, sessionMerger); private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>(); - private final org.apache.kafka.streams.processor.Processor<String, String> processor = sessionAggregator.get(); + private final Processor<String, String, Windowed<String>, Change<Long>> processor = sessionAggregator.get(); private SessionStore<String, Long> sessionStore; - private InternalMockProcessorContext context; + private InternalMockProcessorContext<Windowed<String>, Change<Long>> context; private final Metrics metrics = new Metrics(); @Before @@ -103,7 +101,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private void setup(final boolean enableCache) { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); - context = new InternalMockProcessorContext<Object, Object>( + context = new InternalMockProcessorContext<Windowed<String>, Change<Long>>( TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), @@ -113,11 +111,9 @@ public class KStreamSessionWindowAggregateProcessorTest { new ThreadCache(new LogContext("testCache "), 100000, streamsMetrics), Time.SYSTEM ) { - @SuppressWarnings("unchecked") @Override - public void forward(final Object key, final Object value, final To to) { - toInternal.update(to); - results.add(new KeyValueTimestamp<>((Windowed<String>) key, (Change<Long>) value, toInternal.timestamp())); + public <K extends Windowed<String>, V extends Change<Long>> void forward(final Record<K, V> record) { + results.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp())); } }; TaskMetrics.droppedRecordsSensor(threadId, context.taskId().toString(), streamsMetrics); @@ -152,10 +148,8 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldCreateSingleSessionWhenWithinGap() { - context.setTime(0); - processor.process("john", "first"); - context.setTime(500); - processor.process("john", "second"); + processor.process(new Record<>("john", "first", 0L)); + processor.process(new Record<>("john", "second", 500L)); try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("john", 0, 2000)) { @@ -166,20 +160,17 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldMergeSessions() { - context.setTime(0); final String sessionId = "mel"; - processor.process(sessionId, "first"); + processor.process(new Record<>(sessionId, "first", 0L)); assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext()); // move time beyond gap - context.setTime(GAP_MS + 1); - processor.process(sessionId, "second"); + processor.process(new Record<>(sessionId, "second", GAP_MS + 1)); assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext()); // should still exist as not within gap assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext()); // move time back - context.setTime(GAP_MS / 2); - processor.process(sessionId, "third"); + processor.process(new Record<>(sessionId, "third", GAP_MS / 2)); try (final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) { @@ -192,9 +183,8 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldUpdateSessionIfTheSameTime() { - context.setTime(0); - processor.process("mel", "first"); - processor.process("mel", "second"); + processor.process(new Record<>("mel", "first", 0L)); + processor.process(new Record<>("mel", "second", 0L)); try (final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("mel", 0, 0)) { assertEquals(Long.valueOf(2L), iterator.next().value); @@ -206,15 +196,14 @@ public class KStreamSessionWindowAggregateProcessorTest { public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() { final String sessionId = "mel"; long time = 0; - context.setTime(time); - processor.process(sessionId, "first"); - context.setTime(time += GAP_MS + 1); - processor.process(sessionId, "second"); - processor.process(sessionId, "second"); - context.setTime(time += GAP_MS + 1); - processor.process(sessionId, "third"); - processor.process(sessionId, "third"); - processor.process(sessionId, "third"); + processor.process(new Record<>(sessionId, "first", time)); + final long time1 = time += GAP_MS + 1; + processor.process(new Record<>(sessionId, "second", time1)); + processor.process(new Record<>(sessionId, "second", time1)); + final long time2 = time += GAP_MS + 1; + processor.process(new Record<>(sessionId, "third", time2)); + processor.process(new Record<>(sessionId, "third", time2)); + processor.process(new Record<>(sessionId, "third", time2)); sessionStore.flush(); assertEquals( @@ -239,8 +228,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldRemoveMergedSessionsFromStateStore() { - context.setTime(0); - processor.process("a", "1"); + processor.process(new Record<>("a", "1", 0L)); // first ensure it is in the store try (final KeyValueIterator<Windowed<String>, Long> a1 = @@ -249,8 +237,7 @@ public class KStreamSessionWindowAggregateProcessorTest { } - context.setTime(100); - processor.process("a", "2"); + processor.process(new Record<>("a", "2", 100L)); // a1 from above should have been removed // should have merged session in store try (final KeyValueIterator<Windowed<String>, Long> a2 = @@ -262,19 +249,15 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldHandleMultipleSessionsAndMerging() { - context.setTime(0); - processor.process("a", "1"); - processor.process("b", "1"); - processor.process("c", "1"); - processor.process("d", "1"); - context.setTime(GAP_MS / 2); - processor.process("d", "2"); - context.setTime(GAP_MS + 1); - processor.process("a", "2"); - processor.process("b", "2"); - context.setTime(GAP_MS + 1 + GAP_MS / 2); - processor.process("a", "3"); - processor.process("c", "3"); + processor.process(new Record<>("a", "1", 0L)); + processor.process(new Record<>("b", "1", 0L)); + processor.process(new Record<>("c", "1", 0L)); + processor.process(new Record<>("d", "1", 0L)); + processor.process(new Record<>("d", "2", GAP_MS / 2)); + processor.process(new Record<>("a", "2", GAP_MS + 1)); + processor.process(new Record<>("b", "2", GAP_MS + 1)); + processor.process(new Record<>("a", "3", GAP_MS + 1 + GAP_MS / 2)); + processor.process(new Record<>("c", "3", GAP_MS + 1 + GAP_MS / 2)); sessionStore.flush(); @@ -317,11 +300,9 @@ public class KStreamSessionWindowAggregateProcessorTest { public void shouldGetAggregatedValuesFromValueGetter() { final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get(); getter.init(context); - context.setTime(0); - processor.process("a", "1"); - context.setTime(GAP_MS + 1); - processor.process("a", "1"); - processor.process("a", "2"); + processor.process(new Record<>("a", "1", 0L)); + processor.process(new Record<>("a", "1", GAP_MS + 1)); + processor.process(new Record<>("a", "2", GAP_MS + 1)); final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 0))).value(); final long t1 = getter.get(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1))).value(); assertEquals(1L, t0); @@ -333,10 +314,9 @@ public class KStreamSessionWindowAggregateProcessorTest { initStore(false); processor.init(context); - context.setTime(0); - processor.process("a", "1"); - processor.process("b", "1"); - processor.process("c", "1"); + processor.process(new Record<>("a", "1", 0L)); + processor.process(new Record<>("b", "1", 0L)); + processor.process(new Record<>("c", "1", 0L)); assertEquals( Arrays.asList( @@ -362,10 +342,8 @@ public class KStreamSessionWindowAggregateProcessorTest { initStore(false); processor.init(context); - context.setTime(0); - processor.process("a", "1"); - context.setTime(5); - processor.process("a", "1"); + processor.process(new Record<>("a", "1", 0L)); + processor.process(new Record<>("a", "1", 5L)); assertEquals( Arrays.asList( new KeyValueTimestamp<>( @@ -396,14 +374,14 @@ public class KStreamSessionWindowAggregateProcessorTest { try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) { - processor.process(null, "1"); + processor.process(new Record<>(null, "1", 0L)); assertThat( appender.getEvents().stream() .filter(e -> e.getLevel().equals("WARN")) .map(Event::getMessage) .collect(Collectors.toList()), - hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]") + hasItem("Skipping record due to null key. topic=[topic] partition=[-3] offset=[-2]") ); } @@ -416,8 +394,8 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() { setup(false); - final org.apache.kafka.streams.processor.Processor<String, String> processor = new KStreamSessionWindowAggregate<>( - SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)), + final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( + SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)), STORE_NAME, initializer, aggregator, @@ -427,27 +405,27 @@ public class KStreamSessionWindowAggregateProcessorTest { // dummy record to establish stream time = 0 context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("dummy", "dummy"); + processor.process(new Record<>("dummy", "dummy", 0L)); // record arrives on time, should not be skipped context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("OnTime1", "1"); + processor.process(new Record<>("OnTime1", "1", 0L)); // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); - processor.process("dummy", "dummy"); + processor.process(new Record<>("dummy", "dummy", 11L)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) { // record is late context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("Late1", "1"); + processor.process(new Record<>("Late1", "1", 0L)); assertThat( appender.getMessages(), hasItem("Skipping record for expired window." + - " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]") + " topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]") ); } @@ -481,8 +459,8 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() { setup(false); - final org.apache.kafka.streams.processor.Processor<String, String> processor = new KStreamSessionWindowAggregate<>( - SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)), + final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( + SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)), STORE_NAME, initializer, aggregator, @@ -495,32 +473,32 @@ public class KStreamSessionWindowAggregateProcessorTest { // dummy record to establish stream time = 0 context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("dummy", "dummy"); + processor.process(new Record<>("dummy", "dummy", 0L)); // record arrives on time, should not be skipped context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("OnTime1", "1"); + processor.process(new Record<>("OnTime1", "1", 0L)); // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); - processor.process("dummy", "dummy"); + processor.process(new Record<>("dummy", "dummy", 11L)); // delayed record arrives on time, should not be skipped context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("OnTime2", "1"); + processor.process(new Record<>("OnTime2", "1", 0L)); // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders())); - processor.process("dummy", "dummy"); + processor.process(new Record<>("dummy", "dummy", 12L)); // delayed record arrives late context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders())); - processor.process("Late1", "1"); + processor.process(new Record<>("Late1", "1", 0L)); assertThat( appender.getMessages(), hasItem("Skipping record for expired window." + - " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]") + " topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java index 798159d..38c3fa7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java @@ -726,7 +726,7 @@ public class KStreamSlidingWindowAggregateTest { .filter(e -> e.getLevel().equals("WARN")) .map(Event::getMessage) .collect(Collectors.toList()), - hasItem("Skipping record due to null key or value. value=[1] topic=[topic] partition=[0] offset=[0]") + hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[0]") ); } } @@ -772,19 +772,19 @@ public class KStreamSlidingWindowAggregateTest { assertThat(appender.getMessages(), hasItems( // left window for k@100 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]", // left window for k@101 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]", // left window for k@102 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]", // left window for k@103 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]", // left window for k@104 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]", // left window for k@105 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]", // left window for k@15 - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]" + "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]" )); final TestOutputTopic<Windowed<String>, String> outputTopic = driver.createOutputTopic("output", new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new StringDeserializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index b7759bb..df40c94 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -288,7 +288,7 @@ public class KStreamWindowAggregateTest { driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); inputTopic.pipeInput(null, "1"); - assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]")); + assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]")); } } @@ -335,13 +335,13 @@ public class KStreamWindowAggregateTest { ); assertThat(appender.getMessages(), hasItems( - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]" + "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]" )); final TestOutputTopic<String, String> outputTopic = @@ -389,13 +389,13 @@ public class KStreamWindowAggregateTest { assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375)); assertThat(appender.getMessages(), hasItems( - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]" + "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]", + "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]" )); final TestOutputTopic<String, String> outputTopic = diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java index c60bcf4..da71149 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.junit.Test; @@ -35,9 +35,10 @@ public class SessionCacheFlushListenerTest { context.setCurrentNode(null); context.setCurrentNode(null); context.forward( - new Windowed<>("key", new SessionWindow(21L, 73L)), - new Change<>("newValue", "oldValue"), - To.all().withTimestamp(73L)); + new Record<>( + new Windowed<>("key", new SessionWindow(21L, 73L)), + new Change<>("newValue", "oldValue"), + 73L)); expectLastCall(); replay(context); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java index e99c684..f0a963d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.junit.Test; @@ -57,25 +57,32 @@ public class SessionTupleForwarderTest { private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext context = mock(ProcessorContext.class); + final ProcessorContext<Windowed<String>, Change<String>> context = mock( + ProcessorContext.class); expect(store.setFlushListener(null, sendOldValued)).andReturn(false); if (sendOldValued) { context.forward( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", "oldValue"), - To.all().withTimestamp(42L)); + new Record<>( + new Windowed<>("key", new SessionWindow(21L, 42L)), + new Change<>("value", "oldValue"), + 42L)); } else { context.forward( - new Windowed<>("key", new SessionWindow(21L, 42L)), - new Change<>("value", null), - To.all().withTimestamp(42L)); + new Record<>( + new Windowed<>("key", new SessionWindow(21L, 42L)), + new Change<>("value", null), + 42L)); } expectLastCall(); replay(store, context); - new SessionTupleForwarder<>(store, context, null, sendOldValued) - .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue"); + new SessionTupleForwarder<>(store, context, null, + sendOldValued) + .maybeForward( + new Windowed<>("key", new SessionWindow(21L, 42L)), + "value", + "oldValue"); verify(store, context); } @@ -83,7 +90,7 @@ public class SessionTupleForwarderTest { @Test public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext context = mock(ProcessorContext.class); + final ProcessorContext<Windowed<String>, Change<String>> context = mock(ProcessorContext.class); expect(store.setFlushListener(null, false)).andReturn(true); replay(store, context);