KAFKA-3776: Unify store and downstream caching in streams. This is joint work between dguy and enothereska. The work implements KIP-63. Overview of main changes:
- New byte-based cache that acts as a buffer for any persistent store and for forwarding changes downstream. - Forwarding record path changes: previously a record in a task completed end-to-end. Now it may be buffered in a processor node while other records complete in the task. - Cleanup of state stores and decoupling of cache from state store and forwarding. - More than 80 new unit and integration tests. Author: Damian Guy <damian....@gmail.com> Author: Eno Thereska <eno.there...@gmail.com> Reviewers: Matthias J. Sax, Guozhang Wang Closes #1752 from enothereska/KAFKA-3776-poc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86aa0eb0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86aa0eb0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86aa0eb0 Branch: refs/heads/trunk Commit: 86aa0eb0f274c6e44eb190ce250433419e011a67 Parents: 143a33b Author: Damian Guy <damian....@gmail.com> Authored: Fri Sep 16 09:58:36 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Fri Sep 16 09:58:36 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 11 +- .../kafka/streams/kstream/JoinWindows.java | 5 + .../kafka/streams/kstream/TimeWindows.java | 5 + .../kafka/streams/kstream/UnlimitedWindows.java | 5 + .../apache/kafka/streams/kstream/Windows.java | 1 + .../kstream/internals/CacheFlushListener.java | 35 ++ .../internals/ForwardingCacheFlushListener.java | 38 ++ .../kstream/internals/KGroupedStreamImpl.java | 12 +- .../kstream/internals/KGroupedTableImpl.java | 1 + .../kstream/internals/KStreamAggregate.java | 10 +- .../streams/kstream/internals/KStreamImpl.java | 2 +- .../kstream/internals/KStreamMapValues.java | 2 +- .../kstream/internals/KStreamReduce.java | 7 +- .../internals/KStreamWindowAggregate.java | 17 +- .../kstream/internals/KStreamWindowReduce.java | 12 +- 
.../kstream/internals/KTableAggregate.java | 13 +- .../streams/kstream/internals/KTableImpl.java | 3 +- .../streams/kstream/internals/KTableReduce.java | 11 +- .../streams/kstream/internals/KTableSource.java | 5 +- .../kafka/streams/processor/Processor.java | 2 +- .../streams/processor/ProcessorContext.java | 1 + .../streams/processor/TopologyBuilder.java | 14 +- .../processor/internals/AbstractTask.java | 19 +- .../internals/InternalProcessorContext.java | 49 +++ .../internals/ProcessorContextImpl.java | 113 +++-- .../processor/internals/ProcessorNode.java | 9 +- .../internals/ProcessorRecordContext.java | 72 +++ .../internals/ProcessorStateManager.java | 17 +- .../processor/internals/ProcessorTopology.java | 22 +- .../processor/internals/RecordContext.java | 45 ++ .../streams/processor/internals/SinkNode.java | 4 +- .../streams/processor/internals/SourceNode.java | 3 +- .../processor/internals/StandbyContextImpl.java | 27 +- .../processor/internals/StandbyTask.java | 4 +- .../streams/processor/internals/StreamTask.java | 88 ++-- .../processor/internals/StreamThread.java | 64 ++- .../org/apache/kafka/streams/state/Stores.java | 22 +- .../state/internals/AbstractStoreSupplier.java | 1 + .../state/internals/CachedStateStore.java | 28 ++ .../state/internals/CachingKeyValueStore.java | 219 ++++++++++ .../state/internals/CachingWindowStore.java | 170 ++++++++ .../DelegatingPeekingKeyValueIterator.java | 73 ++++ .../DelegatingPeekingWindowIterator.java | 73 ++++ .../streams/state/internals/LRUCacheEntry.java | 92 ++++ .../MergedSortedCacheKeyValueStoreIterator.java | 109 +++++ .../MergedSortedCachedWindowStoreIterator.java | 105 +++++ .../streams/state/internals/NamedCache.java | 298 +++++++++++++ .../internals/PeekingKeyValueIterator.java | 24 + .../state/internals/PeekingWindowIterator.java | 25 ++ .../internals/RocksDBKeyValueStoreSupplier.java | 28 +- .../streams/state/internals/RocksDBStore.java | 201 ++------- .../state/internals/RocksDBWindowStore.java | 48 +- 
.../internals/RocksDBWindowStoreSupplier.java | 25 +- .../streams/state/internals/ThreadCache.java | 311 +++++++++++++ .../state/internals/ThreadCacheMetrics.java | 40 ++ .../state/internals/WindowStoreUtils.java | 8 + .../integration/FanoutIntegrationTest.java | 16 + .../KStreamAggregationDedupIntegrationTest.java | 264 +++++++++++ .../KStreamAggregationIntegrationTest.java | 19 +- .../KStreamKTableJoinIntegrationTest.java | 102 +++-- .../integration/KStreamRepartitionJoinTest.java | 51 ++- .../QueryableStateIntegrationTest.java | 127 ++++-- .../internals/KGroupedTableImplTest.java | 46 ++ .../internals/KStreamKStreamJoinTest.java | 58 +-- .../internals/KStreamKStreamLeftJoinTest.java | 68 +-- .../internals/KStreamWindowAggregateTest.java | 119 +++-- .../kstream/internals/KTableAggregateTest.java | 98 ++++- .../kstream/internals/KTableFilterTest.java | 18 +- .../kstream/internals/KTableForeachTest.java | 1 + .../kstream/internals/KTableImplTest.java | 10 + .../kstream/internals/KTableKTableJoinTest.java | 32 +- .../internals/KTableKTableLeftJoinTest.java | 41 +- .../internals/KTableKTableOuterJoinTest.java | 51 +-- .../kstream/internals/KTableMapKeysTest.java | 2 +- .../kstream/internals/KTableMapValuesTest.java | 17 +- .../kstream/internals/KTableSourceTest.java | 13 +- .../kafka/streams/perf/SimpleBenchmark.java | 9 +- .../streams/processor/TopologyBuilderTest.java | 6 +- .../internals/ProcessorStateManagerTest.java | 13 +- .../internals/ProcessorTopologyTest.java | 2 + .../internals/PunctuationQueueTest.java | 4 +- .../processor/internals/RecordContextStub.java | 55 +++ .../processor/internals/StandbyTaskTest.java | 20 +- .../processor/internals/StreamTaskTest.java | 14 +- .../processor/internals/StreamThreadTest.java | 2 +- .../kafka/streams/smoketest/SmokeTestUtil.java | 3 +- .../streams/state/KeyValueStoreTestDriver.java | 9 +- .../internals/AbstractKeyValueStoreTest.java | 1 - .../internals/CachingKeyValueStoreTest.java | 151 +++++++ 
.../state/internals/CachingWindowStoreTest.java | 166 +++++++ .../DelegatingPeekingKeyValueIteratorTest.java | 78 ++++ .../DelegatingPeekingWindowIteratorTest.java | 92 ++++ ...gedSortedCacheKeyValueStoreIteratorTest.java | 57 +++ ...ergedSortedCacheWindowStoreIteratorTest.java | 97 +++++ .../streams/state/internals/NamedCacheTest.java | 189 ++++++++ .../internals/RocksDBKeyValueStoreTest.java | 56 ++- .../state/internals/RocksDBWindowStoreTest.java | 259 ++++++----- .../state/internals/StateStoreTestUtils.java | 1 + .../StreamThreadStateStoreProviderTest.java | 4 +- .../state/internals/ThreadCacheTest.java | 434 +++++++++++++++++++ .../apache/kafka/test/KStreamTestDriver.java | 51 ++- .../apache/kafka/test/MockProcessorContext.java | 66 ++- .../kafka/test/MockProcessorSupplier.java | 5 +- .../java/org/apache/kafka/test/MockReducer.java | 21 + .../kafka/test/ProcessorTopologyTestDriver.java | 6 +- 105 files changed, 4784 insertions(+), 888 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index e972887..23b5287 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -136,6 +136,9 @@ public class StreamsConfig extends AbstractConfig { public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 
Default is 1 day"; + /** <code>cache.max.bytes.buffering</code> */ + public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; + public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; static { CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value @@ -247,7 +250,13 @@ public class StreamsConfig extends AbstractConfig { Type.LONG, 24 * 60 * 60 * 1000, Importance.MEDIUM, - WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC); + WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) + .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + Type.LONG, + 10 * 1024 * 1024L, + atLeast(0), + Importance.LOW, + CACHE_MAX_BYTES_BUFFERING_DOC); } // this is the list of configs for underlying clients http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 2552148..1ac606e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -103,6 +103,11 @@ public class JoinWindows extends Windows<TimeWindow> { } @Override + public long size() { + return after + before; + } + + @Override public final boolean equals(Object o) { if (o == this) { return true; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 1ec4628..ef94cf9 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -110,6 +110,11 @@ public class TimeWindows extends Windows<TimeWindow> { } @Override + public long size() { + return size; + } + + @Override public final boolean equals(Object o) { if (o == this) { return true; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 971f3c7..92f9ee9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -70,6 +70,11 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { } @Override + public long size() { + return Long.MAX_VALUE; + } + + @Override public final boolean equals(Object o) { if (o == this) { return true; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index d0a5861..f060d39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -79,4 +79,5 @@ public abstract class Windows<W extends Window> { */ public abstract Map<Long, W> windowsFor(long timestamp); + public abstract long size(); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java new file mode 100644 index 0000000..c01ed0f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CacheFlushListener.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.state.internals.ThreadCache; + +/** + * Listen to cache flush events + * @param <K> + * @param <V> + */ +public interface CacheFlushListener<K, V> { + + /** + * Called when records are flushed from the {@link ThreadCache} + * @param key key of the entry + * @param newValue current value + * @param oldValue previous value + */ + void apply(final K key, final V newValue, final V oldValue); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java new file mode 100644 index 0000000..1796be9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; + +class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> { + private final ProcessorContext context; + private final boolean sendOldValues; + + ForwardingCacheFlushListener(final ProcessorContext context, final boolean sendOldValues) { + this.context = context; + this.sendOldValues = sendOldValues; + } + + @Override + public void apply(final K key, final V newValue, final V oldValue) { + if (sendOldValues) { + context.forward(key, new Change<>(newValue, oldValue)); + } else { + context.forward(key, new Change<>(newValue, null)); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 78e6a2c..9bc66e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -151,18 +151,18 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou final Windows<W> windows, final String storeName) { return storeFactory(aggValSerde, storeName) - .windowed(windows.maintainMs(), windows.segments, false) - .build(); + .windowed(windows.size(), windows.maintainMs(), windows.segments, false) + .build(); } private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde, final String storeName) { return Stores.create(storeName) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent(); - + .withKeys(keySerde) + .withValues(aggValueSerde) + .persistent() + .enableCaching(); } private 
<T> KTable<K, T> doAggregate( http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 82a800d..7aa2531 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -105,6 +105,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup .withKeys(keySerde) .withValues(aggValueSerde) .persistent() + .enableCaching() .build(); // send the aggregate key-value pairs to the intermediate topic for partitioning http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index dc6410d..428c513 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CachedStateStore; public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> { @@ -30,6 +31,7 @@ public class 
KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, private final Initializer<T> initializer; private final Aggregator<K, V, T> aggregator; + private boolean sendOldValues = false; public KStreamAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) { @@ -56,8 +58,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, @Override public void init(ProcessorContext context) { super.init(context); - store = (KeyValueStore<K, T>) context.getStateStore(storeName); + ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues)); } @@ -80,12 +82,6 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, // update the store with the new value store.put(key, newAgg); - - // send the old / new pair - if (sendOldValues) - context().forward(key, new Change<>(newAgg, oldAgg)); - else - context().forward(key, new Change<>(newAgg, null)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 4d39b18..b9ed19a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -633,7 +633,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V .withKeys(keySerde) .withValues(valueSerde) .persistent() - .windowed(windows.maintainMs(), windows.segments, true) + .windowed(windows.size(), windows.maintainMs(), windows.segments, true) .build(); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index 06667e8..66c0f62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -37,7 +37,7 @@ class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override - public void process(K key, V value) { + public void process(final K key, final V value) { V1 newValue = mapper.apply(value); context().forward(key, newValue); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index dd5ba45..6d24284 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CachedStateStore; public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> { @@ -55,6 +56,7 @@ public class 
KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); + ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues)); } @@ -79,11 +81,6 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, // update the store with the new value store.put(key, newAgg); - // send the old / new pair - if (sendOldValues) - context().forward(key, new Change<>(newAgg, oldAgg)); - else - context().forward(key, new Change<>(newAgg, null)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 125c7fc..437d304 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.CachedStateStore; import java.util.Map; @@ -67,6 +68,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea super.init(context); windowStore = (WindowStore<K, T>) context.getStateStore(storeName); + ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues)); } @Override @@ -91,7 +93,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> 
implements KStrea try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) { - // for each matching window, try to update the corresponding key and send to the downstream + // for each matching window, try to update the corresponding key while (iter.hasNext()) { KeyValue<Long, T> entry = iter.next(); W window = matchedWindows.get(entry.key); @@ -109,12 +111,6 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea // update the store with the new value windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); } } @@ -124,14 +120,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea for (long windowStartMs : matchedWindows.keySet()) { T oldAgg = initializer.apply(); T newAgg = aggregator.apply(key, value, oldAgg); - windowStore.put(key, newAgg, windowStartMs); - - // send the new aggregate pair - if (sendOldValues) - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null)); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 763ccdd..2a47f72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.CachedStateStore; import java.util.Map; @@ -62,8 +63,8 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr @Override public void init(ProcessorContext context) { super.init(context); - windowStore = (WindowStore<K, V>) context.getStateStore(storeName); + ((CachedStateStore) windowStore).setFlushListener(new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues)); } @Override @@ -108,12 +109,6 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr // update the store with the new value windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); } } @@ -122,9 +117,6 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { windowStore.put(key, value, windowStartMs); - - // send the new aggregate pair (there will be no old value) - context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null)); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 4a7c7c0..3f2ab97 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CachedStateStore; public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> { @@ -57,10 +58,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T @SuppressWarnings("unchecked") @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); - store = (KeyValueStore<K, T>) context.getStateStore(storeName); + ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues)); } /** @@ -91,13 +92,8 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T // update the store with the new value store.put(key, newAgg); - - // send the old / new pair - if (sendOldValues) - context().forward(key, new Change<>(newAgg, oldAgg)); - else - context().forward(key, new Change<>(newAgg, null)); } + } @Override @@ -128,4 +124,5 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 6c73b11..ebe00d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -422,7 +422,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, keySerde, valSerde, false, - Collections.<String, String>emptyMap()); + Collections.<String, String>emptyMap(), + true); // mark this state as non internal hence it is read directly from a user topic topology.addStateStore(storeSupplier, name); source.materialize(); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index bab6bf3..a5457a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CachedStateStore; public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -45,10 +46,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @Override public Processor<K, Change<V>> get() { - return new KTableAggregateProcessor(); + return new KTableReduceProcessor(); } - private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> { + private class KTableReduceProcessor 
extends AbstractProcessor<K, Change<V>> { private KeyValueStore<K, V> store; @@ -58,6 +59,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); + ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, sendOldValues)); } /** @@ -89,11 +91,6 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { // update the store with the new value store.put(key, newAgg); - // send the old / new pair - if (sendOldValues) - context().forward(key, new Change<>(newAgg, oldAgg)); - else - context().forward(key, new Change<>(newAgg, null)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index 05befed..d8d389f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CachedStateStore; public class KTableSource<K, V> implements ProcessorSupplier<K, V> { @@ -72,6 +73,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { public void init(ProcessorContext context) { super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); + ((CachedStateStore) store).setFlushListener(new ForwardingCacheFlushListener<K, V>(context, 
sendOldValues)); } @Override @@ -80,10 +82,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { if (key == null) throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null."); - V oldValue = sendOldValues ? store.get(key) : null; store.put(key, value); - - context().forward(key, new Change<>(value, oldValue)); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java index 92fcf12..beaace3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java @@ -38,7 +38,7 @@ public interface Processor<K, V> { /** * Process the record with the given key and value. 
- * + * * @param key the key for the record * @param value the value for the record */ http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index acecf91..d854a85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -184,4 +184,5 @@ public interface ProcessorContext { * */ Map<String, Object> appConfigsWithPrefix(String prefix); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index ee61e73..f5fd571 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -767,12 +767,13 @@ public class TopologyBuilder { Map<String, ProcessorNode> processorMap = new HashMap<>(); Map<String, SourceNode> topicSourceMap = new HashMap<>(); Map<String, SinkNode> topicSinkMap = new HashMap<>(); - Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>(); + Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); + Map<StateStore, ProcessorNode> storeToProcessorNodeMap = new HashMap<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || 
nodeGroup.contains(factory.name)) { - ProcessorNode node = factory.build(); + final ProcessorNode node = factory.build(); processorNodes.add(node); processorMap.put(node.name(), node); @@ -782,7 +783,10 @@ public class TopologyBuilder { } for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { if (!stateStoreMap.containsKey(stateStoreName)) { - stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier); + final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier; + final StateStore stateStore = supplier.get(); + stateStoreMap.put(stateStoreName, stateStore); + storeToProcessorNodeMap.put(stateStore, node); } } } else if (factory instanceof SourceNodeFactory) { @@ -815,9 +819,9 @@ public class TopologyBuilder { } } - return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic); + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic, storeToProcessorNodeMap); } - + /** * Returns the map of topic groups keyed by the group id. * A topic group is a group of topics in the same task. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 8a45dd6..54cbe4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -23,8 +23,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; import java.util.Collection; @@ -40,8 +40,8 @@ public abstract class AbstractTask { protected final Consumer consumer; protected final ProcessorStateManager stateMgr; protected final Set<TopicPartition> partitions; - protected ProcessorContext processorContext; - + protected InternalProcessorContext processorContext; + protected final ThreadCache cache; /** * @throws ProcessorStateException if the state manager cannot be created */ @@ -52,16 +52,18 @@ public abstract class AbstractTask { Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, - StateDirectory stateDirectory) { + StateDirectory stateDirectory, + final ThreadCache cache) { this.id = id; this.applicationId = applicationId; this.partitions = new HashSet<>(partitions); this.topology = topology; this.consumer = consumer; + this.cache = cache; // create the processor state manager try { - this.stateMgr = new 
ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic()); + this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap()); } catch (IOException e) { throw new ProcessorStateException("Error while creating the state manager", e); @@ -72,8 +74,7 @@ public abstract class AbstractTask { // set initial offset limits initializeOffsetLimits(); - for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) { - StateStore store = stateStoreSupplier.get(); + for (StateStore store : this.topology.stateStores()) { store.init(this.processorContext, store); } } @@ -98,6 +99,10 @@ public abstract class AbstractTask { return processorContext; } + public final ThreadCache cache() { + return cache; + } + public abstract void commit(); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java new file mode 100644 index 0000000..251ff3f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.internals.ThreadCache; + +/** + * For internal use so we can update the {@link RecordContext} and current + * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from + * {@link ThreadCache} + */ +public interface InternalProcessorContext extends ProcessorContext { + + /** + * Returns the current {@link RecordContext} + * @return the current {@link RecordContext} + */ + RecordContext recordContext(); + + /** + * @param recordContext the {@link RecordContext} for the record about to be processes + */ + void setRecordContext(RecordContext recordContext); + + /** + * @param currentNode the current {@link ProcessorNode} + */ + void setCurrentNode(ProcessorNode currentNode); + + /** + * Get the thread-global cache + */ + ThreadCache getCache(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index a38839f..f4d4e83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -21,15 +21,16 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; +import java.util.List; import java.util.Map; -public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier { +public class ProcessorContextImpl implements InternalProcessorContext, RecordCollector.Supplier { public static final String NONEXIST_TOPIC = "__null_topic__"; @@ -42,12 +43,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S private final StreamsConfig config; private final Serde<?> keySerde; private final Serde<?> valSerde; - + private final ThreadCache cache; private boolean initialized; - private Long timestamp; - private String topic; - private Long offset; - private Integer partition; + private RecordContext recordContext; + private ProcessorNode currentNode; @SuppressWarnings("unchecked") public ProcessorContextImpl(TaskId id, @@ -55,7 +54,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S StreamsConfig config, RecordCollector collector, ProcessorStateManager stateMgr, - StreamsMetrics metrics) { + StreamsMetrics metrics, + final ThreadCache cache) { this.id = id; this.task = task; this.metrics = metrics; @@ -65,7 +65,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S this.config = config; this.keySerde = config.keySerde(); this.valSerde = config.valueSerde(); - + this.cache = cache; 
this.initialized = false; } @@ -140,10 +140,22 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S return stateMgr.getStore(name); } + @Override + public ThreadCache getCache() { + return cache; + } + /** + * @throws IllegalStateException if the task's record is null + */ @Override - public synchronized String topic() { - if (topic == null || topic.equals(NONEXIST_TOPIC)) + public String topic() { + if (recordContext == null) + throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); + + String topic = recordContext.topic(); + + if (topic.equals(NONEXIST_TOPIC)) return null; else return topic; @@ -153,48 +165,77 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S * @throws IllegalStateException if partition is null */ @Override - public synchronized int partition() { - if (partition == null) { + public int partition() { + if (recordContext == null) throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); - } - return partition; + + return recordContext.partition(); } /** * @throws IllegalStateException if offset is null */ @Override - public synchronized long offset() { - if (offset == null) { + public long offset() { + if (recordContext == null) throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed"); - } - return offset; + + return recordContext.offset(); } /** * @throws IllegalStateException if timestamp is null */ @Override - public synchronized long timestamp() { - if (timestamp == null) { - throw new IllegalStateException("This should not happen as timestamp should be set during record processing"); - } - return timestamp; + public long timestamp() { + if (recordContext == null) + throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed"); + + 
return recordContext.timestamp(); } + @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value) { - task.forward(key, value); + ProcessorNode previousNode = currentNode; + try { + for (ProcessorNode child : (List<ProcessorNode>) currentNode.children()) { + currentNode = child; + child.process(key, value); + } + } finally { + currentNode = previousNode; + } } + @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value, int childIndex) { - task.forward(key, value, childIndex); + ProcessorNode previousNode = currentNode; + final ProcessorNode child = (ProcessorNode<K, V>) currentNode.children().get(childIndex); + currentNode = child; + try { + child.process(key, value); + } finally { + currentNode = previousNode; + } } + @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value, String childName) { - task.forward(key, value, childName); + for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) { + if (child.name().equals(childName)) { + ProcessorNode previousNode = currentNode; + currentNode = child; + try { + child.process(key, value); + return; + } finally { + currentNode = previousNode; + } + } + } } @Override @@ -217,10 +258,18 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S return config.originalsWithPrefix(prefix); } - public synchronized void update(final StampedRecord record) { - this.timestamp = record.timestamp; - this.partition = record.partition(); - this.offset = record.offset(); - this.topic = record.topic(); + @Override + public void setRecordContext(final RecordContext recordContext) { + this.recordContext = recordContext; + } + + @Override + public RecordContext recordContext() { + return this.recordContext; + } + + @Override + public void setCurrentNode(final ProcessorNode currentNode) { + this.currentNode = currentNode; } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 64ca032..c05702b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -44,6 +44,7 @@ public class ProcessorNode<K, V> { this.stateStores = stateStores; } + public final String name() { return name; } @@ -64,14 +65,14 @@ public class ProcessorNode<K, V> { processor.init(context); } - public void process(K key, V value) { - processor.process(key, value); - } - public void close() { processor.close(); } + public void process(final K key, final V value) { + processor.process(key, value); + } + /** * @return a string representation of this node, useful for debugging. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java new file mode 100644 index 0000000..55452ad --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Objects; + +public class ProcessorRecordContext implements RecordContext { + + private final long timestamp; + private final long offset; + private final String topic; + private final int partition; + + public ProcessorRecordContext(final long timestamp, + final long offset, + final int partition, + final String topic) { + + this.timestamp = timestamp; + this.offset = offset; + this.topic = topic; + this.partition = partition; + } + + public long offset() { + return offset; + } + + public long timestamp() { + return timestamp; + } + + @Override + public String topic() { + return topic; + } + + @Override + public int partition() { + return partition; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ProcessorRecordContext that = (ProcessorRecordContext) o; + return timestamp == that.timestamp && + offset == that.offset && + partition == that.partition && + Objects.equals(topic, that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, offset, topic, partition); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 123e475..5e5eaa9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,21 +65,24 @@ public class ProcessorStateManager { private final Map<String, String> sourceStoreToSourceTopic; private final TaskId taskId; private final StateDirectory stateDirectory; + private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap; /** * @throws IOException if any error happens while creating or locking the state directory */ public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, - StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic) throws IOException { + StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic, + final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws IOException { this.applicationId = applicationId; this.defaultPartition = taskId.partition; this.taskId = taskId; this.stateDirectory = stateDirectory; + this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap; this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); } - this.stores = new HashMap<>(); + this.stores = new LinkedHashMap<>(); this.loggingEnabled = new HashSet<>(); 
this.restoreConsumer = restoreConsumer; this.restoredOffsets = new HashMap<>(); @@ -288,11 +292,16 @@ public class ProcessorStateManager { return stores.get(name); } - public void flush() { + public void flush(final InternalProcessorContext context) { if (!this.stores.isEmpty()) { log.debug("task [{}] Flushing stores.", taskId); - for (StateStore store : this.stores.values()) + for (StateStore store : this.stores.values()) { + final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store); + if (processorNode != null) { + context.setCurrentNode(processorNode); + } store.flush(); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 221d152..04c0261 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.StateStore; import java.util.Collections; import java.util.HashSet; @@ -28,18 +28,22 @@ public class ProcessorTopology { private final List<ProcessorNode> processorNodes; private final Map<String, SourceNode> sourceByTopics; private final Map<String, SinkNode> sinkByTopics; - private final List<StateStoreSupplier> stateStoreSuppliers; + private final List<StateStore> stateStores; private final Map<String, String> sourceStoreToSourceTopic; + private final Map<StateStore, ProcessorNode> storeToProcessorNodeMap; + public ProcessorTopology(List<ProcessorNode> 
processorNodes, Map<String, SourceNode> sourceByTopics, Map<String, SinkNode> sinkByTopics, - List<StateStoreSupplier> stateStoreSuppliers, - Map<String, String> sourceStoreToSourceTopic) { + List<StateStore> stateStores, + Map<String, String> sourceStoreToSourceTopic, + Map<StateStore, ProcessorNode> storeToProcessorNodeMap) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); - this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers); + this.stateStores = Collections.unmodifiableList(stateStores); this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; + this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap); } public Set<String> sourceTopics() { @@ -70,14 +74,18 @@ public class ProcessorTopology { return processorNodes; } - public List<StateStoreSupplier> stateStoreSuppliers() { - return stateStoreSuppliers; + public List<StateStore> stateStores() { + return stateStores; } public Map<String, String> sourceStoreToSourceTopic() { return sourceStoreToSourceTopic; } + public Map<StateStore, ProcessorNode> storeToProcessorNodeMap() { + return storeToProcessorNodeMap; + } + private String childrenToString(List<ProcessorNode<?, ?>> children) { if (children == null || children.isEmpty()) { return ""; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java new file mode 100644 index 0000000..f37f3e9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java @@ -0,0 +1,45 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.Processor; + +/** + * The context associated with the current record being processed by + * an {@link Processor} + */ +public interface RecordContext { + /** + * @return The offset of the original record received from Kafka + */ + long offset(); + + /** + * @return The timestamp extracted from the record received from Kafka + */ + long timestamp(); + + /** + * @return The topic the record was received on + */ + String topic(); + + /** + * @return The partition the record was received on + */ + int partition(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 6907858..2b5692d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -65,9 +65,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { } + @Override - public void process(K key, V value) { - // send to all the registered topics + public void process(final K key, final V value) { RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 90da1de..4bc3a53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -60,8 +60,9 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer()); } + @Override - public void process(K key, V value) { + public void process(final K key, final V value) { context.forward(key, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 039ab66..563dbce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -20,15 +20,14 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; - +import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; import java.util.Map; -public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier { +public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier { private final TaskId id; private final String applicationId; @@ -38,6 +37,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup private final StreamsConfig config; private final Serde<?> keySerde; private final Serde<?> valSerde; + private final ThreadCache zeroSizedCache = new ThreadCache(0); private boolean initialized; @@ -120,6 +120,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks."); } + @Override + public ThreadCache getCache() { + return zeroSizedCache; + } + /** * @throws UnsupportedOperationException */ @@ -201,4 +206,20 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup public Map<String, Object> appConfigsWithPrefix(String prefix) { return config.originalsWithPrefix(prefix); } + + @Override + public RecordContext recordContext() { + throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks."); + } + + @Override + public void setRecordContext(final RecordContext 
recordContext) { + throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks."); + } + + + @Override + public void setCurrentNode(final ProcessorNode currentNode) { + // no-op. can't throw as this is called on commit when the StateStores get flushed. + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index a22bea9..384a1a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask { Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StreamsMetrics metrics, final StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null); log.info("task [{}] Creating processorContext", id()); @@ -92,7 +92,7 @@ public class StandbyTask extends AbstractTask { public void commit() { log.debug("task [{}] flushing", id()); - stateMgr.flush(); + stateMgr.flush(processorContext); // reinitialize offset limits initializeOffsetLimits(); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 476ec2e..2d40d88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -26,12 +26,12 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import static java.util.Collections.singleton; @@ -56,7 +56,6 @@ public class StreamTask extends AbstractTask implements Punctuator { private boolean commitRequested = false; private boolean commitOffsetNeeded = false; - private StampedRecord currRecord = null; private ProcessorNode currNode = null; private boolean requiresPoll = true; @@ -83,8 +82,9 @@ public class StreamTask extends AbstractTask implements Punctuator { Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StreamsMetrics metrics, - StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory); + StateDirectory stateDirectory, + ThreadCache cache) { + super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); @@ -110,7 +110,7 @@ public class StreamTask extends AbstractTask implements Punctuator { log.info("task [{}] Creating restoration consumer client", id()); // initialize the topology with its own context - this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics); 
+ this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); // initialize the state stores initializeStateStores(); @@ -165,19 +165,18 @@ public class StreamTask extends AbstractTask implements Punctuator { try { // process the record by passing to the source node of the topology - this.currRecord = record; this.currNode = recordInfo.node(); TopicPartition partition = recordInfo.partition(); - log.debug("task [{}] Start processing one record [{}]", id(), currRecord); + log.debug("task [{}] Start processing one record [{}]", id(), record); + final ProcessorRecordContext recordContext = createRecordContext(record); + updateProcessorContext(recordContext, currNode); + this.currNode.process(record.key(), record.value()); - updateContext(currRecord); - this.currNode.process(currRecord.key(), currRecord.value()); - - log.debug("task [{}] Completed processing one record [{}]", id(), currRecord); + log.debug("task [{}] Completed processing one record [{}]", id(), record); // update the consumed offset map after processing is done - consumedOffsets.put(partition, currRecord.offset()); + consumedOffsets.put(partition, record.offset()); commitOffsetNeeded = true; // after processing this record, if its partition queue's buffered size has been @@ -191,13 +190,18 @@ public class StreamTask extends AbstractTask implements Punctuator { requiresPoll = true; } } finally { - this.currRecord = null; + processorContext.setCurrentNode(null); this.currNode = null; } return partitionGroup.numBuffered(); } + private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) { + processorContext.setRecordContext(recordContext); + processorContext.setCurrentNode(currNode); + } + public boolean requiresPoll() { return requiresPoll; } @@ -226,23 +230,16 @@ public class StreamTask extends AbstractTask implements Punctuator { throw new IllegalStateException(String.format("task [%s] Current node is 
not null", id())); currNode = node; - currRecord = new StampedRecord(DUMMY_RECORD, timestamp); - updateContext(currRecord); + final StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, timestamp); + updateProcessorContext(createRecordContext(stampedRecord), node); try { node.processor().punctuate(timestamp); } finally { + processorContext.setCurrentNode(null); currNode = null; - currRecord = null; } } - private void updateContext(final StampedRecord record) { - ((ProcessorContextImpl) processorContext).update(record); - } - - public StampedRecord record() { - return this.currRecord; - } public ProcessorNode node() { return this.currNode; @@ -253,7 +250,7 @@ public class StreamTask extends AbstractTask implements Punctuator { */ public void commit() { // 1) flush local state - stateMgr.flush(); + stateMgr.flush(processorContext); // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); @@ -338,45 +335,8 @@ public class StreamTask extends AbstractTask implements Punctuator { return new RecordQueue(partition, source); } - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value) { - ProcessorNode thisNode = currNode; - try { - for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { - currNode = childNode; - childNode.process(key, value); - } - } finally { - currNode = thisNode; - } - } - - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value, int childIndex) { - ProcessorNode thisNode = currNode; - ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex); - currNode = childNode; - try { - childNode.process(key, value); - } finally { - currNode = thisNode; - } - } - - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value, String childName) { - ProcessorNode thisNode = currNode; - for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { - if (childNode.name().equals(childName)) 
{ - currNode = childNode; - try { - childNode.process(key, value); - } finally { - currNode = thisNode; - } - break; - } - } + private ProcessorRecordContext createRecordContext(final StampedRecord currRecord) { + return new ProcessorRecordContext(currRecord.timestamp, currRecord.offset(), currRecord.partition(), currRecord.topic()); } /** @@ -387,4 +347,6 @@ public class StreamTask extends AbstractTask implements Punctuator { public String toString() { return super.toString(); } + + }