This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch poc-478-ktable-1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8707d5c5b1b1806f1c92101665c8935fb43245f1 Author: John Roesler <[email protected]> AuthorDate: Fri May 21 13:19:35 2021 -0500 wip --- .../streams/kstream/internals/KTableFilter.java | 74 ++++++++++++---------- .../streams/kstream/internals/KTableImpl.java | 20 +++++- .../internals/KTableNewProcessorSupplier.java | 40 ++++++++++++ .../kstream/internals/KTableValueGetter.java | 4 +- ....java => NewTimestampedCacheFlushListener.java} | 29 +++++---- .../internals/SessionCacheFlushListener.java | 29 +++++++-- .../internals/TimestampedCacheFlushListener.java | 43 ++++++++----- .../internals/TimestampedTupleForwarder.java | 42 +++++++----- .../internals/AbstractProcessorContext.java | 2 +- .../internals/InternalProcessorContext.java | 4 +- .../internals/InternalTopologyBuilder.java | 51 ++++++++------- .../streams/processor/internals/ProcessorNode.java | 5 +- .../processor/internals/ProcessorTopology.java | 32 +++++----- .../processor/internals/RecordDeserializer.java | 6 +- .../streams/processor/internals/RecordQueue.java | 6 +- .../streams/processor/internals/SinkNode.java | 8 +-- .../streams/processor/internals/SourceNode.java | 6 +- .../streams/processor/internals/StreamTask.java | 11 +++- .../state/internals/CacheFlushListener.java | 8 +++ 19 files changed, 269 insertions(+), 151 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 76753a4..bbffea6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -17,23 +17,23 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { - private final KTableImpl<K, ?, V> parent; - private final Predicate<? super K, ? super V> predicate; +class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> { + private final KTableImpl<KIn, ?, VIn> parent; + private final Predicate<? super KIn, ? super VIn> predicate; private final boolean filterNot; private final String queryableName; private boolean sendOldValues; - KTableFilter(final KTableImpl<K, ?, V> parent, - final Predicate<? super K, ? super V> predicate, + KTableFilter(final KTableImpl<KIn, ?, VIn> parent, + final Predicate<? super KIn, ? super VIn> predicate, final boolean filterNot, final String queryableName) { this.parent = parent; @@ -45,7 +45,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } @Override - public Processor<K, Change<V>> get() { + public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() { return new KTableFilterProcessor(); } @@ -62,8 +62,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { return sendOldValues; } - private V computeValue(final K key, final V value) { - V newValue = null; + private VIn computeValue(final KIn key, final VIn value) { + VIn newValue = null; if (value != null && (filterNot ^ predicate.test(key, value))) { newValue = value; @@ -72,11 +72,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { return newValue; } - private ValueAndTimestamp<V> computeValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) { - ValueAndTimestamp<V> newValueAndTimestamp = null; + private ValueAndTimestamp<VIn> computeValue(final KIn key, final ValueAndTimestamp<VIn> valueAndTimestamp) { + ValueAndTimestamp<VIn> newValueAndTimestamp = null; if (valueAndTimestamp != null) { - final V value = valueAndTimestamp.value(); + final VIn value = valueAndTimestamp.value(); if (filterNot ^ predicate.test(key, value)) { newValueAndTimestamp = valueAndTimestamp; } @@ -86,13 +86,14 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } - private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> { - private TimestampedKeyValueStore<K, V> store; - private TimestampedTupleForwarder<K, V> tupleForwarder; + private class KTableFilterProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VIn>> { + private ProcessorContext<KIn, Change<VIn>> context; + private TimestampedKeyValueStore<KIn, VIn> store; + private TimestampedTupleForwarder<KIn, VIn> tupleForwarder; @Override - public void init(final ProcessorContext context) { - super.init(context); + public void init(final ProcessorContext<KIn, Change<VIn>> context) { + this.context = context; if (queryableName != null) { store = context.getStateStore(queryableName); tupleForwarder = new TimestampedTupleForwarder<>( @@ -104,23 +105,26 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } @Override - public void process(final K key, final Change<V> change) { - final V newValue = computeValue(key, change.newValue); - final V oldValue = computeOldValue(key, change); + public void process(Record<KIn, Change<VIn>> record) { + final KIn key = record.key(); + final Change<VIn> change = record.value(); + + final VIn newValue = computeValue(key, change.newValue); + final VIn oldValue = computeOldValue(key, change); if (sendOldValues && oldValue == null && newValue == null) { return; // unnecessary to forward here. } if (queryableName != null) { - store.put(key, ValueAndTimestamp.make(newValue, context().timestamp())); - tupleForwarder.maybeForward(key, newValue, oldValue); + store.put(key, ValueAndTimestamp.make(newValue, record.timestamp())); + tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue))); } else { - context().forward(key, new Change<>(newValue, oldValue)); + context.forward(record.withValue(new Change<>(newValue, oldValue))); } } - private V computeOldValue(final K key, final Change<V> change) { + private VIn computeOldValue(final KIn key, final Change<VIn> change) { if (!sendOldValues) { return null; } @@ -132,16 +136,16 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } @Override - public KTableValueGetterSupplier<K, V> view() { + public KTableValueGetterSupplier<KIn, VIn> view() { // if the KTable is materialized, use the materialized store to return getter value; // otherwise rely on the parent getter and apply filter on-the-fly if (queryableName != null) { return new KTableMaterializedValueGetterSupplier<>(queryableName); } else { - return new KTableValueGetterSupplier<K, V>() { - final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); + return new KTableValueGetterSupplier<KIn, VIn>() { + final KTableValueGetterSupplier<KIn, VIn> parentValueGetterSupplier = parent.valueGetterSupplier(); - public KTableValueGetter<K, V> get() { + public KTableValueGetter<KIn, VIn> get() { return new KTableFilterValueGetter(parentValueGetterSupplier.get()); } @@ -154,20 +158,20 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } - private class KTableFilterValueGetter implements KTableValueGetter<K, V> { - private final KTableValueGetter<K, V> parentGetter; + private class KTableFilterValueGetter implements KTableValueGetter<KIn, VIn> { + private final KTableValueGetter<KIn, VIn> parentGetter; - KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) { + KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> parentGetter) { this.parentGetter = parentGetter; } @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext<Void, Void> context) { parentGetter.init(context); } @Override - public ValueAndTimestamp<V> get(final K key) { + public ValueAndTimestamp<VIn> get(final KIn key) { return computeValue(key, parentGetter.get(key)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 52f7b5f..a2b8702 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -125,6 +125,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< private static final String SINK_NAME = "KTABLE-SINK-"; private final ProcessorSupplier<?, ?> processorSupplier; + private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier; private final String queryableStoreName; @@ -140,6 +141,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final InternalStreamsBuilder builder) { super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder); this.processorSupplier = processorSupplier; + this.newProcessorSupplier = null; + this.queryableStoreName = queryableStoreName; + } + + public KTableImpl(final String name, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Set<String> subTopologySourceNodes, + final String queryableStoreName, + final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier, + final GraphNode graphNode, + final InternalStreamsBuilder builder) { + super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder); + this.processorSupplier = null; + this.newProcessorSupplier = newProcessorSupplier; this.queryableStoreName = queryableStoreName; } @@ -179,7 +195,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); - final KTableProcessorSupplier<K, V, V> processorSupplier = + final KTableNewProcessorSupplier<K, V, K, V> processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( @@ -194,7 +210,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< builder.addGraphNode(this.graphNode, tableNode); - return new KTableImpl<>( + return new KTableImpl<K, V, V>( name, keySerde, valueSerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java new file mode 100644 index 0000000..ef8ea30 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.api.ProcessorSupplier; + +public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> { + + KTableValueGetterSupplier<KOut, VOut> view(); + + /** + * Potentially enables sending old values. + * <p> + * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to + * enable sending old values. + * <p> + * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values <i>if</i> + * an upstream node is already materialized. + * + * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old + * values. + * @return {@code true} is sending old values is enabled, i.e. either because {@code forceMaterialization} was + * {@code true} or some upstream node is materialized. + */ + boolean enableSendingOldValues(boolean forceMaterialization); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java index 12145fa..c939234 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; public interface KTableValueGetter<K, V> { - void init(ProcessorContext context); + void init(ProcessorContext<Void, Void> context); ValueAndTimestamp<V> get(K key); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java similarity index 61% copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java index f40fdfe..d325459 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java @@ -16,31 +16,34 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { - private final InternalProcessorContext context; +class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> { + private final InternalProcessorContext<KOut, Change<VOut>> context; + + @SuppressWarnings("rawtypes") private final ProcessorNode myNode; - SessionCacheFlushListener(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) { + this.context = (InternalProcessorContext<KOut, Change<VOut>>) context; myNode = this.context.currentNode(); } @Override - public void apply(final Windowed<K> key, - final V newValue, - final V oldValue, - final long timestamp) { - final ProcessorNode prev = context.currentNode(); + public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) { + throw new RuntimeException("ASDFASDF"); + } + + @Override + public void apply(Record<KOut, Change<VOut>> record) { + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end())); + context.forward(record); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java index f40fdfe..ceff4b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java @@ -19,25 +19,29 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { - private final InternalProcessorContext context; +class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> { + private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context; + + @SuppressWarnings("rawtypes") private final ProcessorNode myNode; + @SuppressWarnings("unchecked") SessionCacheFlushListener(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context; myNode = this.context.currentNode(); } @Override - public void apply(final Windowed<K> key, - final V newValue, - final V oldValue, + public void apply(final Windowed<KOut> key, + final VOut newValue, + final VOut oldValue, final long timestamp) { - final ProcessorNode prev = context.currentNode(); + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end())); @@ -45,4 +49,15 @@ class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, context.setCurrentNode(prev); } } + + @Override + public void apply(Record<Windowed<KOut>, Change<VOut>> record) { + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); + context.setCurrentNode(myNode); + try { + context.forward(record); + } finally { + context.setCurrentNode(prev); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java index 5540376..6dbf435 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java @@ -16,36 +16,47 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.CacheFlushListener; -import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; +class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> { + private final InternalProcessorContext<KOut, Change<VOut>> context; -class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> { - private final InternalProcessorContext context; + @SuppressWarnings("rawtypes") private final ProcessorNode myNode; - TimestampedCacheFlushListener(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + TimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) { + this.context = (InternalProcessorContext<KOut, Change<VOut>>) context; myNode = this.context.currentNode(); } + @SuppressWarnings("unchecked") + TimestampedCacheFlushListener(final org.apache.kafka.streams.processor.ProcessorContext context) { + this.context = (InternalProcessorContext<KOut, Change<VOut>>) context; + myNode = this.context.currentNode(); + } + + @Override + public void apply(Record<KOut, Change<VOut>> record) { + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); + context.setCurrentNode(myNode); + try { + context.forward(record); + } finally { + context.setCurrentNode(prev); + } + } + @Override - public void apply(final K key, - final ValueAndTimestamp<V> newValue, - final ValueAndTimestamp<V> oldValue, - final long timestamp) { - final ProcessorNode prev = context.currentNode(); + public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) { + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward( - key, - new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)), - To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp)); + context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp)); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java index 910dd8f..729e9fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.internals.WrappedStateStore; /** @@ -30,34 +32,42 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; * @param <V> the type of the value */ class TimestampedTupleForwarder<K, V> { - private final ProcessorContext context; - private final boolean sendOldValues; + private final InternalProcessorContext<K, Change<V>> context; private final boolean cachingEnabled; - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) TimestampedTupleForwarder(final StateStore store, - final ProcessorContext context, + final ProcessorContext<K, Change<V>> context, final TimestampedCacheFlushListener<K, V> flushListener, final boolean sendOldValues) { - this.context = context; - this.sendOldValues = sendOldValues; + this.context = (InternalProcessorContext<K, Change<V>>) context; cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } - public void maybeForward(final K key, - final V newValue, - final V oldValue) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TimestampedTupleForwarder(final StateStore store, + final org.apache.kafka.streams.processor.ProcessorContext context, + final TimestampedCacheFlushListener<K, V> flushListener, + final boolean sendOldValues) { + this.context = (InternalProcessorContext) context; + cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); + } + + public void maybeForward(final Record<K, Change<V>> record) { + if (!cachingEnabled) { + context.forward(record); + } + } + + public void maybeForward(K key, V value, V oldValue) { if (!cachingEnabled) { - context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null)); + context.forward(key, new Change<>(value, oldValue)); } } - public void maybeForward(final K key, - final V newValue, - final V oldValue, - final long timestamp) { + public void maybeForward(K key, V value, V oldValue, long newTimestamp) { if (!cachingEnabled) { - context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp)); + context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 09a2e31..37ffbdc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -public abstract class AbstractProcessorContext implements InternalProcessorContext { +public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> { private final TaskId taskId; private final String applicationId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index ba5c580..88e47e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -34,8 +34,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from * {@link ThreadCache} */ -public interface InternalProcessorContext - extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext { +public interface InternalProcessorContext<KOut, VOut> + extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<KOut, VOut>, StateStoreContext { BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer(); ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 838cff9..4a54f01 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -243,7 +243,7 @@ public class InternalTopologyBuilder { // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory private final Map<String, Pattern> topicToPatterns = new HashMap<>(); - private class SourceNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> { + private class SourceNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, KIn, VIn> { private final List<String> topics; private final Pattern pattern; private final Deserializer<KIn> keyDeserializer; @@ -291,7 +291,7 @@ public class InternalTopologyBuilder { } @Override - public ProcessorNode<KIn, VIn, KOut, VOut> build() { + public ProcessorNode<KIn, VIn, KIn, VIn> build() { return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer); } @@ -305,7 +305,7 @@ public class InternalTopologyBuilder { } } - private class SinkNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> { + private class SinkNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, Void, Void> { private final Serializer<KIn> keySerializer; private final Serializer<VIn> valSerializer; private final StreamPartitioner<? super KIn, ? super VIn> partitioner; @@ -325,7 +325,7 @@ public class InternalTopologyBuilder { } @Override - public ProcessorNode<KIn, VIn, KOut, VOut> build() { + public ProcessorNode<KIn, VIn, Void, Void> build() { if (topicExtractor instanceof StaticTopicNameExtractor) { final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName; if (internalTopicNamesWithProperties.containsKey(topic)) { @@ -761,12 +761,12 @@ public class InternalTopologyBuilder { } } - private Set<SourceNodeFactory<?, ?, ?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) { - final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodes = new HashSet<>(); + private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) { + final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>(); for (final String predecessor : predecessors) { final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(predecessor); if (nodeFactory instanceof SourceNodeFactory) { - sourceNodes.add((SourceNodeFactory<?, ?, ?, ?>) nodeFactory); + sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory); } else if (nodeFactory instanceof ProcessorNodeFactory) { sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors)); } @@ -787,10 +787,10 @@ public class InternalTopologyBuilder { final Set<String> sourceTopics = new HashSet<>(); final Set<Pattern> sourcePatterns = new HashSet<>(); - final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodesForPredecessor = + final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors); - for (final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory : sourceNodesForPredecessor) { + for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) { if (sourceNodeFactory.pattern != null) { sourcePatterns.add(sourceNodeFactory.pattern); } else { @@ -925,8 +925,8 @@ public class InternalTopologyBuilder { Objects.requireNonNull(applicationId, "topology has not completed optimization"); final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>(); - final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>(); - final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>(); + final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>(); final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); final Set<String> repartitionTopics = new HashSet<>(); @@ -946,15 +946,15 @@ public class InternalTopologyBuilder { } else if (factory instanceof SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, - (SourceNodeFactory<?, ?, ?, ?>) factory, - (SourceNode<?, ?, ?, ?>) node); + (SourceNodeFactory<?, ?>) factory, + (SourceNode<?, ?>) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory<?, ?, ?, ?>) factory, - (SinkNode<Object, Object, ?, ?>) node); + (SinkNodeFactory<?, ?>) factory, + (SinkNode<?, ?>) node); } else { throw new TopologyException("Unknown definition class: " + factory.getClass().getName()); } @@ -971,13 +971,16 @@ public class InternalTopologyBuilder { } private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap, - final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap, + final Map<String, SinkNode<?, ?>> topicSinkMap, final Set<String> repartitionTopics, - final SinkNodeFactory<?, ?, ?, ?> sinkNodeFactory, - final SinkNode<Object, Object, ?, ?> node) { + final SinkNodeFactory<?, ?> sinkNodeFactory, + final SinkNode<?, ?> node) { + @SuppressWarnings("unchecked") final ProcessorNode<Object, Object, ?, ?> sinkNode = + (ProcessorNode<Object, Object, ?, ?>) node; for (final String predecessorName : sinkNodeFactory.predecessors) { - getProcessor(processorMap, predecessorName).addChild(node); + final ProcessorNode<Object, Object, Object, Object> processor = getProcessor(processorMap, predecessorName); + processor.addChild(sinkNode); if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) { final String topic = ((StaticTopicNameExtractor<?, ?>) sinkNodeFactory.topicExtractor).topicName; @@ -1002,10 +1005,10 @@ public class InternalTopologyBuilder { return (ProcessorNode<KIn, VIn, KOut, VOut>) processorMap.get(predecessor); } - private void buildSourceNode(final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap, + private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap, final Set<String> repartitionTopics, - final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory, - final SourceNode<?, ?, ?, ?> node) { + final SourceNodeFactory<?, ?> sourceNodeFactory, + final SourceNode<?, ?> node) { final List<String> topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates()) : @@ -1168,7 +1171,7 @@ public class InternalTopologyBuilder { private void setRegexMatchedTopicsToSourceNodes() { if (hasSubscriptionUpdates()) { for (final String nodeName : nodeToSourcePatterns.keySet()) { - final SourceNodeFactory<?, ?, ?, ?> sourceNode = (SourceNodeFactory<?, ?, ?, ?>) nodeFactories.get(nodeName); + final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName); final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates); //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex nodeToSourceTopics.put(nodeName, sourceTopics); @@ -1358,7 +1361,7 @@ public class InternalTopologyBuilder { final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(nodeName); if (nodeFactory instanceof SourceNodeFactory) { - final List<String> topics = ((SourceNodeFactory<?, ?, ?, ?>) nodeFactory).topics; + final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics; return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0)); } return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index a8c32c7..80fdc4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -105,8 +105,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { childByName.put(child.name, child); } - @SuppressWarnings("unchecked") - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<KOut, VOut> context) { if (!closed) throw new IllegalStateException("The processor is not closed"); @@ -116,7 +115,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { maybeMeasureLatency( () -> { if (processor != null) { - processor.init((ProcessorContext<KOut, VOut>) context); + processor.init(context); } }, time, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 0a0118a..8f1a460 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -33,9 +33,9 @@ public class ProcessorTopology { private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class); private final List<ProcessorNode<?, ?, ?, ?>> processorNodes; - private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByName; - private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic; - private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic; + private final Map<String, SourceNode<?, ?>> sourceNodesByName; + private final Map<String, SourceNode<?, ?>> sourceNodesByTopic; + private final Map<String, SinkNode<?, ?>> sinksByTopic; private final Set<String> terminalNodes; private final List<StateStore> stateStores; private final Set<String> stateStoreNames; @@ -46,8 +46,8 @@ public class ProcessorTopology { private final Map<String, String> storeToChangelogTopic; public ProcessorTopology(final List<ProcessorNode<?, ?, ?, ?>> processorNodes, - final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic, - final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic, + final Map<String, SourceNode<?, ?>> sourceNodesByTopic, + final Map<String, SinkNode<?, ?>> sinksByTopic, final List<StateStore> stateStores, final List<StateStore> globalStateStores, final Map<String, String> storeToChangelogTopic, @@ -69,7 +69,7 @@ public class ProcessorTopology { } this.sourceNodesByName = new HashMap<>(); - for (final SourceNode<?, ?, ?, ?> source : sourceNodesByTopic.values()) { + for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) { sourceNodesByName.put(source.name(), source); } } @@ -78,11 +78,11 @@ public class ProcessorTopology { return sourceNodesByTopic.keySet(); } - public SourceNode<?, ?, ?, ?> source(final String topic) { + public SourceNode<?, ?> source(final String topic) { return sourceNodesByTopic.get(topic); } - public Set<SourceNode<?, ?, ?, ?>> sources() { + public Set<SourceNode<?, ?>> sources() { return new HashSet<>(sourceNodesByTopic.values()); } @@ -90,7 +90,7 @@ public class ProcessorTopology { return sinksByTopic.keySet(); } - public SinkNode<?, ?, ?, ?> sink(final String topic) { + public SinkNode<?, ?> sink(final String topic) { return sinksByTopic.get(topic); } @@ -151,9 +151,9 @@ public class ProcessorTopology { public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) { sourceNodesByTopic.clear(); - for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) { + for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) { final String sourceNodeName = sourceNodeEntry.getKey(); - final SourceNode<?, ?, ?, ?> sourceNode = sourceNodeEntry.getValue(); + final SourceNode<?, ?> sourceNode = sourceNodeEntry.getValue(); final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName); if (updatedSourceTopics == null) { @@ -211,10 +211,10 @@ public class ProcessorTopology { * @return A string representation of this instance. */ public String toString(final String indent) { - final Map<SourceNode<?, ?, ?, ?>, List<String>> sourceToTopics = new HashMap<>(); - for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) { + final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>(); + for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) { final String topic = sourceNodeEntry.getKey(); - final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getValue(); + final SourceNode<?, ?> source = sourceNodeEntry.getValue(); sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>()); sourceToTopics.get(source).add(topic); } @@ -222,8 +222,8 @@ public class ProcessorTopology { final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n"); // start from sources - for (final Map.Entry<SourceNode<?, ?, ?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) { - final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getKey(); + for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) { + final SourceNode<?, ?> source = sourceNodeEntry.getKey(); final List<String> topics = sourceNodeEntry.getValue(); sb.append(source.toString(indent + "\t")) .append(topicsToString(indent + "\t", topics)) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 20f1449..a965187 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -31,11 +31,11 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXC class RecordDeserializer { private final Logger log; - private final SourceNode<?, ?, ?, ?> sourceNode; + private final SourceNode<?, ?> sourceNode; private final Sensor droppedRecordsSensor; private final DeserializationExceptionHandler deserializationExceptionHandler; - RecordDeserializer(final SourceNode<?, ?, ?, ?> sourceNode, + RecordDeserializer(final SourceNode<?, ?> sourceNode, final DeserializationExceptionHandler deserializationExceptionHandler, final LogContext logContext, final Sensor droppedRecordsSensor) { @@ -100,7 +100,7 @@ class RecordDeserializer { } } - SourceNode<?, ?, ?, ?> sourceNode() { + SourceNode<?, ?> sourceNode() { return sourceNode; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index df7e834..6f0db8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -39,7 +39,7 @@ public class RecordQueue { public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP; private final Logger log; - private final SourceNode<?, ?, ?, ?> source; + private final SourceNode<?, ?> source; private final TopicPartition partition; private final ProcessorContext processorContext; private final TimestampExtractor timestampExtractor; @@ -52,7 +52,7 @@ public class RecordQueue { private final Sensor droppedRecordsSensor; RecordQueue(final TopicPartition partition, - final SourceNode<?, ?, ?, ?> source, + final SourceNode<?, ?> source, final TimestampExtractor timestampExtractor, final DeserializationExceptionHandler deserializationExceptionHandler, final InternalProcessorContext processorContext, @@ -85,7 +85,7 @@ public class RecordQueue { * * @return SourceNode */ - public SourceNode<?, ?, ?, ?> source() { + public SourceNode<?, ?> source() { return source; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 2efa537..9091f3e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -24,14 +24,14 @@ import org.apache.kafka.streams.processor.api.Record; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer; -public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { +public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> { private Serializer<KIn> keySerializer; private Serializer<VIn> valSerializer; private final TopicNameExtractor<KIn, VIn> topicExtractor; private final StreamPartitioner<? super KIn, ? super VIn> partitioner; - private InternalProcessorContext context; + private InternalProcessorContext<Void, Void> context; SinkNode(final String name, final TopicNameExtractor<KIn, VIn> topicExtractor, @@ -50,12 +50,12 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut * @throws UnsupportedOperationException if this method adds a child to a sink node */ @Override - public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) { + public void addChild(final ProcessorNode<Void, Void, ?, ?> child) { throw new UnsupportedOperationException("sink node does not allow addChild"); } @Override - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<Void, Void> context) { super.init(context); this.context = context; final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 7198f2f..9ff3473 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -26,9 +26,9 @@ import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer; -public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { +public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> { - private InternalProcessorContext context; + private InternalProcessorContext<KIn, VIn> context; private Deserializer<KIn> keyDeserializer; private Deserializer<VIn> valDeserializer; private final TimestampExtractor timestampExtractor; @@ -59,7 +59,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO } @Override - public void init(final InternalProcessorContext context) { + public void init(final InternalProcessorContext<KIn, VIn> context) { // It is important to first create the sensor before calling init on the // parent object. Otherwise due to backwards compatibility an empty sensor // without parent is created with the same name. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 129407a..ee5d1ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -102,7 +102,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final Sensor punctuateLatencySensor; private final Sensor bufferedRecordsSensor; private final Map<String, Sensor> e2eLatencySensors = new HashMap<>(); + + @SuppressWarnings("rawtypes") private final InternalProcessorContext processorContext; + private final RecordQueueCreator recordQueueCreator; private StampedRecord record; @@ -110,6 +113,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private boolean commitRequested = false; private boolean hasPendingTxCommit = false; + @SuppressWarnings("rawtypes") public StreamTask(final TaskId id, final Set<TopicPartition> inputPartitions, final ProcessorTopology topology, @@ -317,6 +321,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } + @SuppressWarnings("unchecked") private void closeTopology() { log.trace("Closing processor topology"); @@ -805,6 +810,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, * @throws IllegalStateException if the current node is not null * @throws TaskMigratedException if the task producer got fenced (EOS only) */ + @SuppressWarnings("unchecked") @Override public void punctuate(final ProcessorNode<?, ?, ?, ?> node, final long timestamp, @@ -840,6 +846,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } + @SuppressWarnings("unchecked") private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime, final ProcessorRecordContext recordContext) { @@ -933,6 +940,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, return purgeableConsumedOffsets; } + @SuppressWarnings("unchecked") private void initializeTopology() { // initialize the task by initializing all its processor nodes in the topology log.trace("Initializing processor nodes of the topology"); @@ -1107,6 +1115,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } + @SuppressWarnings("rawtypes") public InternalProcessorContext processorContext() { return processorContext; } @@ -1230,7 +1239,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } public RecordQueue createQueue(final TopicPartition partition) { - final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic()); + final SourceNode<?, ?> source = topology.source(partition.topic()); if (source == null) { throw new TopologyException( "Topic is unknown to the topology. " + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java index 7e5f11a..c86d216 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.api.Record; + /** * Listen to cache flush events * @param <K> key type @@ -31,4 +34,9 @@ public interface CacheFlushListener<K, V> { * @param timestamp timestamp of new value */ void apply(final K key, final V newValue, final V oldValue, final long timestamp); + + /** + * Called when records are flushed from the {@link ThreadCache} + */ + void apply(final Record<K, Change<V>> record); }
