This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch poc-478-ktable-1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ddd0f5d0dd801bbcf9b0c7206e67a8dcfd4db7a4 Author: John Roesler <[email protected]> AuthorDate: Fri May 21 14:25:24 2021 -0500 builds --- .../streams/kstream/internals/KTableFilter.java | 2 +- .../kstream/internals/KTableValueGetter.java | 4 +-- .../internals/TimestampedCacheFlushListener.java | 1 + .../ForeignJoinSubscriptionProcessorSupplier.java | 2 +- .../SubscriptionStoreReceiveProcessorSupplier.java | 2 +- .../internals/AbstractProcessorContext.java | 2 +- .../internals/GlobalProcessorContextImpl.java | 2 +- .../processor/internals/GlobalStateUpdateTask.java | 5 ++-- .../processor/internals/ProcessorContextImpl.java | 2 +- .../state/internals/MeteredKeyValueStore.java | 30 +++++++++++++++++----- .../state/internals/MeteredSessionStore.java | 30 +++++++++++++++++----- .../state/internals/MeteredWindowStore.java | 30 +++++++++++++++++----- .../streams/kstream/internals/KStreamImplTest.java | 8 +++--- ...KStreamSessionWindowAggregateProcessorTest.java | 2 +- .../kstream/internals/KTableReduceTest.java | 2 +- .../internals/SessionCacheFlushListenerTest.java | 2 +- .../TimestampedCacheFlushListenerTest.java | 9 ++++--- .../internals/TimestampedTupleForwarderTest.java | 9 +++++-- .../internals/AbstractProcessorContextTest.java | 2 +- .../processor/internals/GlobalStateTaskTest.java | 6 ++--- .../processor/internals/ProcessorNodeTest.java | 8 +++--- .../internals/ProcessorTopologyFactories.java | 2 +- .../internals/RecordDeserializerTest.java | 2 +- .../processor/internals/RecordQueueTest.java | 6 +++-- .../streams/processor/internals/SinkNodeTest.java | 8 +++--- .../processor/internals/SourceNodeTest.java | 8 +++--- .../processor/internals/StreamTaskTest.java | 22 ++++++++-------- .../streams/state/KeyValueStoreTestDriver.java | 1 + .../state/internals/AbstractKeyValueStoreTest.java | 2 ++ .../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +- .../internals/AbstractSessionBytesStoreTest.java | 3 ++- .../internals/AbstractWindowBytesStoreTest.java | 2 +- .../state/internals/CacheFlushListenerStub.java | 12 +++++++++ .../CachingInMemoryKeyValueStoreTest.java | 4 +-- .../internals/CachingInMemorySessionStoreTest.java | 18 +++++++++++-- .../CachingPersistentSessionStoreTest.java | 18 +++++++++++-- .../CachingPersistentWindowStoreTest.java | 4 +-- .../ChangeLoggingKeyValueBytesStoreTest.java | 3 ++- ...geLoggingTimestampedKeyValueBytesStoreTest.java | 3 ++- .../CompositeReadOnlyKeyValueStoreTest.java | 11 ++++++-- .../state/internals/InMemoryKeyValueStoreTest.java | 1 + .../state/internals/InMemoryLRUCacheStoreTest.java | 1 + .../state/internals/InMemoryWindowStoreTest.java | 1 + .../state/internals/KeyValueSegmentsTest.java | 2 +- .../MeteredTimestampedWindowStoreTest.java | 2 +- .../state/internals/MeteredWindowStoreTest.java | 2 +- .../streams/state/internals/RocksDBStoreTest.java | 3 ++- .../RocksDBTimeOrderedWindowStoreTest.java | 2 +- .../state/internals/RocksDBWindowStoreTest.java | 1 + .../state/internals/SegmentIteratorTest.java | 3 ++- .../state/internals/TimestampedSegmentsTest.java | 2 +- .../kafka/test/InternalMockProcessorContext.java | 8 +++--- .../kafka/test/MockInternalProcessorContext.java | 2 +- .../org/apache/kafka/test/MockProcessorNode.java | 2 +- .../java/org/apache/kafka/test/MockSourceNode.java | 4 +-- .../apache/kafka/test/NoOpProcessorContext.java | 2 +- 56 files changed, 227 insertions(+), 102 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index bbffea6..a23dce4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn } @Override - public void init(final ProcessorContext<Void, Void> context) { + public void init(org.apache.kafka.streams.processor.ProcessorContext context) { parentGetter.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java index c939234..12145fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; public interface KTableValueGetter<K, V> { - void init(ProcessorContext<Void, Void> context); + void init(ProcessorContext context); ValueAndTimestamp<V> get(K key); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java index 6dbf435..97ef6cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.CacheFlushListener; class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java index fd95105..3c12afa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java @@ -63,7 +63,7 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc @Override public void init(final ProcessorContext context) { super.init(context); - final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; + final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context; droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor( Thread.currentThread().getName(), internalProcessorContext.taskId().toString(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java index 61fb1c1..98bcd4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java @@ -60,7 +60,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO> @Override public void init(final ProcessorContext context) { super.init(context); - final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; + final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context; droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor( Thread.currentThread().getName(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 37ffbdc..79b2d0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> { +public abstract class AbstractProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> { private final TaskId taskId; private final String applicationId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index be3cf55..dbdd6a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -34,7 +34,7 @@ import java.time.Duration; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; -public class GlobalProcessorContextImpl extends AbstractProcessorContext { +public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> { private final GlobalStateManager stateManager; private final Time time; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 6b1378b..e5f591c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -69,7 +69,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic(); for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); - final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic); + final SourceNode<?, ?> source = topology.source(sourceTopic); deserializers.put( sourceTopic, new RecordDeserializer( @@ -111,7 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { processorContext.timestamp(), processorContext.headers() ); - ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess); + ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess); } offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1); @@ -138,6 +138,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } } + @SuppressWarnings("unchecked") private void initTopology() { for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) { processorContext.setCurrentNode(node); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index dcae2ab..bd7ece4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -42,7 +42,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; -public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { +public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier { // the below are null for standby tasks private StreamTask streamTask; private RecordCollector collector; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 18c44e8..62542e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -173,12 +175,28 @@ public class MeteredKeyValueStore<K, V> final KeyValueStore<Bytes, byte[]> wrapped = wrapped(); if (wrapped instanceof CachedStateStore) { return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( - (rawKey, rawNewValue, rawOldValue, timestamp) -> listener.apply( - serdes.keyFrom(rawKey), - rawNewValue != null ? serdes.valueFrom(rawNewValue) : null, - rawOldValue != null ? serdes.valueFrom(rawOldValue) : null, - timestamp - ), + new CacheFlushListener<byte[], byte[]>() { + @Override + public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) { + listener.apply( + serdes.keyFrom(rawKey), + rawNewValue != null ? serdes.valueFrom(rawNewValue) : null, + rawOldValue != null ? serdes.valueFrom(rawOldValue) : null, + timestamp + ); + } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + listener.apply( + record.withKey(serdes.keyFrom(record.key())) + .withValue(new Change<>( + record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null, + record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null + )) + ); + } + }, sendOldValues); } return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 1fbc8db..d305951 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -144,12 +146,28 @@ public class MeteredSessionStore<K, V> final SessionStore<Bytes, byte[]> wrapped = wrapped(); if (wrapped instanceof CachedStateStore) { return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( - (key, newValue, oldValue, timestamp) -> listener.apply( - SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()), - newValue != null ? serdes.valueFrom(newValue) : null, - oldValue != null ? serdes.valueFrom(oldValue) : null, - timestamp - ), + new CacheFlushListener<byte[], byte[]>() { + @Override + public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) { + listener.apply( + SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()), + newValue != null ? serdes.valueFrom(newValue) : null, + oldValue != null ? serdes.valueFrom(oldValue) : null, + timestamp + ); + } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + listener.apply( + record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic())) + .withValue(new Change<>( + record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null, + record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null + )) + ); + } + }, sendOldValues); } return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 91b4387..82f65a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -148,12 +150,28 @@ public class MeteredWindowStore<K, V> final WindowStore<Bytes, byte[]> wrapped = wrapped(); if (wrapped instanceof CachedStateStore) { return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( - (key, newValue, oldValue, timestamp) -> listener.apply( - WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()), - newValue != null ? serdes.valueFrom(newValue) : null, - oldValue != null ? serdes.valueFrom(oldValue) : null, - timestamp - ), + new CacheFlushListener<byte[], byte[]>() { + @Override + public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) { + listener.apply( + WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()), + newValue != null ? serdes.valueFrom(newValue) : null, + oldValue != null ? serdes.valueFrom(oldValue) : null, + timestamp + ); + } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + listener.apply( + record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic())) + .withValue(new Change<>( + record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null, + record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null + )) + ); + } + }, sendOldValues); } return false; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index ac5db68..7bdcea8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -1525,9 +1525,9 @@ public class KStreamImplTest { final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology(); - final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1"); + final SourceNode<?, ?> originalSourceNode = topology.source("topic-1"); - for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) { + for (final SourceNode<?, ?> sourceNode : topology.sources()) { if (sourceNode.name().equals(originalSourceNode.name())) { assertNull(sourceNode.getTimestampExtractor()); } else { @@ -1554,9 +1554,9 @@ public class KStreamImplTest { final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology(); - final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1"); + final SourceNode<?, ?> originalSourceNode = topology.source("topic-1"); - for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) { + for (final SourceNode<?, ?> sourceNode : topology.sources()) { if (sourceNode.name().equals(originalSourceNode.name())) { assertNull(sourceNode.getTimestampExtractor()); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 0982337..9ee3347 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -98,7 +98,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private void setup(final String builtInMetricsVersion, final boolean enableCache) { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime()); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<Object, Object>( TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index b360151..87d6e87 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -36,7 +36,7 @@ public class KTableReduceTest { @Test public void shouldAddAndSubtract() { - final InternalMockProcessorContext context = new InternalMockProcessorContext(); + final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>(); final Processor<String, Change<Set<String>>> reduceProcessor = new KTableReduce<String, Set<String>>( diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java index b25febf..a826d50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java @@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify; public class SessionCacheFlushListenerTest { @Test public void shouldForwardKeyNewValueOldValueAndTimestamp() { - final InternalProcessorContext context = mock(InternalProcessorContext.class); + final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java index 38ef5c6..7c1b0e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.junit.Test; @@ -31,7 +32,7 @@ public class TimestampedCacheFlushListenerTest { @Test public void shouldForwardValueTimestampIfNewValueExists() { - final InternalProcessorContext context = mock(InternalProcessorContext.class); + final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); @@ -42,7 +43,7 @@ public class TimestampedCacheFlushListenerTest { expectLastCall(); replay(context); - new TimestampedCacheFlushListener<>(context).apply( + new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply( "key", ValueAndTimestamp.make("newValue", 42L), ValueAndTimestamp.make("oldValue", 21L), @@ -53,7 +54,7 @@ public class TimestampedCacheFlushListenerTest { @Test public void shouldForwardParameterTimestampIfNewValueIsNull() { - final InternalProcessorContext context = mock(InternalProcessorContext.class); + final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); @@ -64,7 +65,7 @@ public class TimestampedCacheFlushListenerTest { expectLastCall(); replay(context); - new TimestampedCacheFlushListener<>(context).apply( + new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply( "key", null, ValueAndTimestamp.make("oldValue", 21L), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java index 52a5fcf..dc2767c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java @@ -39,12 +39,17 @@ public class TimestampedTupleForwarderTest { private void setFlushListener(final boolean sendOldValues) { final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class); - final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class); + final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class); expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); replay(store); - new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues); + new TimestampedTupleForwarder<>( + store, + (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null, + flushListener, + sendOldValues + ); verify(store); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index b813422..e4968e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -171,7 +171,7 @@ public class AbstractProcessorContextTest { ); } - private static class TestProcessorContext extends AbstractProcessorContext { + private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> { static Properties config; static { config = getStreamsConfig(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index e4bc600..31be9dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -61,10 +61,10 @@ public class GlobalStateTaskTest { private final String topic2 = "t2"; private final TopicPartition t1 = new TopicPartition(topic1, 1); private final TopicPartition t2 = new TopicPartition(topic2, 1); - private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>( + private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>( new StringDeserializer(), new StringDeserializer()); - private final MockSourceNode<Integer, Integer, ?, ?> sourceTwo = new MockSourceNode<>( + private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>( new IntegerDeserializer(), new IntegerDeserializer()); private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>(); @@ -81,7 +81,7 @@ public class GlobalStateTaskTest { @Before public void before() { final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store"); - final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>(); + final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>(); sourceByTopics.put(topic1, sourceOne); sourceByTopics.put(topic2, sourceTwo); final Map<String, String> storeToTopic = new HashMap<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 84bdf51..ef46ab3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -110,8 +110,8 @@ public class ProcessorNodeTest { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime()); - final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); - final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); + final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); node.init(context); final String threadId = Thread.currentThread().getName(); @@ -196,8 +196,8 @@ public class ProcessorNodeTest { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); - final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); - final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet()); + final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet()); node.init(context); final StreamsException se = assertThrows( StreamsException.class, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java index b4cef8a..57e4490 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java @@ -27,7 +27,7 @@ public final class ProcessorTopologyFactories { public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes, - final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic, + final Map<String, SourceNode<?, ?>> sourcesByTopic, final List<StateStore> stateStoresByName, final Map<String, String> storeToChangelogTopic) { return new ProcessorTopology(processorNodes, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 18d17ae..448ceaf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -68,7 +68,7 @@ public class RecordDeserializerTest { assertEquals(rawRecord.headers(), record.headers()); } - static class TheSourceNode extends SourceNode<Object, Object, Object, Object> { + static class TheSourceNode extends SourceNode<Object, Object> { private final boolean keyThrowsException; private final boolean valueThrowsException; private final Object key; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 142e85a..d23311b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -62,11 +62,12 @@ public class RecordQueueTest { private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); - final InternalMockProcessorContext context = new InternalMockProcessorContext( + @SuppressWarnings("rawtypes") + final InternalMockProcessorContext context = new InternalMockProcessorContext<>( StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), new MockRecordCollector() ); - private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics + private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics = new MockSourceNode<>(intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue( new TopicPartition("topic", 1), @@ -86,6 +87,7 @@ public class RecordQueueTest { private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); + @SuppressWarnings("unchecked") @Before public void before() { mockSourceNodeWithMetrics.init(context); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 30c7b1b..7e7f7b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -33,13 +33,13 @@ public class SinkNodeTest { private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer(); private final RecordCollector recordCollector = new MockRecordCollector(); - private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector); - private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName", + private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext<>(anyStateSerde, recordCollector); + private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null); // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed - @SuppressWarnings("unchecked") - private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink; + @SuppressWarnings({"unchecked", "rawtypes"}) + private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink; @Before public void before() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 92e4719..00c5647 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue; public class SourceNodeTest { @Test public void shouldProvideTopicHeadersAndDataToKeyDeserializer() { - final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); + final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); final RecordHeaders headers = new RecordHeaders(); final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8)); assertThat(deserializeKey, is("topic" + headers + "data")); @@ -52,7 +52,7 @@ public class SourceNodeTest { @Test public void shouldProvideTopicHeadersAndDataToValueDeserializer() { - final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); + final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); final RecordHeaders headers = new RecordHeaders(); final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8)); assertThat(deserializedValue, is("topic" + headers + "data")); @@ -84,8 +84,8 @@ public class SourceNodeTest { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime()); - final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); - final SourceNode<String, String, ?, ?> node = + final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics); + final SourceNode<String, String> node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); node.init(context); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9763c4f..0e0f314 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -137,9 +137,9 @@ public class StreamTaskTest { private final Serializer<Integer> intSerializer = Serdes.Integer().serializer(); private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer(); - private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) { + private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) { @Override public void process(final Record<Integer, Integer> record) { throw new RuntimeException("KABOOM!"); @@ -150,7 +150,7 @@ public class StreamTaskTest { throw new RuntimeException("KABOOM!"); } }; - private final MockSourceNode<Integer, Integer, ?, ?> timeoutSource = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) { + private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) { @Override public void process(final Record<Integer, Integer> record) { throw new TimeoutException("Kaboom!"); @@ -192,7 +192,7 @@ public class StreamTaskTest { }; private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes, - final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic, + final Map<String, SourceNode<?, ?>> sourcesByTopic, final Set<String> repartitionTopics) { return new ProcessorTopology(processorNodes, sourcesByTopic, @@ -204,7 +204,7 @@ public class StreamTaskTest { } private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes, - final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) { + final Map<String, SourceNode<?, ?>> sourcesByTopic) { return new ProcessorTopology(processorNodes, sourcesByTopic, emptyMap(), @@ -622,11 +622,11 @@ public class StreamTaskTest { metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); // Create a processor that only forwards even keys to test the metrics at the source and terminal nodes - final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) { - InternalProcessorContext context; + final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) { + InternalProcessorContext<Integer, Integer> context; @Override - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<Integer, Integer> context) { this.context = context; super.init(context); } @@ -1528,6 +1528,7 @@ public class StreamTaskTest { assertFalse(checkpointFile.exists()); } + @SuppressWarnings("unchecked") @Test public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() { task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST); @@ -1568,6 +1569,7 @@ public class StreamTaskTest { assertThrows(IllegalStateException.class, () -> task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { })); } + @SuppressWarnings("unchecked") @Test public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() { task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST); @@ -2480,7 +2482,7 @@ public class StreamTaskTest { ); } - private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer, Integer, Integer> sourceNode) { + private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer> sourceNode) { final ProcessorTopology topology = withSources( asList(sourceNode, processorStreamTime), singletonMap(topic1, sourceNode) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 38e9860..3f438f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -187,6 +187,7 @@ public class KeyValueStoreTestDriver<K, V> { private final InternalMockProcessorContext context; private final StateSerdes<K, V> stateSerdes; + @SuppressWarnings("unchecked") private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) { props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 5959a80..e050a21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -49,6 +49,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("unchecked") public abstract class AbstractKeyValueStoreTest { protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context); @@ -80,6 +81,7 @@ public abstract class AbstractKeyValueStoreTest { return result; } + @SuppressWarnings("unchecked") @Test public void shouldNotIncludeDeletedFromRangeResult() { store.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index b103e98..da1050a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -124,7 +124,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> bytesStore = getBytesStore(); stateDir = TestUtils.tempDirectory(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( stateDir, Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index ed60837..b005905 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -89,7 +89,7 @@ public abstract class AbstractSessionBytesStoreTest { public void setUp() { sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long()); recordCollector = new MockRecordCollector(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), @@ -536,6 +536,7 @@ public abstract class AbstractSessionBytesStoreTest { assertFalse(iterator.hasNext()); } + @SuppressWarnings("unchecked") @Test public void shouldRestore() { final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 08fcb6d..2f9432a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -101,7 +101,7 @@ public abstract class AbstractWindowBytesStoreTest { windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String()); recordCollector = new MockRecordCollector(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( baseDir, Serdes.String(), Serdes.Integer(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java index ea4b147..fba59cf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.api.Record; import java.util.HashMap; import java.util.Map; @@ -46,4 +47,15 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], ) ); } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + forwarded.put( + keyDeserializer.deserialize(null, record.key()), + new Change<>( + valueDeserializer.deserialize(null, record.value().newValue), + valueDeserializer.deserialize(null, record.value().oldValue) + ) + ); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java index 66b13c1..ff78642 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java @@ -74,7 +74,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest store = new CachingKeyValueStore(underlyingStore); store.setFlushListener(cacheFlushListener, false); cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(null, null, null, null, cache); + context = new InternalMockProcessorContext<>(null, null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); store.init((StateStoreContext) context, null); } @@ -200,7 +200,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest EasyMock.replay(underlyingStore); store = new CachingKeyValueStore(underlyingStore); cache = EasyMock.niceMock(ThreadCache.class); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); store.init((StateStoreContext) context, store); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index e584e2c..5885a59 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -87,7 +88,7 @@ public class CachingInMemorySessionStoreTest { underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -223,7 +224,7 @@ public class CachingInMemorySessionStoreTest { EasyMock.replay(underlyingStore); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = EasyMock.niceMock(ThreadCache.class); - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -752,5 +753,18 @@ public class CachingInMemorySessionStoreTest { valueDesializer.deserialize(null, oldValue)), timestamp)); } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + forwarded.add( + new KeyValueTimestamp<>( + keyDeserializer.deserialize(null, record.key()), + new Change<>( + valueDesializer.deserialize(null, record.value().newValue), + valueDesializer.deserialize(null, record.value().oldValue)), + record.timestamp() + ) + ); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 7f8a394..224c8bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -92,7 +93,7 @@ public class CachingPersistentSessionStoreTest { cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); final InternalMockProcessorContext context = - new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -209,7 +210,7 @@ public class CachingPersistentSessionStoreTest { cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = EasyMock.niceMock(ThreadCache.class); final InternalMockProcessorContext context = - new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -763,5 +764,18 @@ public class CachingPersistentSessionStoreTest { ) ); } + + @Override + public void apply(Record<byte[], Change<byte[]>> record) { + forwarded.add( + new KeyValueTimestamp<>( + keyDeserializer.deserialize(null, record.key()), + new Change<>( + valueDesializer.deserialize(null, record.value().newValue), + valueDesializer.deserialize(null, record.value().oldValue)), + record.timestamp() + ) + ); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 13f1f2b..e434c21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -103,7 +103,7 @@ public class CachingPersistentWindowStoreTest { cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); cachingStore.setFlushListener(cacheListener, false); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -906,7 +906,7 @@ public class CachingPersistentWindowStoreTest { EasyMock.replay(underlyingStore); cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); cache = EasyMock.createNiceMock(ThreadCache.class); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 255994c..9e2532d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +@SuppressWarnings("ALL") public class ChangeLoggingKeyValueBytesStoreTest { private final MockRecordCollector collector = new MockRecordCollector(); @@ -64,7 +65,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { } private InternalMockProcessorContext mockContext() { - return new InternalMockProcessorContext( + return new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java index 8295f7d..d65d948 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java @@ -42,6 +42,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +@SuppressWarnings("rawtypes") public class ChangeLoggingTimestampedKeyValueBytesStoreTest { private final MockRecordCollector collector = new MockRecordCollector(); @@ -64,7 +65,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest { } private InternalMockProcessorContext mockContext() { - return new InternalMockProcessorContext( + return new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 736721a..ca8468b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -76,8 +76,15 @@ public class CompositeReadOnlyKeyValueStoreTest { Serdes.String()) .build(); - final InternalMockProcessorContext context = new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), - Serdes.String(), Serdes.String()), new MockRecordCollector()); + @SuppressWarnings("rawtypes") final InternalMockProcessorContext context = + new InternalMockProcessorContext<>( + new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic("appId", storeName), + Serdes.String(), + Serdes.String() + ), + new MockRecordCollector() + ); context.setTime(1L); store.init((StateStoreContext) context, store); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index 831e684..87a2063 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -80,6 +80,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { return store; } + @SuppressWarnings("unchecked") @Test public void shouldRemoveKeysWithNullValues() { store.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index a044eda..53057b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -132,6 +132,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { assertEquals(3, driver.numFlushedEntryRemoved()); } + @SuppressWarnings("unchecked") @Test public void testRestoreEvict() { store.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index 2ef9bad..2a9b5bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -67,6 +67,7 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest { LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); } + @SuppressWarnings("unchecked") @Test public void shouldRestore() { // should be empty initially diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java index aeef8ce..c8f1a0e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java @@ -55,7 +55,7 @@ public class KeyValueSegmentsTest { @Before public void createContext() { stateDirectory = TestUtils.tempDirectory(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( stateDirectory, Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index c05c1ba..4266751 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -87,7 +87,7 @@ public class MeteredTimestampedWindowStoreTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 538c8d3..b18919a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -132,7 +132,7 @@ public class MeteredWindowStoreTest { public void setUp() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime()); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index c9e9a8d..14166be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -87,6 +87,7 @@ import static org.junit.Assert.assertTrue; import static org.powermock.api.easymock.PowerMock.replay; import static org.powermock.api.easymock.PowerMock.verify; +@SuppressWarnings("unchecked") public class RocksDBStoreTest { private static boolean enableBloomFilters = false; final static String DB_NAME = "db-name"; @@ -107,7 +108,7 @@ public class RocksDBStoreTest { final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); dir = TestUtils.tempDirectory(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( dir, Serdes.String(), Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java index 2646c4c..95c88ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java @@ -59,7 +59,7 @@ public class RocksDBTimeOrderedWindowStoreTest { windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String()); recordCollector = new MockRecordCollector(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( baseDir, Serdes.String(), Serdes.Integer(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 5643cde..fff4ae1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -491,6 +491,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { ); } + @SuppressWarnings("unchecked") @Test public void testRestore() throws Exception { final long startTime = SEGMENT_INTERVAL * 2; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 97593e7..c7e5924 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -53,9 +53,10 @@ public class SegmentIteratorTest { private SegmentIterator<KeyValueSegment> iterator = null; + @SuppressWarnings("rawtypes") @Before public void before() { - final InternalMockProcessorContext context = new InternalMockProcessorContext( + final InternalMockProcessorContext context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java index 558f1c9..722cb69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java @@ -55,7 +55,7 @@ public class TimestampedSegmentsTest { @Before public void createContext() { stateDirectory = TestUtils.tempDirectory(); - context = new InternalMockProcessorContext( + context = new InternalMockProcessorContext<>( stateDirectory, Serdes.String(), Serdes.Long(), diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 8d39bf3..b19fdf5 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -59,8 +59,8 @@ import java.util.Map; import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt; -public class InternalMockProcessorContext - extends AbstractProcessorContext +public class InternalMockProcessorContext<KOut, VOut> + extends AbstractProcessorContext<KOut, VOut> implements RecordCollector.Supplier { private StateManager stateManager = new StateManagerStub(); @@ -290,13 +290,13 @@ public class InternalMockProcessorContext public void commit() {} @Override - public <K, V> void forward(final Record<K, V> record) { + public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) { forward(record, null); } @SuppressWarnings("unchecked") @Override - public <K, V> void forward(final Record<K, V> record, final String childName) { + public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) { if (recordContext != null && record.timestamp() != recordContext.timestamp()) { setTime(record.timestamp()); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 370dca7..c32c136 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -40,7 +40,7 @@ import java.util.Optional; import java.util.Properties; import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; -public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext { +public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> { private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>(); private ProcessorNode currentNode; diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java index a75c250..4ab4cb8b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -53,7 +53,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, } @Override - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<KOut, VOut> context) { super.init(context); initialized = true; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java index 9d22e3b..f52134e 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.internals.SourceNode; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; -public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> { +public class MockSourceNode<KIn, VIn> extends SourceNode<KIn, VIn> { private static final String NAME = "MOCK-SOURCE-"; private static final AtomicInteger INDEX = new AtomicInteger(1); @@ -47,7 +47,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K } @Override - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<KIn, VIn> context) { super.init(context); initialized = true; } diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index a3ec02b..b3243cd 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -44,7 +44,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; -public class NoOpProcessorContext extends AbstractProcessorContext { +public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> { public boolean initialized; @SuppressWarnings("WeakerAccess") public Map<Object, Object> forwardedValues = new HashMap<>();
