This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8707d5c5b1b1806f1c92101665c8935fb43245f1
Author: John Roesler <[email protected]>
AuthorDate: Fri May 21 13:19:35 2021 -0500

    wip
---
 .../streams/kstream/internals/KTableFilter.java    | 74 ++++++++++++----------
 .../streams/kstream/internals/KTableImpl.java      | 20 +++++-
 .../internals/KTableNewProcessorSupplier.java      | 40 ++++++++++++
 .../kstream/internals/KTableValueGetter.java       |  4 +-
 ....java => NewTimestampedCacheFlushListener.java} | 29 +++++----
 .../internals/SessionCacheFlushListener.java       | 29 +++++++--
 .../internals/TimestampedCacheFlushListener.java   | 43 ++++++++-----
 .../internals/TimestampedTupleForwarder.java       | 42 +++++++-----
 .../internals/AbstractProcessorContext.java        |  2 +-
 .../internals/InternalProcessorContext.java        |  4 +-
 .../internals/InternalTopologyBuilder.java         | 51 ++++++++-------
 .../streams/processor/internals/ProcessorNode.java |  5 +-
 .../processor/internals/ProcessorTopology.java     | 32 +++++-----
 .../processor/internals/RecordDeserializer.java    |  6 +-
 .../streams/processor/internals/RecordQueue.java   |  6 +-
 .../streams/processor/internals/SinkNode.java      |  8 +--
 .../streams/processor/internals/SourceNode.java    |  6 +-
 .../streams/processor/internals/StreamTask.java    | 11 +++-
 .../state/internals/CacheFlushListener.java        |  8 +++
 19 files changed, 269 insertions(+), 151 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 76753a4..bbffea6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -17,23 +17,23 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
