This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch poc-478-ktable-1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 82642732d400465084f42029e54ec703e2242318 Author: John Roesler <[email protected]> AuthorDate: Fri May 21 15:32:31 2021 -0500 fixing tests --- .../streams/kstream/internals/KTableFilter.java | 4 +- .../streams/kstream/internals/KTableImpl.java | 8 ++++ .../NewTimestampedCacheFlushListener.java | 51 ---------------------- .../internals/SessionCacheFlushListener.java | 2 +- .../internals/TimestampedCacheFlushListener.java | 27 +++++++++--- .../internals/TimestampedTupleForwarder.java | 22 +++++++--- .../org/apache/kafka/streams/processor/To.java | 7 +++ .../streams/processor/internals/ProcessorNode.java | 1 - .../state/internals/MeteredKeyValueStore.java | 4 +- .../state/internals/MeteredSessionStore.java | 4 +- .../state/internals/MeteredWindowStore.java | 4 +- .../internals/SessionCacheFlushListenerTest.java | 2 +- .../TimestampedCacheFlushListenerTest.java | 8 ++-- .../internals/TimestampedTupleForwarderTest.java | 23 +++++++--- .../state/internals/CacheFlushListenerStub.java | 2 +- .../internals/CachingInMemorySessionStoreTest.java | 2 +- .../CachingPersistentSessionStoreTest.java | 2 +- 17 files changed, 87 insertions(+), 86 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index a23dce4..ecbd4703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -105,7 +105,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn } @Override - public void process(Record<KIn, Change<VIn>> record) { + public void process(final Record<KIn, Change<VIn>> record) { final KIn key = record.key(); final Change<VIn> change = record.value(); @@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn } @Override - public void init(org.apache.kafka.streams.processor.ProcessorContext context) { + public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { parentGetter.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 9733511..01cd194 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -832,6 +832,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< return new KTableSourceValueGetterSupplier<>(source.queryableName()); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); + } else if (processorSupplier instanceof KTableNewProcessorSupplier){ + return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view(); } else { return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view(); } @@ -848,6 +850,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< source.enableSendingOldValues(); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues(); + } else if (processorSupplier instanceof KTableNewProcessorSupplier) { + final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier = + (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier; + if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) { + return false; + } } else { final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier; if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java deleted file mode 100644 index d325459..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.state.internals.CacheFlushListener; - -class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> { - private final InternalProcessorContext<KOut, Change<VOut>> context; - - @SuppressWarnings("rawtypes") - private final ProcessorNode myNode; - - NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) { - this.context = (InternalProcessorContext<KOut, Change<VOut>>) context; - myNode = this.context.currentNode(); - } - - @Override - public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) { - throw new RuntimeException("ASDFASDF"); - } - - @Override - public void apply(Record<KOut, Change<VOut>> record) { - @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); - context.setCurrentNode(myNode); - try { - context.forward(record); - } finally { - context.setCurrentNode(prev); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java index ceff4b7..2792dd9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java @@ -51,7 +51,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window } @Override - public void apply(Record<Windowed<KOut>, Change<VOut>> record) { + public void apply(final Record<Windowed<KOut>, Change<VOut>> record) { @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java index 97ef6cb..4034414 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java @@ -24,7 +24,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> { +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, ValueAndTimestamp<VOut>> { private final InternalProcessorContext<KOut, Change<VOut>> context; @SuppressWarnings("rawtypes") @@ -42,22 +44,35 @@ class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KO } @Override - public void apply(Record<KOut, Change<VOut>> record) { - @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); + public void apply(final KOut key, + final ValueAndTimestamp<VOut> newValue, + final ValueAndTimestamp<VOut> oldValue, + final long timestamp) { + final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(record); + context.forward( + key, + new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)), + To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp)); } finally { context.setCurrentNode(prev); } } @Override - public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) { + public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) { @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp)); + context.forward( + record.withValue( + new Change<>( + getValueOrNull(record.value().newValue), + getValueOrNull(record.value().oldValue) + ) + ) + ); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java index 729e9fd..e5733eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; */ class TimestampedTupleForwarder<K, V> { private final InternalProcessorContext<K, Change<V>> context; + private final boolean sendOldValues; private final boolean cachingEnabled; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -41,6 +42,7 @@ class TimestampedTupleForwarder<K, V> { final TimestampedCacheFlushListener<K, V> flushListener, final boolean sendOldValues) { this.context = (InternalProcessorContext<K, Change<V>>) context; + this.sendOldValues = sendOldValues; cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } @@ -50,24 +52,34 @@ class TimestampedTupleForwarder<K, V> { final TimestampedCacheFlushListener<K, V> flushListener, final boolean sendOldValues) { this.context = (InternalProcessorContext) context; + this.sendOldValues = sendOldValues; cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } public void maybeForward(final Record<K, Change<V>> record) { if (!cachingEnabled) { - context.forward(record); + if(sendOldValues) { + context.forward(record); + } else { + context.forward(record.withValue(new Change<>(record.value().newValue, null))); + } } } - public void maybeForward(K key, V value, V oldValue) { + public void maybeForward(final K key, + final V newValue, + final V oldValue) { if (!cachingEnabled) { - context.forward(key, new Change<>(value, oldValue)); + context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null)); } } - public void maybeForward(K key, V value, V oldValue, long newTimestamp) { + public void maybeForward(final K key, + final V newValue, + final V oldValue, + final long timestamp) { if (!cachingEnabled) { - context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp)); + context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java index fe19dbf..69c0c5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java @@ -89,4 +89,11 @@ public class To { throw new UnsupportedOperationException("To is unsafe for use in Hash collections"); } + @Override + public String toString() { + return "To{" + + "childName='" + childName + '\'' + + ", timestamp=" + timestamp + + '}'; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 80fdc4f..f221c57 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 62542e6..94624a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -177,7 +177,7 @@ public class MeteredKeyValueStore<K, V> return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( new CacheFlushListener<byte[], byte[]>() { @Override - public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) { + public void apply(final byte[] rawKey, final byte[] rawNewValue, final byte[] rawOldValue, final long timestamp) { listener.apply( serdes.keyFrom(rawKey), rawNewValue != null ? serdes.valueFrom(rawNewValue) : null, @@ -187,7 +187,7 @@ public class MeteredKeyValueStore<K, V> } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { listener.apply( record.withKey(serdes.keyFrom(record.key())) .withValue(new Change<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index d305951..3b858e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -148,7 +148,7 @@ public class MeteredSessionStore<K, V> return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( new CacheFlushListener<byte[], byte[]>() { @Override - public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) { + public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) { listener.apply( SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()), newValue != null ? serdes.valueFrom(newValue) : null, @@ -158,7 +158,7 @@ public class MeteredSessionStore<K, V> } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { listener.apply( record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic())) .withValue(new Change<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 82f65a6..f77bf4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -152,7 +152,7 @@ public class MeteredWindowStore<K, V> return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( new CacheFlushListener<byte[], byte[]>() { @Override - public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) { + public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) { listener.apply( WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()), newValue != null ? serdes.valueFrom(newValue) : null, @@ -162,7 +162,7 @@ public class MeteredWindowStore<K, V> } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { listener.apply( record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic())) .withValue(new Change<>( diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java index a826d50..c60bcf4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java @@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify; public class SessionCacheFlushListenerTest { @Test public void shouldForwardKeyNewValueOldValueAndTimestamp() { - final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class); + final InternalProcessorContext<Windowed<String>, Change<String>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java index 7c1b0e7..7c25b2e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java @@ -32,7 +32,7 @@ public class TimestampedCacheFlushListenerTest { @Test public void shouldForwardValueTimestampIfNewValueExists() { - final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class); + final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); @@ -43,7 +43,7 @@ public class TimestampedCacheFlushListenerTest { expectLastCall(); replay(context); - new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply( + new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply( "key", ValueAndTimestamp.make("newValue", 42L), ValueAndTimestamp.make("oldValue", 21L), @@ -54,7 +54,7 @@ public class TimestampedCacheFlushListenerTest { @Test public void shouldForwardParameterTimestampIfNewValueIsNull() { - final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class); + final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); expect(context.currentNode()).andReturn(null).anyTimes(); context.setCurrentNode(null); context.setCurrentNode(null); @@ -65,7 +65,7 @@ public class TimestampedCacheFlushListenerTest { expectLastCall(); replay(context); - new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply( + new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply( "key", null, ValueAndTimestamp.make("oldValue", 21L), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java index dc2767c..8b9dccb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.junit.Test; @@ -39,14 +40,14 @@ public class TimestampedTupleForwarderTest { private void setFlushListener(final boolean sendOldValues) { final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class); - final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class); + final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class); expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); replay(store); new TimestampedTupleForwarder<>( store, - (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null, + (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null, flushListener, sendOldValues ); @@ -62,7 +63,7 @@ public class TimestampedTupleForwarderTest { private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext context = mock(ProcessorContext.class); + final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); expect(store.setFlushListener(null, sendOldValues)).andReturn(false); if (sendOldValues) { @@ -76,7 +77,12 @@ public class TimestampedTupleForwarderTest { replay(store, context); final TimestampedTupleForwarder<String, String> forwarder = - new TimestampedTupleForwarder<>(store, context, null, sendOldValues); + new TimestampedTupleForwarder<>( + store, + (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context, + null, + sendOldValues + ); forwarder.maybeForward("key1", "newValue1", "oldValue1"); forwarder.maybeForward("key2", "newValue2", "oldValue2", 42L); @@ -86,13 +92,18 @@ public class TimestampedTupleForwarderTest { @Test public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); - final ProcessorContext context = mock(ProcessorContext.class); + final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class); expect(store.setFlushListener(null, false)).andReturn(true); replay(store, context); final TimestampedTupleForwarder<String, String> forwarder = - new TimestampedTupleForwarder<>(store, context, null, false); + new TimestampedTupleForwarder<>( + store, + (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context, + null, + false + ); forwarder.maybeForward("key", "newValue", "oldValue"); forwarder.maybeForward("key", "newValue", "oldValue", 42L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java index fba59cf..b214739 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java @@ -49,7 +49,7 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { forwarded.put( keyDeserializer.deserialize(null, record.key()), new Change<>( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 5885a59..417b35f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -755,7 +755,7 @@ public class CachingInMemorySessionStoreTest { } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { forwarded.add( new KeyValueTimestamp<>( keyDeserializer.deserialize(null, record.key()), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 224c8bd..55018bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -766,7 +766,7 @@ public class CachingPersistentSessionStoreTest { } @Override - public void apply(Record<byte[], Change<byte[]>> record) { + public void apply(final Record<byte[], Change<byte[]>> record) { forwarded.add( new KeyValueTimestamp<>( keyDeserializer.deserialize(null, record.key()),
