This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch poc-478-ktable-1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b4f09abdec9ebc29ae028cce25b977ac0ed87061 Author: John Roesler <[email protected]> AuthorDate: Fri May 21 16:01:56 2021 -0500 fix tests --- .../java/org/apache/kafka/streams/kstream/internals/KTableFilter.java | 2 ++ .../java/org/apache/kafka/streams/kstream/internals/KTableImpl.java | 2 +- .../kafka/streams/kstream/internals/SessionCacheFlushListener.java | 2 +- .../kafka/streams/kstream/internals/TimestampedTupleForwarder.java | 2 +- .../kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java | 1 - .../streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java | 2 +- 6 files changed, 6 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index ecbd4703..3d97408 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -167,6 +167,8 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + // This is the old processor context for compatibility with the other KTable processors. + // Once we migrte them all, we can swap this out. parentGetter.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 01cd194..faee7af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -832,7 +832,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< return new KTableSourceValueGetterSupplier<>(source.queryableName()); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); - } else if (processorSupplier instanceof KTableNewProcessorSupplier){ + } else if (processorSupplier instanceof KTableNewProcessorSupplier) { return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view(); } else { return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java index 2792dd9..daa7c64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java @@ -55,7 +55,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(record); + context.forward(record.withTimestamp(record.key().window().end())); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java index e5733eb..6411b35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java @@ -58,7 +58,7 @@ class TimestampedTupleForwarder<K, V> { public void maybeForward(final Record<K, Change<V>> record) { if (!cachingEnabled) { - if(sendOldValues) { + if (sendOldValues) { context.forward(record); } else { context.forward(record.withValue(new Change<>(record.value().newValue, null))); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java index 8b9dccb..89b732e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 9e2532d..5f524fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -46,7 +46,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; -@SuppressWarnings("ALL") +@SuppressWarnings("rawtypes") public class ChangeLoggingKeyValueBytesStoreTest { private final MockRecordCollector collector = new MockRecordCollector();