-    private final KTableImpl<K, ?, V> parent;
-    private final Predicate<? super K, ? super V> predicate;
+class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, 
KIn, VIn> {
+    private final KTableImpl<KIn, ?, VIn> parent;
+    private final Predicate<? super KIn, ? super VIn> predicate;
     private final boolean filterNot;
     private final String queryableName;
     private boolean sendOldValues;
 
-    KTableFilter(final KTableImpl<K, ?, V> parent,
-                 final Predicate<? super K, ? super V> predicate,
+    KTableFilter(final KTableImpl<KIn, ?, VIn> parent,
+                 final Predicate<? super KIn, ? super VIn> predicate,
                  final boolean filterNot,
                  final String queryableName) {
         this.parent = parent;
@@ -45,7 +45,7 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public Processor<K, Change<V>> get() {
+    public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
         return new KTableFilterProcessor();
     }
 
@@ -62,8 +62,8 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         return sendOldValues;
     }
 
-    private V computeValue(final K key, final V value) {
-        V newValue = null;
+    private VIn computeValue(final KIn key, final VIn value) {
+        VIn newValue = null;
 
         if (value != null && (filterNot ^ predicate.test(key, value))) {
             newValue = value;
@@ -72,11 +72,11 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         return newValue;
     }
 
-    private ValueAndTimestamp<V> computeValue(final K key, final 
ValueAndTimestamp<V> valueAndTimestamp) {
-        ValueAndTimestamp<V> newValueAndTimestamp = null;
+    private ValueAndTimestamp<VIn> computeValue(final KIn key, final 
ValueAndTimestamp<VIn> valueAndTimestamp) {
+        ValueAndTimestamp<VIn> newValueAndTimestamp = null;
 
         if (valueAndTimestamp != null) {
-            final V value = valueAndTimestamp.value();
+            final VIn value = valueAndTimestamp.value();
             if (filterNot ^ predicate.test(key, value)) {
                 newValueAndTimestamp = valueAndTimestamp;
             }
@@ -86,13 +86,14 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     }
 
 
-    private class KTableFilterProcessor extends AbstractProcessor<K, 
Change<V>> {
-        private TimestampedKeyValueStore<K, V> store;
-        private TimestampedTupleForwarder<K, V> tupleForwarder;
+    private class KTableFilterProcessor implements Processor<KIn, Change<VIn>, 
KIn, Change<VIn>> {
+        private ProcessorContext<KIn, Change<VIn>> context;
+        private TimestampedKeyValueStore<KIn, VIn> store;
+        private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
 
         @Override
-        public void init(final ProcessorContext context) {
-            super.init(context);
+        public void init(final ProcessorContext<KIn, Change<VIn>> context) {
+            this.context = context;
             if (queryableName != null) {
                 store = context.getStateStore(queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
@@ -104,23 +105,26 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         }
 
         @Override
-        public void process(final K key, final Change<V> change) {
-            final V newValue = computeValue(key, change.newValue);
-            final V oldValue = computeOldValue(key, change);
+        public void process(Record<KIn, Change<VIn>> record) {
+            final KIn key = record.key();
+            final Change<VIn> change = record.value();
+
+            final VIn newValue = computeValue(key, change.newValue);
+            final VIn oldValue = computeOldValue(key, change);
 
             if (sendOldValues && oldValue == null && newValue == null) {
                 return; // unnecessary to forward here.
             }
 
             if (queryableName != null) {
-                store.put(key, ValueAndTimestamp.make(newValue, 
context().timestamp()));
-                tupleForwarder.maybeForward(key, newValue, oldValue);
+                store.put(key, ValueAndTimestamp.make(newValue, 
record.timestamp()));
+                tupleForwarder.maybeForward(record.withValue(new 
Change<>(newValue, oldValue)));
             } else {
-                context().forward(key, new Change<>(newValue, oldValue));
+                context.forward(record.withValue(new Change<>(newValue, 
oldValue)));
             }
         }
 
-        private V computeOldValue(final K key, final Change<V> change) {
+        private VIn computeOldValue(final KIn key, final Change<VIn> change) {
             if (!sendOldValues) {
                 return null;
             }
@@ -132,16 +136,16 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
+    public KTableValueGetterSupplier<KIn, VIn> view() {
         // if the KTable is materialized, use the materialized store to return 
getter value;
         // otherwise rely on the parent getter and apply filter on-the-fly
         if (queryableName != null) {
             return new KTableMaterializedValueGetterSupplier<>(queryableName);
         } else {
-            return new KTableValueGetterSupplier<K, V>() {
-                final KTableValueGetterSupplier<K, V> 
parentValueGetterSupplier = parent.valueGetterSupplier();
+            return new KTableValueGetterSupplier<KIn, VIn>() {
+                final KTableValueGetterSupplier<KIn, VIn> 
parentValueGetterSupplier = parent.valueGetterSupplier();
 
-                public KTableValueGetter<K, V> get() {
+                public KTableValueGetter<KIn, VIn> get() {
                     return new 
KTableFilterValueGetter(parentValueGetterSupplier.get());
                 }
 
@@ -154,20 +158,20 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     }
 
 
-    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
-        private final KTableValueGetter<K, V> parentGetter;
+    private class KTableFilterValueGetter implements KTableValueGetter<KIn, 
VIn> {
+        private final KTableValueGetter<KIn, VIn> parentGetter;
 
-        KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
+        KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> 
parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<Void, Void> context) {
             parentGetter.init(context);
         }
 
         @Override
-        public ValueAndTimestamp<V> get(final K key) {
+        public ValueAndTimestamp<VIn> get(final KIn key) {
             return computeValue(key, parentGetter.get(key));
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 52f7b5f..a2b8702 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -125,6 +125,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
     private static final String SINK_NAME = "KTABLE-SINK-";
 
     private final ProcessorSupplier<?, ?> processorSupplier;
+    private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, 
?, ?, ?> newProcessorSupplier;
 
     private final String queryableStoreName;
 
@@ -140,6 +141,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, 
builder);
         this.processorSupplier = processorSupplier;
+        this.newProcessorSupplier = null;
+        this.queryableStoreName = queryableStoreName;
+    }
+
+    public KTableImpl(final String name,
+                      final Serde<K> keySerde,
+                      final Serde<V> valueSerde,
+                      final Set<String> subTopologySourceNodes,
+                      final String queryableStoreName,
+                      final 
org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> 
newProcessorSupplier,
+                      final GraphNode graphNode,
+                      final InternalStreamsBuilder builder) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, 
builder);
+        this.processorSupplier = null;
+        this.newProcessorSupplier = newProcessorSupplier;
         this.queryableStoreName = queryableStoreName;
     }
 
@@ -179,7 +195,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         }
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
 
-        final KTableProcessorSupplier<K, V, V> processorSupplier =
+        final KTableNewProcessorSupplier<K, V, K, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = 
unsafeCastProcessorParametersToCompletelyDifferentType(
@@ -194,7 +210,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
 
         builder.addGraphNode(this.graphNode, tableNode);
 
-        return new KTableImpl<>(
+        return new KTableImpl<K, V, V>(
             name,
             keySerde,
             valueSerde,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
new file mode 100644
index 0000000..ef8ea30
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends 
ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {
+
+    KTableValueGetterSupplier<KOut, VOut> view();
+
+    /**
+     * Potentially enables sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code true}, the method will force 
the materialization of upstream nodes to
+     * enable sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code false}, the method will only 
enable the sending of old values <i>if</i>
+     * an upstream node is already materialized.
+     *
+     * @param forceMaterialization indicates if an upstream node should be 
forced to materialize to enable sending old
+     *                             values.
+     * @return {@code true} if sending old values is enabled, i.e. either 
because {@code forceMaterialization} was
+     * {@code true} or some upstream node is materialized.
+     */
+    boolean enableSendingOldValues(boolean forceMaterialization);
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 12145fa..c939234 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 public interface KTableValueGetter<K, V> {
 
-    void init(ProcessorContext context);
+    void init(ProcessorContext<Void, Void> context);
 
     ValueAndTimestamp<V> get(K key);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
similarity index 61%
copy from 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
copy to 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
index f40fdfe..d325459 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
@@ -16,31 +16,34 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class NewTimestampedCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<KOut, VOut> {
+    private final InternalProcessorContext<KOut, Change<VOut>> context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    NewTimestampedCacheFlushListener(final ProcessorContext<KOut, 
Change<VOut>> context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
-                      final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+        throw new RuntimeException("ASDFASDF");
+    }
+
+    @Override
+    public void apply(Record<KOut, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), 
To.all().withTimestamp(key.window().end()));
+            context.forward(record);
         } finally {
             context.setCurrentNode(prev);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index f40fdfe..ceff4b7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -19,25 +19,29 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Windowed<KOut>, VOut> {
+    private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> 
context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
+    @SuppressWarnings("unchecked")
     SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+        this.context = (InternalProcessorContext<Windowed<KOut>, 
Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
+    public void apply(final Windowed<KOut> key,
+                      final VOut newValue,
+                      final VOut oldValue,
                       final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(key, new Change<>(newValue, oldValue), 
To.all().withTimestamp(key.window().end()));
@@ -45,4 +49,15 @@ class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>,
             context.setCurrentNode(prev);
         }
     }
+
+    @Override
+    public void apply(Record<Windowed<KOut>, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(record);
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 5540376..6dbf435 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -16,36 +16,47 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+class TimestampedCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<KOut, VOut> {
+    private final InternalProcessorContext<KOut, Change<VOut>> context;
 
-class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, 
ValueAndTimestamp<V>> {
-    private final InternalProcessorContext context;
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    TimestampedCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    TimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> 
context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
+    @SuppressWarnings("unchecked")
+    TimestampedCacheFlushListener(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
+        myNode = this.context.currentNode();
+    }
+
+    @Override
+    public void apply(Record<KOut, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(record);
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
+
     @Override
-    public void apply(final K key,
-                      final ValueAndTimestamp<V> newValue,
-                      final ValueAndTimestamp<V> oldValue,
-                      final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(
-                key,
-                new Change<>(getValueOrNull(newValue), 
getValueOrNull(oldValue)),
-                To.all().withTimestamp(newValue != null ? newValue.timestamp() 
: timestamp));
+            context.forward(key, new Change<>(newValue, oldValue), 
To.all().withTimestamp(timestamp));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 910dd8f..729e9fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 /**
@@ -30,34 +32,42 @@ import 
org.apache.kafka.streams.state.internals.WrappedStateStore;
  * @param <V> the type of the value
  */
 class TimestampedTupleForwarder<K, V> {
-    private final ProcessorContext context;
-    private final boolean sendOldValues;
+    private final InternalProcessorContext<K, Change<V>> context;
     private final boolean cachingEnabled;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
-                              final ProcessorContext context,
+                              final ProcessorContext<K, Change<V>> context,
                               final TimestampedCacheFlushListener<K, V> 
flushListener,
                               final boolean sendOldValues) {
-        this.context = context;
-        this.sendOldValues = sendOldValues;
+        this.context = (InternalProcessorContext<K, Change<V>>) context;
         cachingEnabled = ((WrappedStateStore) 
store).setFlushListener(flushListener, sendOldValues);
     }
 
-    public void maybeForward(final K key,
-                             final V newValue,
-                             final V oldValue) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TimestampedTupleForwarder(final StateStore store,
+                              final 
org.apache.kafka.streams.processor.ProcessorContext context,
+                              final TimestampedCacheFlushListener<K, V> 
flushListener,
+                              final boolean sendOldValues) {
+        this.context = (InternalProcessorContext) context;
+        cachingEnabled = ((WrappedStateStore) 
store).setFlushListener(flushListener, sendOldValues);
+    }
+
+    public void maybeForward(final Record<K, Change<V>> record) {
+        if (!cachingEnabled) {
+            context.forward(record);
+        }
+    }
+
+    public void maybeForward(K key, V value, V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? 
oldValue : null));
+            context.forward(key, new Change<>(value, oldValue));
         }
     }
 
-    public void maybeForward(final K key,
-                             final V newValue,
-                             final V oldValue,
-                             final long timestamp) {
+    public void maybeForward(K key, V value, V oldValue, long newTimestamp) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? 
oldValue : null), To.all().withTimestamp(timestamp));
+            context.forward(key, new Change<>(value, oldValue), 
To.all().withTimestamp(newTimestamp));
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 09a2e31..37ffbdc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-public abstract class AbstractProcessorContext implements 
InternalProcessorContext {
+public abstract class AbstractProcessorContext implements 
InternalProcessorContext<Object, Object> {
 
     private final TaskId taskId;
     private final String applicationId;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index ba5c580..88e47e3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -34,8 +34,8 @@ import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
  * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext
-    extends ProcessorContext, 
org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, 
StateStoreContext {
+public interface InternalProcessorContext<KOut, VOut>
+    extends ProcessorContext, 
org.apache.kafka.streams.processor.api.ProcessorContext<KOut, VOut>, 
StateStoreContext {
 
     BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
     ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 838cff9..4a54f01 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -243,7 +243,7 @@ public class InternalTopologyBuilder {
     // even if it can be matched by multiple regex patterns. Only used by 
SourceNodeFactory
     private final Map<String, Pattern> topicToPatterns = new HashMap<>();
 
-    private class SourceNodeFactory<KIn, VIn, KOut, VOut> extends 
NodeFactory<KIn, VIn, KOut, VOut> {
+    private class SourceNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, 
KIn, VIn> {
         private final List<String> topics;
         private final Pattern pattern;
         private final Deserializer<KIn> keyDeserializer;
@@ -291,7 +291,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
+        public ProcessorNode<KIn, VIn, KIn, VIn> build() {
             return new SourceNode<>(name, timestampExtractor, keyDeserializer, 
valDeserializer);
         }
 
@@ -305,7 +305,7 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory<KIn, VIn, KOut, VOut> extends 
NodeFactory<KIn, VIn, KOut, VOut> {
+    private class SinkNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, 
Void, Void> {
         private final Serializer<KIn> keySerializer;
         private final Serializer<VIn> valSerializer;
         private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
@@ -325,7 +325,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
+        public ProcessorNode<KIn, VIn, Void, Void> build() {
             if (topicExtractor instanceof StaticTopicNameExtractor) {
                 final String topic = ((StaticTopicNameExtractor<KIn, VIn>) 
topicExtractor).topicName;
                 if (internalTopicNamesWithProperties.containsKey(topic)) {
@@ -761,12 +761,12 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private Set<SourceNodeFactory<?, ?, ?, ?>> 
findSourcesForProcessorPredecessors(final String[] predecessors) {
-        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodes = new HashSet<>();
+    private Set<SourceNodeFactory<?, ?>> 
findSourcesForProcessorPredecessors(final String[] predecessors) {
+        final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
         for (final String predecessor : predecessors) {
             final NodeFactory<?, ?, ?, ?> nodeFactory = 
nodeFactories.get(predecessor);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory<?, ?, ?, ?>) nodeFactory);
+                sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
                 
sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?,
 ?, ?, ?>) nodeFactory).predecessors));
             }
@@ -787,10 +787,10 @@ public class InternalTopologyBuilder {
 
         final Set<String> sourceTopics = new HashSet<>();
         final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodesForPredecessor =
+        final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
             
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
 
-        for (final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory : 
sourceNodesForPredecessor) {
+        for (final SourceNodeFactory<?, ?> sourceNodeFactory : 
sourceNodesForPredecessor) {
             if (sourceNodeFactory.pattern != null) {
                 sourcePatterns.add(sourceNodeFactory.pattern);
             } else {
@@ -925,8 +925,8 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
         final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new 
LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new 
HashMap<>();
-        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
+        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
@@ -946,15 +946,15 @@ public class InternalTopologyBuilder {
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
-                                    (SourceNode<?, ?, ?, ?>) node);
+                                    (SourceNodeFactory<?, ?>) factory,
+                                    (SourceNode<?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
-                                  (SinkNode<Object, Object, ?, ?>) node);
+                                  (SinkNodeFactory<?, ?>) factory,
+                                  (SinkNode<?, ?>) node);
                 } else {
                     throw new TopologyException("Unknown definition class: " + 
factory.getClass().getName());
                 }
@@ -971,13 +971,16 @@ public class InternalTopologyBuilder {
     }
 
     private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> 
processorMap,
-                               final Map<String, SinkNode<?, ?, ?, ?>> 
topicSinkMap,
+                               final Map<String, SinkNode<?, ?>> topicSinkMap,
                                final Set<String> repartitionTopics,
-                               final SinkNodeFactory<?, ?, ?, ?> 
sinkNodeFactory,
-                               final SinkNode<Object, Object, ?, ?> node) {
+                               final SinkNodeFactory<?, ?> sinkNodeFactory,
+                               final SinkNode<?, ?> node) {
+        @SuppressWarnings("unchecked") final ProcessorNode<Object, Object, ?, 
?> sinkNode =
+            (ProcessorNode<Object, Object, ?, ?>) node;
 
         for (final String predecessorName : sinkNodeFactory.predecessors) {
-            getProcessor(processorMap, predecessorName).addChild(node);
+            final ProcessorNode<Object, Object, Object, Object> processor = 
getProcessor(processorMap, predecessorName);
+            processor.addChild(sinkNode);
             if (sinkNodeFactory.topicExtractor instanceof 
StaticTopicNameExtractor) {
                 final String topic = ((StaticTopicNameExtractor<?, ?>) 
sinkNodeFactory.topicExtractor).topicName;
 
@@ -1002,10 +1005,10 @@ public class InternalTopologyBuilder {
         return (ProcessorNode<KIn, VIn, KOut, VOut>) 
processorMap.get(predecessor);
     }
 
-    private void buildSourceNode(final Map<String, SourceNode<?, ?, ?, ?>> 
topicSourceMap,
+    private void buildSourceNode(final Map<String, SourceNode<?, ?>> 
topicSourceMap,
                                  final Set<String> repartitionTopics,
-                                 final SourceNodeFactory<?, ?, ?, ?> 
sourceNodeFactory,
-                                 final SourceNode<?, ?, ?, ?> node) {
+                                 final SourceNodeFactory<?, ?> 
sourceNodeFactory,
+                                 final SourceNode<?, ?> node) {
 
         final List<String> topics = (sourceNodeFactory.pattern != null) ?
             sourceNodeFactory.getTopics(subscriptionUpdates()) :
@@ -1168,7 +1171,7 @@ public class InternalTopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (hasSubscriptionUpdates()) {
             for (final String nodeName : nodeToSourcePatterns.keySet()) {
-                final SourceNodeFactory<?, ?, ?, ?> sourceNode = 
(SourceNodeFactory<?, ?, ?, ?>) nodeFactories.get(nodeName);
+                final SourceNodeFactory<?, ?> sourceNode = 
(SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
                 final List<String> sourceTopics = 
sourceNode.getTopics(subscriptionUpdates);
                 //need to update nodeToSourceTopics and sourceTopicNames with 
topics matched from given regex
                 nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1358,7 +1361,7 @@ public class InternalTopologyBuilder {
         final NodeFactory<?, ?, ?, ?> nodeFactory = 
nodeFactories.get(nodeName);
 
         if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory<?, ?, ?, ?>) 
nodeFactory).topics;
+            final List<String> topics = ((SourceNodeFactory<?, ?>) 
nodeFactory).topics;
             return topics != null && topics.size() == 1 && 
globalTopics.contains(topics.get(0));
         }
         return false;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a8c32c7..80fdc4f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -105,8 +105,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         childByName.put(child.name, child);
     }
 
-    @SuppressWarnings("unchecked")
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KOut, VOut> context) {
         if (!closed)
             throw new IllegalStateException("The processor is not closed");
 
@@ -116,7 +115,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init((ProcessorContext<KOut, VOut>) context);
+                        processor.init(context);
                     }
                 },
                 time,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 0a0118a..8f1a460 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -33,9 +33,9 @@ public class ProcessorTopology {
     private final Logger log = 
LoggerFactory.getLogger(ProcessorTopology.class);
 
     private final List<ProcessorNode<?, ?, ?, ?>> processorNodes;
-    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByName;
-    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic;
-    private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByName;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
+    private final Map<String, SinkNode<?, ?>> sinksByTopic;
     private final Set<String> terminalNodes;
     private final List<StateStore> stateStores;
     private final Set<String> stateStoreNames;
@@ -46,8 +46,8 @@ public class ProcessorTopology {
     private final Map<String, String> storeToChangelogTopic;
 
     public ProcessorTopology(final List<ProcessorNode<?, ?, ?, ?>> 
processorNodes,
-                             final Map<String, SourceNode<?, ?, ?, ?>> 
sourceNodesByTopic,
-                             final Map<String, SinkNode<?, ?, ?, ?>> 
sinksByTopic,
+                             final Map<String, SourceNode<?, ?>> 
sourceNodesByTopic,
+                             final Map<String, SinkNode<?, ?>> sinksByTopic,
                              final List<StateStore> stateStores,
                              final List<StateStore> globalStateStores,
                              final Map<String, String> storeToChangelogTopic,
@@ -69,7 +69,7 @@ public class ProcessorTopology {
         }
 
         this.sourceNodesByName = new HashMap<>();
-        for (final SourceNode<?, ?, ?, ?> source : 
sourceNodesByTopic.values()) {
+        for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
             sourceNodesByName.put(source.name(), source);
         }
     }
@@ -78,11 +78,11 @@ public class ProcessorTopology {
         return sourceNodesByTopic.keySet();
     }
 
-    public SourceNode<?, ?, ?, ?> source(final String topic) {
+    public SourceNode<?, ?> source(final String topic) {
         return sourceNodesByTopic.get(topic);
     }
 
-    public Set<SourceNode<?, ?, ?, ?>> sources() {
+    public Set<SourceNode<?, ?>> sources() {
         return new HashSet<>(sourceNodesByTopic.values());
     }
 
@@ -90,7 +90,7 @@ public class ProcessorTopology {
         return sinksByTopic.keySet();
     }
 
-    public SinkNode<?, ?, ?, ?> sink(final String topic) {
+    public SinkNode<?, ?> sink(final String topic) {
         return sinksByTopic.get(topic);
     }
 
@@ -151,9 +151,9 @@ public class ProcessorTopology {
 
     public void updateSourceTopics(final Map<String, List<String>> 
allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : 
sourceNodesByName.entrySet()) {
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : 
sourceNodesByName.entrySet()) {
             final String sourceNodeName = sourceNodeEntry.getKey();
-            final SourceNode<?, ?, ?, ?> sourceNode = 
sourceNodeEntry.getValue();
+            final SourceNode<?, ?> sourceNode = sourceNodeEntry.getValue();
 
             final List<String> updatedSourceTopics = 
allSourceTopicsByNodeName.get(sourceNodeName);
             if (updatedSourceTopics == null) {
@@ -211,10 +211,10 @@ public class ProcessorTopology {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
-        final Map<SourceNode<?, ?, ?, ?>, List<String>> sourceToTopics = new 
HashMap<>();
-        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : 
sourceNodesByTopic.entrySet()) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new 
HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : 
sourceNodesByTopic.entrySet()) {
             final String topic = sourceNodeEntry.getKey();
-            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getValue();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
             sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
             sourceToTopics.get(source).add(topic);
         }
@@ -222,8 +222,8 @@ public class ProcessorTopology {
         final StringBuilder sb = new StringBuilder(indent + 
"ProcessorTopology:\n");
 
         // start from sources
-        for (final Map.Entry<SourceNode<?, ?, ?, ?>, List<String>> 
sourceNodeEntry : sourceToTopics.entrySet()) {
-            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getKey();
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : 
sourceToTopics.entrySet()) {
+            final SourceNode<?, ?> source = sourceNodeEntry.getKey();
             final List<String> topics = sourceNodeEntry.getValue();
             sb.append(source.toString(indent + "\t"))
                 .append(topicsToString(indent + "\t", topics))
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 20f1449..a965187 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -31,11 +31,11 @@ import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXC
 
 class RecordDeserializer {
     private final Logger log;
-    private final SourceNode<?, ?, ?, ?> sourceNode;
+    private final SourceNode<?, ?> sourceNode;
     private final Sensor droppedRecordsSensor;
     private final DeserializationExceptionHandler 
deserializationExceptionHandler;
 
-    RecordDeserializer(final SourceNode<?, ?, ?, ?> sourceNode,
+    RecordDeserializer(final SourceNode<?, ?> sourceNode,
                        final DeserializationExceptionHandler 
deserializationExceptionHandler,
                        final LogContext logContext,
                        final Sensor droppedRecordsSensor) {
@@ -100,7 +100,7 @@ class RecordDeserializer {
         }
     }
 
-    SourceNode<?, ?, ?, ?> sourceNode() {
+    SourceNode<?, ?> sourceNode() {
         return sourceNode;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index df7e834..6f0db8a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -39,7 +39,7 @@ public class RecordQueue {
     public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
-    private final SourceNode<?, ?, ?, ?> source;
+    private final SourceNode<?, ?> source;
     private final TopicPartition partition;
     private final ProcessorContext processorContext;
     private final TimestampExtractor timestampExtractor;
@@ -52,7 +52,7 @@ public class RecordQueue {
     private final Sensor droppedRecordsSensor;
 
     RecordQueue(final TopicPartition partition,
-                final SourceNode<?, ?, ?, ?> source,
+                final SourceNode<?, ?> source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler 
deserializationExceptionHandler,
                 final InternalProcessorContext processorContext,
@@ -85,7 +85,7 @@ public class RecordQueue {
      *
      * @return SourceNode
      */
-    public SourceNode<?, ?, ?, ?> source() {
+    public SourceNode<?, ?> source() {
         return source;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2efa537..9091f3e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -24,14 +24,14 @@ import org.apache.kafka.streams.processor.api.Record;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
 
-public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, 
KOut, VOut> {
+public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {
 
     private Serializer<KIn> keySerializer;
     private Serializer<VIn> valSerializer;
     private final TopicNameExtractor<KIn, VIn> topicExtractor;
     private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
 
-    private InternalProcessorContext context;
+    private InternalProcessorContext<Void, Void> context;
 
     SinkNode(final String name,
              final TopicNameExtractor<KIn, VIn> topicExtractor,
@@ -50,12 +50,12 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn, VIn, KOut
      * @throws UnsupportedOperationException if this method adds a child to a 
sink node
      */
     @Override
-    public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
+    public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
         throw new UnsupportedOperationException("sink node does not allow 
addChild");
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<Void, Void> context) {
         super.init(context);
         this.context = context;
         final Serializer<?> contextKeySerializer = 
ProcessorContextUtils.getKeySerializer(context);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 7198f2f..9ff3473 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -26,9 +26,9 @@ import 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
 
-public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, 
KOut, VOut> {
+public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> {
 
-    private InternalProcessorContext context;
+    private InternalProcessorContext<KIn, VIn> context;
     private Deserializer<KIn> keyDeserializer;
     private Deserializer<VIn> valDeserializer;
     private final TimestampExtractor timestampExtractor;
@@ -59,7 +59,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn, VIn, KO
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KIn, VIn> context) {
         // It is important to first create the sensor before calling init on 
the
         // parent object. Otherwise due to backwards compatibility an empty 
sensor
         // without parent is created with the same name.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 129407a..ee5d1ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -102,7 +102,10 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
+
+    @SuppressWarnings("rawtypes")
     private final InternalProcessorContext processorContext;
+
     private final RecordQueueCreator recordQueueCreator;
 
     private StampedRecord record;
@@ -110,6 +113,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
 
+    @SuppressWarnings("rawtypes")
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> inputPartitions,
                       final ProcessorTopology topology,
@@ -317,6 +321,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void closeTopology() {
         log.trace("Closing processor topology");
 
@@ -805,6 +810,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
      * @throws IllegalStateException if the current node is not null
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
+    @SuppressWarnings("unchecked")
     @Override
     public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
                           final long timestamp,
@@ -840,6 +846,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> 
currNode,
                                         final long wallClockTime,
                                         final ProcessorRecordContext 
recordContext) {
@@ -933,6 +940,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         return purgeableConsumedOffsets;
     }
 
+    @SuppressWarnings("unchecked")
     private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the 
topology
         log.trace("Initializing processor nodes of the topology");
@@ -1107,6 +1115,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public InternalProcessorContext processorContext() {
         return processorContext;
     }
@@ -1230,7 +1239,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
 
         public RecordQueue createQueue(final TopicPartition partition) {
-            final SourceNode<?, ?, ?, ?> source = 
topology.source(partition.topic());
+            final SourceNode<?, ?> source = topology.source(partition.topic());
             if (source == null) {
                 throw new TopologyException(
                         "Topic is unknown to the topology. " +
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
index 7e5f11a..c86d216 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.api.Record;
+
 /**
  * Listen to cache flush events
  * @param <K> key type
@@ -31,4 +34,9 @@ public interface CacheFlushListener<K, V> {
      * @param timestamp   timestamp of new value
      */
     void apply(final K key, final V newValue, final V oldValue, final long 
timestamp);
+
+    /**
+     * Called when records are flushed from the {@link ThreadCache}.
+     *
+     * @param record the evicted record, whose {@link Change} value carries both the new and old values
+     */
+    void apply(final Record<K, Change<V>> record);
 }

Reply via email to