This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce83e5b  KAFKA-10540: Migrate KStream aggregate operations (#11315)
ce83e5b is described below

commit ce83e5be6644884d5c727ed49982f45eca8c136f
Author: Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com>
AuthorDate: Thu Sep 30 17:40:40 2021 +0100

    KAFKA-10540: Migrate KStream aggregate operations (#11315)
    
    As part of the migration of KStream/KTable operations to the new
    Processor API (https://issues.apache.org/jira/browse/KAFKA-8410),
    this PR migrates the KStream aggregate/reduce operations.
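    
    For context, the shape of the migration is roughly as follows. This is
    a minimal illustrative sketch, not code from this patch; the WordCount
    class names and the String/Long type choices are hypothetical:
    
        // Old PAPI: key and value arrive separately, the output types are
        // not part of the signature, and topic/partition/offset are always
        // readable from the context.
        class OldWordCount
            extends org.apache.kafka.streams.processor.AbstractProcessor<String, Long> {
            @Override
            public void process(final String key, final Long value) {
                final long offset = context().offset(); // e.g. for logging
            }
        }
    
        // New PAPI: input and output key/value types are both in the
        // signature, the record travels as one object carrying its own
        // timestamp, and topic/partition/offset become Optional metadata.
        class NewWordCount
            extends org.apache.kafka.streams.processor.api.ContextualProcessor<String, Long, String, Long> {
            @Override
            public void process(final org.apache.kafka.streams.processor.api.Record<String, Long> record) {
                final long timestamp = record.timestamp(); // carried by the record itself
                context().recordMetadata().ifPresent(metadata -> {
                    // metadata may be absent, e.g. when a record does not
                    // come directly from an input topic
                    final long offset = metadata.offset();
                });
            }
        }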
    
    Reviewers: John Roesler <vvcep...@apache.org>
---
 .../internals/CogroupedStreamAggregateBuilder.java |  18 +-
 .../internals/GroupedStreamAggregateBuilder.java   |   2 +-
 .../kstream/internals/KGroupedStreamImpl.java      |   2 +-
 .../internals/KStreamAggProcessorSupplier.java     |   7 +-
 .../kstream/internals/KStreamAggregate.java        |  76 ++++---
 .../streams/kstream/internals/KStreamReduce.java   |  49 +++--
 .../internals/KStreamSessionWindowAggregate.java   | 152 +++++++------
 .../internals/KStreamSlidingWindowAggregate.java   | 236 ++++++++++++---------
 .../kstream/internals/KStreamWindowAggregate.java  | 131 +++++++-----
 .../streams/kstream/internals/KTableImpl.java      |   2 +-
 .../internals/SessionCacheFlushListener.java       |   8 +-
 .../kstream/internals/SessionTupleForwarder.java   |  13 +-
 .../internals/graph/GraphGraceSearchUtil.java      |   5 +-
 .../streams/processor/api/ContextualProcessor.java |   2 +-
 .../kstream/internals/KGroupedStreamImplTest.java  |   4 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 142 ++++++-------
 .../KStreamSlidingWindowAggregateTest.java         |  16 +-
 .../internals/KStreamWindowAggregateTest.java      |  30 +--
 .../internals/SessionCacheFlushListenerTest.java   |   9 +-
 .../internals/SessionTupleForwarderTest.java       |  31 +--
 20 files changed, 522 insertions(+), 413 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index ebaa954..5a04a2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 class CogroupedStreamAggregateBuilder<K, VOut> {
@@ -60,7 +61,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
+            final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
                 new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
             final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
@@ -94,8 +95,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?>  parentProcessor =
-                (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
+            final KStreamAggProcessorSupplier<K, ?, K, ?>  parentProcessor =
+                (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
                     windows,
                     storeBuilder.name(),
                     initializer,
@@ -132,8 +133,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
-                (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
+            final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
+                (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
                     sessionWindows,
                     storeBuilder.name(),
                     initializer,
@@ -170,8 +171,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
-                (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>(
+            final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
+                (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>(
                     slidingWindows,
                     storeBuilder.name(),
                     initializer,
@@ -253,11 +254,10 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             builder);
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
                                                                  final boolean stateCreated,
                                                                  final StoreBuilder<?> storeBuilder,
-                                                                 final org.apache.kafka.streams.processor.ProcessorSupplier<K, ?> kStreamAggregate) {
+                                                                 final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
         final StatefulProcessorNode<K, ?> statefulProcessorNode;
         if (!stateCreated) {
             statefulProcessorNode =
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 4896d97..dfcf63d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -68,7 +68,7 @@ class GroupedStreamAggregateBuilder<K, V> {
 
     <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
                                   final StoreBuilder<?> storeBuilder,
-                                  final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
+                                  final KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier,
                                   final String queryableStoreName,
                                   final Serde<KR> keySerde,
                                   final Serde<VR> valueSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 1317996..d56caed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -233,7 +233,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
         );
     }
 
-    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier,
+    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, V, K, T> aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
         return aggregateBuilder.build(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
index fab61be..7d8ff94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public interface KStreamAggProcessorSupplier<K, RK, V, T> extends 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 
-    KTableValueGetterSupplier<RK, T> view();
+public interface KStreamAggProcessorSupplier<KIn, VIn, KAgg, VAgg> extends 
ProcessorSupplier<KIn, VIn, KAgg, Change<VAgg>> {
+
+    KTableValueGetterSupplier<KAgg, VAgg> view();
 
     void enableSendingOldValues();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 0f83849..8e0a910 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -28,25 +33,26 @@ import org.slf4j.LoggerFactory;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
+public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, KIn, VAgg> {
+
     private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
+
     private final String storeName;
-    private final Initializer<T> initializer;
-    private final Aggregator<? super K, ? super V, T> aggregator;
+    private final Initializer<VAgg> initializer;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
     KStreamAggregate(final String storeName,
-                     final Initializer<T> initializer,
-                     final Aggregator<? super K, ? super V, T> aggregator) {
+                     final Initializer<VAgg> initializer,
+                     final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<KIn, VIn, KIn, Change<VAgg>> get() {
         return new KStreamAggregateProcessor();
     }
 
@@ -56,13 +62,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
     }
 
 
-    private class KStreamAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-        private TimestampedKeyValueStore<K, T> store;
+    private class KStreamAggregateProcessor extends ContextualProcessor<KIn, VIn, KIn, Change<VAgg>> {
+        private TimestampedKeyValueStore<KIn, VAgg> store;
         private Sensor droppedRecordsSensor;
-        private TimestampedTupleForwarder<K, T> tupleForwarder;
+        private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
             super.init(context);
             droppedRecordsSensor = droppedRecordsSensor(
                 Thread.currentThread().getName(),
@@ -77,43 +83,51 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // If the key or value is null we don't need to proceed
-            if (key == null || value == null) {
-                LOG.warn(
-                    "Skipping record due to null key or value. key=[{}] 
value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    key, value, context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null || record.value() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key or value. Topic, 
partition, and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
-            T oldAgg = getValueOrNull(oldAggAndTimestamp);
+            final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
+            VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
 
-            final T newAgg;
+            final VAgg newAgg;
             final long newTimestamp;
 
             if (oldAgg == null) {
                 oldAgg = initializer.apply();
-                newTimestamp = context().timestamp();
+                newTimestamp = record.timestamp();
             } else {
                 oldAgg = oldAggAndTimestamp.value();
-                newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+                newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             }
 
-            newAgg = aggregator.apply(key, value, oldAgg);
+            newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-            store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
-            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+            store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+            tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
         }
     }
 
     @Override
-    public KTableValueGetterSupplier<K, T> view() {
-        return new KTableValueGetterSupplier<K, T>() {
+    public KTableValueGetterSupplier<KIn, VAgg> view() {
+        return new KTableValueGetterSupplier<KIn, VAgg>() {
 
-            public KTableValueGetter<K, T> get() {
+            public KTableValueGetter<KIn, VAgg> get() {
                 return new KStreamAggregateValueGetter();
             }
 
@@ -124,8 +138,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
-        private TimestampedKeyValueStore<K, T> store;
+    private class KStreamAggregateValueGetter implements KTableValueGetter<KIn, VAgg> {
+        private TimestampedKeyValueStore<KIn, VAgg> store;
 
         @Override
         public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
@@ -133,7 +147,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         }
 
         @Override
-        public ValueAndTimestamp<T> get(final K key) {
+        public ValueAndTimestamp<VAgg> get(final KIn key) {
             return store.get(key);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index d774217..080f9a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -18,6 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -27,8 +32,8 @@ import org.slf4j.LoggerFactory;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
+public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K, V> {
+
     private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
 
     private final String storeName;
@@ -42,7 +47,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<K, V, K, Change<V>> get() {
         return new KStreamReduceProcessor();
     }
 
@@ -52,13 +57,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
     }
 
 
-    private class KStreamReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+    private class KStreamReduceProcessor extends ContextualProcessor<K, V, K, Change<V>> {
         private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private Sensor droppedRecordsSensor;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<K, Change<V>> context) {
             super.init(context);
             droppedRecordsSensor = droppedRecordsSensor(
                 Thread.currentThread().getName(),
@@ -74,33 +79,41 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<K, V> record) {
             // If the key or value is null we don't need to proceed
-            if (key == null || value == null) {
-                LOG.warn(
-                    "Skipping record due to null key or value. key=[{}] 
value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    key, value, context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null || record.value() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, 
and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(record.key());
             final V oldAgg = getValueOrNull(oldAggAndTimestamp);
 
             final V newAgg;
             final long newTimestamp;
 
             if (oldAgg == null) {
-                newAgg = value;
-                newTimestamp = context().timestamp();
+                newAgg = record.value();
+                newTimestamp = record.timestamp();
             } else {
-                newAgg = reducer.apply(oldAgg, value);
-                newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+                newAgg = reducer.apply(oldAgg, record.value());
+                newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             }
 
-            store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
-            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+            store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+            tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f15997f..00b6959 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -24,6 +24,11 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -36,23 +41,23 @@ import java.util.List;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
+
     private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
     private final SessionWindows windows;
-    private final Initializer<Agg> initializer;
-    private final Aggregator<? super K, ? super V, Agg> aggregator;
-    private final Merger<? super K, Agg> sessionMerger;
+    private final Initializer<VAgg> initializer;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
+    private final Merger<? super KIn, VAgg> sessionMerger;
 
     private boolean sendOldValues = false;
 
     public KStreamSessionWindowAggregate(final SessionWindows windows,
-                                         final String storeName,
-                                         final Initializer<Agg> initializer,
-                                         final Aggregator<? super K, ? super V, Agg> aggregator,
-                                         final Merger<? super K, Agg> sessionMerger) {
+        final String storeName,
+        final Initializer<VAgg> initializer,
+        final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
+        final Merger<? super KIn, VAgg> sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -61,7 +66,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamSessionWindowAggregateProcessor();
     }
 
@@ -74,92 +79,118 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
         sendOldValues = true;
     }
 
-    private class KStreamSessionWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+    private class KStreamSessionWindowAggregateProcessor extends
+        ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
-        private SessionStore<K, Agg> store;
-        private SessionTupleForwarder<K, Agg> tupleForwarder;
+        private SessionStore<KIn, VAgg> store;
+        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
             super.init(context);
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             final String threadId = Thread.currentThread().getName();
-            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(),
+                metrics);
             store = context.getStateStore(storeName);
-            tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
+            tupleForwarder = new SessionTupleForwarder<>(
+                store,
+                context,
+                new SessionCacheFlushListener<>(context),
+                sendOldValues
+            );
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, we do not need to proceed aggregating
             // the record with the table
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. value=[{}] topic=[{}] 
partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, 
and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final long timestamp = context().timestamp();
+            final long timestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
 
-            final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
+            final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
-            Agg agg = initializer.apply();
+            VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<K>, Agg> iterator = store.findSessions(
-                    key,
+                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = store.findSessions(
+                    record.key(),
                     timestamp - windows.inactivityGap(),
                     timestamp + windows.inactivityGap()
                 )
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, Agg> next = iterator.next();
+                    final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
                     merged.add(next);
-                    agg = sessionMerger.apply(key, agg, next.value);
+                    agg = sessionMerger.apply(record.key(), agg, next.value);
                     mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
                 }
             }
 
             if (mergedWindow.end() < closeTime) {
-                LOG.warn(
-                    "Skipping record for expired window. " +
-                        "key=[{}] " +
-                        "topic=[{}] " +
-                        "partition=[{}] " +
-                        "offset=[{}] " +
-                        "timestamp=[{}] " +
-                        "window=[{},{}] " +
-                        "expiration=[{}] " +
-                        "streamTime=[{}]",
-                    key,
-                    context().topic(),
-                    context().partition(),
-                    context().offset(),
-                    timestamp,
-                    mergedWindow.start(),
-                    mergedWindow.end(),
-                    closeTime,
-                    observedStreamTime
-                );
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record for expired window. " +
+                            "topic=[{}] " +
+                            "partition=[{}] " +
+                            "offset=[{}] " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
+                        timestamp,
+                        mergedWindow.start(), mergedWindow.end(),
+                        closeTime,
+                        observedStreamTime
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record for expired window. Topic, partition, 
and offset not known. " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        timestamp,
+                        mergedWindow.start(), mergedWindow.end(),
+                        closeTime,
+                        observedStreamTime
+                    );
+                }
                 droppedRecordsSensor.record();
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
-                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
+                    for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null);
+                        tupleForwarder.maybeForward(session.key, null,
+                            sendOldValues ? session.value : null);
                     }
                 }
 
-                agg = aggregator.apply(key, value, agg);
-                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+                agg = aggregator.apply(record.key(), record.value(), agg);
+                final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
                 tupleForwarder.maybeForward(sessionKey, agg, null);
             }
@@ -173,22 +204,23 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
+    public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
+        return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
             @Override
-            public KTableValueGetter<Windowed<K>, Agg> get() {
+            public KTableValueGetter<Windowed<KIn>, VAgg> get() {
                 return new KTableSessionWindowValueGetter();
             }
 
             @Override
             public String[] storeNames() {
-                return new String[] {storeName};
+                return new String[]{storeName};
             }
         };
     }
 
-    private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
-        private SessionStore<K, Agg> store;
+    private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<KIn>, VAgg> {
+
+        private SessionStore<KIn, VAgg> store;
 
         @Override
         public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
@@ -196,7 +228,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
         }
 
         @Override
-        public ValueAndTimestamp<Agg> get(final Windowed<K> key) {
+        public ValueAndTimestamp<VAgg> get(final Windowed<KIn> key) {
             return ValueAndTimestamp.make(
                 store.fetchSession(key.key(), key.window().start(), key.window().end()),
                 key.window().end());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index db91bb3..ac4710e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -24,6 +24,11 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -37,21 +42,21 @@ import java.util.Set;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final String storeName;
     private final SlidingWindows windows;
-    private final Initializer<Agg> initializer;
-    private final Aggregator<? super K, ? super V, Agg> aggregator;
+    private final Initializer<VAgg> initializer;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
     public KStreamSlidingWindowAggregate(final SlidingWindows windows,
                                          final String storeName,
-                                         final Initializer<Agg> initializer,
-                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+                                         final Initializer<VAgg> initializer,
+                                         final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -59,7 +64,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamSlidingWindowAggregateProcessor();
     }
 
@@ -72,17 +77,18 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
         sendOldValues = true;
     }
 
-    private class KStreamSlidingWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-        private TimestampedWindowStore<K, Agg> windowStore;
-        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+    private class KStreamSlidingWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
+        private TimestampedWindowStore<KIn, VAgg> windowStore;
+        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
         private Boolean reverseIteratorPossible = null;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
             super.init(context);
-            final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext =
+                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
             final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
             final String threadId = Thread.currentThread().getName();
             droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
@@ -95,52 +101,71 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
         }
 
         @Override
-        public void process(final K key, final V value) {
-            if (key == null || value == null) {
-                log.warn(
-                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), 
context().offset()
-                );
+        public void process(final Record<KIn, VIn> record) {
+            if (record.key() == null || record.value() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    log.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    log.warn(
+                        "Skipping record due to null key or value. Topic, 
partition, and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final long inputRecordTimestamp = context().timestamp();
+            final long inputRecordTimestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, inputRecordTimestamp);
             final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
             if (inputRecordTimestamp + 1L + windows.timeDifferenceMs() <= closeTime) {
-                log.warn(
-                    "Skipping record for expired window. " +
-                        "key=[{}] " +
-                        "topic=[{}] " +
-                        "partition=[{}] " +
-                        "offset=[{}] " +
-                        "timestamp=[{}] " +
-                        "window=[{},{}] " +
-                        "expiration=[{}] " +
-                        "streamTime=[{}]",
-                    key,
-                    context().topic(),
-                    context().partition(),
-                    context().offset(),
-                    context().timestamp(),
-                    inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp,
-                    closeTime,
-                    observedStreamTime
-                );
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    log.warn(
+                        "Skipping record for expired window. " +
+                            "topic=[{}] " +
+                            "partition=[{}] " +
+                            "offset=[{}] " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
+                        record.timestamp(),
+                        inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp,
+                        closeTime,
+                        observedStreamTime
+                    );
+                } else {
+                    log.warn(
+                        "Skipping record for expired window. Topic, partition, 
and offset not known. " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        record.timestamp(),
+                        inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp,
+                        closeTime,
+                        observedStreamTime
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (inputRecordTimestamp < windows.timeDifferenceMs()) {
-                processEarly(key, value, inputRecordTimestamp, closeTime);
+                processEarly(record.key(), record.value(), inputRecordTimestamp, closeTime);
                 return;
             }
 
             if (reverseIteratorPossible == null) {
                 try {
-                    windowStore.backwardFetch(key, 0L, 0L);
+                    windowStore.backwardFetch(record.key(), 0L, 0L);
                     reverseIteratorPossible = true;
                     log.debug("Sliding Windows aggregate using a reverse 
iterator");
                 } catch (final UnsupportedOperationException e)  {
@@ -150,19 +175,19 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
             }
 
             if (reverseIteratorPossible) {
-                processReverse(key, value, inputRecordTimestamp, closeTime);
+                processReverse(record.key(), record.value(), inputRecordTimestamp, closeTime);
             } else {
-                processInOrder(key, value, inputRecordTimestamp, closeTime);
+                processInOrder(record.key(), record.value(), inputRecordTimestamp, closeTime);
             }
         }
 
-        public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
+        public void processInOrder(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) {
 
             final Set<Long> windowStartTimes = new HashSet<>();
 
             // aggregate that will go in the current record’s left/right window (if needed)
-            ValueAndTimestamp<Agg> leftWinAgg = null;
-            ValueAndTimestamp<Agg> rightWinAgg = null;
+            ValueAndTimestamp<VAgg> leftWinAgg = null;
+            ValueAndTimestamp<VAgg> rightWinAgg = null;
 
             //if current record's left/right windows already exist
             boolean leftWinAlreadyCreated = false;
@@ -171,7 +196,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
             Long previousRecordTimestamp = null;
 
             try (
-                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.fetch(
                     key,
                     key,
                     Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
@@ -179,7 +204,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next();
                     final long startTime = windowBeingProcessed.key.window().start();
                     windowStartTimes.add(startTime);
                     final long endTime = startTime + windows.timeDifferenceMs();
@@ -211,13 +236,13 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
             createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
         }
 
-        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
+        public void processReverse(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) {
 
             final Set<Long> windowStartTimes = new HashSet<>();
 
             // aggregate that will go in the current record’s left/right window (if needed)
-            ValueAndTimestamp<Agg> leftWinAgg = null;
-            ValueAndTimestamp<Agg> rightWinAgg = null;
+            ValueAndTimestamp<VAgg> leftWinAgg = null;
+            ValueAndTimestamp<VAgg> rightWinAgg = null;
 
             //if current record's left/right windows already exist
             boolean leftWinAlreadyCreated = false;
@@ -226,7 +251,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
             Long previousRecordTimestamp = null;
 
             try (
-                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.backwardFetch(
                     key,
                     key,
                     Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
@@ -234,7 +259,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next();
                     final long startTime = windowBeingProcessed.key.window().start();
                     windowStartTimes.add(startTime);
                     final long endTime = startTime + windows.timeDifferenceMs();
@@ -275,7 +300,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
          * windows with negative start times, which is not supported. Instead, we will put them into the [0, timeDifferenceMs]
          * window as a "workaround", and we will update or create their right windows as new records come in later
          */
-        private void processEarly(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
+        private void processEarly(final KIn key, final VIn value, final long inputRecordTimestamp, final long closeTime) {
             if (inputRecordTimestamp < 0 || inputRecordTimestamp >= windows.timeDifferenceMs()) {
                 log.error(
                     "Early record for sliding windows must fall between fall 
between 0 <= inputRecordTimestamp. Timestamp {} does not fall between 0 <= {}",
@@ -285,15 +310,15 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
             }
 
             // A window from [0, timeDifferenceMs] that holds all early records
-            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
-            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> combinedWindow = null;
+            ValueAndTimestamp<VAgg> rightWinAgg = null;
             boolean rightWinAlreadyCreated = false;
             final Set<Long> windowStartTimes = new HashSet<>();
 
             Long previousRecordTimestamp = null;
 
             try (
-                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> iterator = windowStore.fetch(
                     key,
                     key,
                     0,
@@ -301,7 +326,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowBeingProcessed = iterator.next();
                     final long startTime = windowBeingProcessed.key.window().start();
                     windowStartTimes.add(startTime);
                     final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
@@ -352,7 +377,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
             if (combinedWindow == null) {
                 final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+                final ValueAndTimestamp<VAgg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
                 updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
 
             } else {
@@ -362,13 +387,13 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
         }
 
-        private void createWindows(final K key,
-                                   final V value,
+        private void createWindows(final KIn key,
+                                   final VIn value,
                                    final long inputRecordTimestamp,
                                    final long closeTime,
                                    final Set<Long> windowStartTimes,
-                                   final ValueAndTimestamp<Agg> rightWinAgg,
-                                   final ValueAndTimestamp<Agg> leftWinAgg,
+                                   final ValueAndTimestamp<VAgg> rightWinAgg,
+                                   final ValueAndTimestamp<VAgg> leftWinAgg,
                                    final boolean leftWinAlreadyCreated,
                                    final boolean rightWinAlreadyCreated,
                                    final Long previousRecordTimestamp) {
@@ -382,7 +407,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
+                final ValueAndTimestamp<VAgg> valueAndTime;
                 if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
                 } else {
@@ -399,8 +424,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
         }
 
         private void createCurrentRecordRightWindow(final long inputRecordTimestamp,
-                                                    final ValueAndTimestamp<Agg> rightWinAgg,
-                                                    final K key) {
+                                                    final ValueAndTimestamp<VAgg> rightWinAgg,
+                                                    final KIn key) {
             final TimeWindow window = new TimeWindow(inputRecordTimestamp + 1, inputRecordTimestamp + 1 + windows.timeDifferenceMs());
             windowStore.put(
                 key,
@@ -415,11 +440,11 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
         private void createPreviousRecordRightWindow(final long windowStart,
                                                      final long inputRecordTimestamp,
-                                                     final K key,
-                                                     final V value,
+                                                     final KIn key,
+                                                     final VIn value,
                                                      final long closeTime) {
             final TimeWindow window = new TimeWindow(windowStart, windowStart 
+ windows.timeDifferenceMs());
-            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final ValueAndTimestamp<VAgg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
             updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
         }
 
@@ -436,22 +461,22 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> 
implements KStreamAggProce
         }
 
         // checks if the aggregate we found has records that fall into the 
current record's right window; if yes, the right window is not empty
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long inputRecordTimestamp) {
+        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> 
rightWinAgg, final long inputRecordTimestamp) {
             return rightWinAgg != null && rightWinAgg.timestamp() > 
inputRecordTimestamp;
         }
 
         private void updateWindowAndForward(final Window window,
-                                            final ValueAndTimestamp<Agg> 
valueAndTime,
-                                            final K key,
-                                            final V value,
+                                            final ValueAndTimestamp<VAgg> 
valueAndTime,
+                                            final KIn key,
+                                            final VIn value,
                                             final long closeTime,
                                             final long inputRecordTimestamp) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
             if (windowEnd > closeTime) {
                 //get aggregate from existing window
-                final Agg oldAgg = getValueOrNull(valueAndTime);
-                final Agg newAgg = aggregator.apply(key, value, oldAgg);
+                final VAgg oldAgg = getValueOrNull(valueAndTime);
+                final VAgg newAgg = aggregator.apply(key, value, oldAgg);
 
                 final long newTimestamp = oldAgg == null ? 
inputRecordTimestamp : Math.max(inputRecordTimestamp, valueAndTime.timestamp());
                 windowStore.put(
@@ -464,35 +489,46 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> 
implements KStreamAggProce
                     sendOldValues ? oldAgg : null,
                     newTimestamp);
             } else {
-                log.warn(
-                    "Skipping record for expired window. " +
-                        "key=[{}] " +
-                        "topic=[{}] " +
-                        "partition=[{}] " +
-                        "offset=[{}] " +
-                        "timestamp=[{}] " +
-                        "window=[{},{}] " +
-                        "expiration=[{}] " +
-                        "streamTime=[{}]",
-                    key,
-                    context().topic(),
-                    context().partition(),
-                    context().offset(),
-                    context().timestamp(),
-                    windowStart, windowEnd,
-                    closeTime,
-                    observedStreamTime
-                );
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                    log.warn(
+                        "Skipping record for expired window. " +
+                            "topic=[{}] " +
+                            "partition=[{}] " +
+                            "offset=[{}] " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset(),
+                        inputRecordTimestamp,
+                        windowStart, windowEnd,
+                        closeTime,
+                        observedStreamTime
+                    );
+                } else {
+                    log.warn(
+                        "Skipping record for expired window. Topic, partition, 
and offset not known. " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}] " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        inputRecordTimestamp,
+                        windowStart, windowEnd,
+                        closeTime,
+                        observedStreamTime
+                    );
+                }
                 droppedRecordsSensor.record();
             }
         }
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
+    public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
+        return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
 
-            public KTableValueGetter<Windowed<K>, Agg> get() {
+            public KTableValueGetter<Windowed<KIn>, VAgg> get() {
                 return new KStreamWindowAggregateValueGetter();
             }
 
@@ -503,8 +539,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> 
implements KStreamAggProce
         };
     }
 
-    private class KStreamWindowAggregateValueGetter implements 
KTableValueGetter<Windowed<K>, Agg> {
-        private TimestampedWindowStore<K, Agg> windowStore;
+    private class KStreamWindowAggregateValueGetter implements 
KTableValueGetter<Windowed<KIn>, VAgg> {
+        private TimestampedWindowStore<KIn, VAgg> windowStore;
 
         @Override
         public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
@@ -512,8 +548,8 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> 
implements KStreamAggProce
         }
 
         @Override
-        public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) {
-            final K key = windowedKey.key();
+        public ValueAndTimestamp<VAgg> get(final Windowed<KIn> windowedKey) {
+            final KIn key = windowedKey.key();
             return windowStore.fetch(key, windowedKey.window().start());
         }
 
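
For context on the logging rewrite above: the new ProcessorContext exposes topic, partition, and offset only through recordMetadata(), which returns an empty Optional for records that did not originate from an input topic (records injected by a punctuator, for example). A minimal sketch of the resulting pattern, with a hypothetical processor name:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.processor.api.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical processor, not part of the patch: shows the Optional-based
    // metadata logging that KStreamSlidingWindowAggregate adopts above.
    class SkipLoggingProcessor extends ContextualProcessor<String, String, String, String> {
        private static final Logger log = LoggerFactory.getLogger(SkipLoggingProcessor.class);

        @Override
        public void process(final Record<String, String> record) {
            if (record.key() == null) {
                if (context().recordMetadata().isPresent()) {
                    final RecordMetadata m = context().recordMetadata().get();
                    log.warn("Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                        m.topic(), m.partition(), m.offset());
                } else {
                    // no source metadata available, e.g. a record injected by a punctuator
                    log.warn("Skipping record due to null key. Topic, partition, and offset not known.");
                }
                return;
            }
            context().forward(record);
        }
    }
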
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index aaf6a34..5730ae6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -23,6 +23,11 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -35,21 +40,21 @@ import java.util.Map;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> 
implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final String storeName;
     private final Windows<W> windows;
-    private final Initializer<Agg> initializer;
-    private final Aggregator<? super K, ? super V, Agg> aggregator;
+    private final Initializer<VAgg> initializer;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
     public KStreamWindowAggregate(final Windows<W> windows,
                                   final String storeName,
-                                  final Initializer<Agg> initializer,
-                                  final Aggregator<? super K, ? super V, Agg> 
aggregator) {
+                                  final Initializer<VAgg> initializer,
+                                  final Aggregator<? super KIn, ? super VIn, 
VAgg> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -57,7 +62,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamWindowAggregateProcessor();
     }
 
@@ -71,16 +76,17 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
     }
 
 
-    private class KStreamWindowAggregateProcessor extends 
org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-        private TimestampedWindowStore<K, Agg> windowStore;
-        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+    private class KStreamWindowAggregateProcessor extends 
ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
+        private TimestampedWindowStore<KIn, VAgg> windowStore;
+        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @Override
-        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> 
context) {
             super.init(context);
-            final InternalProcessorContext internalProcessorContext = 
(InternalProcessorContext) context;
+            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext =
+                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) 
context;
             final StreamsMetricsImpl metrics = 
internalProcessorContext.metrics();
             final String threadId = Thread.currentThread().getName();
             droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
@@ -93,18 +99,26 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
         }
 
         @Override
-        public void process(final K key, final V value) {
-            if (key == null) {
-                log.warn(
-                    "Skipping record due to null key. value=[{}] topic=[{}] 
partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), 
context().offset()
-                );
+        public void process(final Record<KIn, VIn> record) {
+            if (record.key() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                    log.warn(
+                        "Skipping record due to null key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+                    );
+                } else {
+                    log.warn(
+                        "Skipping record due to null key. Topic, partition, 
and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             // first get the matching windows
-            final long timestamp = context().timestamp();
+            final long timestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
 
@@ -115,48 +129,59 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
                 final Long windowStart = entry.getKey();
                 final long windowEnd = entry.getValue().end();
                 if (windowEnd > closeTime) {
-                    final ValueAndTimestamp<Agg> oldAggAndTimestamp = 
windowStore.fetch(key, windowStart);
-                    Agg oldAgg = getValueOrNull(oldAggAndTimestamp);
+                    final ValueAndTimestamp<VAgg> oldAggAndTimestamp = 
windowStore.fetch(record.key(), windowStart);
+                    VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
 
-                    final Agg newAgg;
+                    final VAgg newAgg;
                     final long newTimestamp;
 
                     if (oldAgg == null) {
                         oldAgg = initializer.apply();
-                        newTimestamp = context().timestamp();
+                        newTimestamp = record.timestamp();
                     } else {
-                        newTimestamp = Math.max(context().timestamp(), 
oldAggAndTimestamp.timestamp());
+                        newTimestamp = Math.max(record.timestamp(), 
oldAggAndTimestamp.timestamp());
                     }
 
-                    newAgg = aggregator.apply(key, value, oldAgg);
+                    newAgg = aggregator.apply(record.key(), record.value(), 
oldAgg);
 
                     // update the store with the new value
-                    windowStore.put(key, ValueAndTimestamp.make(newAgg, 
newTimestamp), windowStart);
+                    windowStore.put(record.key(), 
ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
                     tupleForwarder.maybeForward(
-                        new Windowed<>(key, entry.getValue()),
+                        new Windowed<>(record.key(), entry.getValue()),
                         newAgg,
                         sendOldValues ? oldAgg : null,
                         newTimestamp);
                 } else {
-                    log.warn(
-                        "Skipping record for expired window. " +
-                            "key=[{}] " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}) " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        key,
-                        context().topic(),
-                        context().partition(),
-                        context().offset(),
-                        context().timestamp(),
-                        windowStart, windowEnd,
-                        closeTime,
-                        observedStreamTime
-                    );
+                    if (context().recordMetadata().isPresent()) {
+                        final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                        log.warn(
+                            "Skipping record for expired window. " +
+                                "topic=[{}] " +
+                                "partition=[{}] " +
+                                "offset=[{}] " +
+                                "timestamp=[{}] " +
+                                "window=[{},{}) " +
+                                "expiration=[{}] " +
+                                "streamTime=[{}]",
+                            recordMetadata.topic(), 
recordMetadata.partition(), recordMetadata.offset(),
+                            record.timestamp(),
+                            windowStart, windowEnd,
+                            closeTime,
+                            observedStreamTime
+                        );
+                    } else {
+                        log.warn(
+                            "Skipping record for expired window. Topic, 
partition, and offset not known. " +
+                                "timestamp=[{}] " +
+                                "window=[{},{}) " +
+                                "expiration=[{}] " +
+                                "streamTime=[{}]",
+                            record.timestamp(),
+                            windowStart, windowEnd,
+                            closeTime,
+                            observedStreamTime
+                        );
+                    }
                     droppedRecordsSensor.record();
                 }
             }
@@ -164,10 +189,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
+    public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
+        return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
 
-            public KTableValueGetter<Windowed<K>, Agg> get() {
+            public KTableValueGetter<Windowed<KIn>, VAgg> get() {
                 return new KStreamWindowAggregateValueGetter();
             }
 
@@ -178,8 +203,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
         };
     }
 
-    private class KStreamWindowAggregateValueGetter implements 
KTableValueGetter<Windowed<K>, Agg> {
-        private TimestampedWindowStore<K, Agg> windowStore;
+    private class KStreamWindowAggregateValueGetter implements 
KTableValueGetter<Windowed<KIn>, VAgg> {
+        private TimestampedWindowStore<KIn, VAgg> windowStore;
 
         @Override
         public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
@@ -188,8 +213,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
 
         @SuppressWarnings("unchecked")
         @Override
-        public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) {
-            final K key = windowedKey.key();
+        public ValueAndTimestamp<VAgg> get(final Windowed<KIn> windowedKey) {
+            final KIn key = windowedKey.key();
             final W window = (W) windowedKey.window();
             return windowStore.fetch(key, window.start());
         }
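
The hunks above show the core shape of the migration: the deprecated AbstractProcessor<K, V> pulled the record timestamp from the context, while the new ContextualProcessor<KIn, VIn, KOut, VOut> receives it on the Record itself and declares the output key/value types in its signature. A stripped-down sketch under those assumptions (the aggregation logic is elided and the class name is hypothetical):

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    class CountingProcessor extends ContextualProcessor<String, Long, String, Long> {
        // Old PAPI equivalent:
        //     public void process(final String key, final Long value) {
        //         final long timestamp = context().timestamp();
        //         ...
        //     }
        @Override
        public void process(final Record<String, Long> record) {
            // record.timestamp() replaces the old context().timestamp() lookup
            final long count = record.value() == null ? 0L : record.value();
            context().forward(record.withValue(count + 1));
        }
    }
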
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5339b6a..6f73044 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,7 +832,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
             source.materialize();
             return new 
KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
-            return ((KStreamAggProcessorSupplier<?, K, S, V>) 
processorSupplier).view();
+            return ((KStreamAggProcessorSupplier<?, S, K, V>) 
processorSupplier).view();
         } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
             return ((KTableNewProcessorSupplier<?, ?, K, V>) 
processorSupplier).view();
         } else {
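
The reordered cast reflects the renamed type parameters on KStreamAggProcessorSupplier: they now run input key, input value, aggregate key, aggregate value, mirroring the new Processor<KIn, VIn, KOut, VOut> convention, so the pair that view() is keyed by moves to the tail. Shape only (illustrative interfaces, not the real ones):

    // A KTable<K, V> built over mapped values S therefore casts its supplier
    // to <?, S, K, V>: unknown input key, input value S, output pair (K, V).
    interface AggSupplierShape<KIn, VIn, KAgg, VAgg> {
        ValueGetterSupplierShape<KAgg, VAgg> view();
    }

    interface ValueGetterSupplierShape<K, V> { }
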
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index daa7c64..a2c95bf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -17,8 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -30,8 +29,7 @@ class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Window
     @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    @SuppressWarnings("unchecked")
-    SessionCacheFlushListener(final ProcessorContext context) {
+    SessionCacheFlushListener(final ProcessorContext<Windowed<KOut>, 
Change<VOut>> context) {
         this.context = (InternalProcessorContext<Windowed<KOut>, 
Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
@@ -44,7 +42,7 @@ class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Window
         @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), 
To.all().withTimestamp(key.window().end()));
+            context.forward(new Record<>(key, new Change<>(newValue, 
oldValue), key.window().end()));
         } finally {
             context.setCurrentNode(prev);
         }
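
The forward() rewrite above is the general replacement for the To-based overload: the timestamp is a field of the Record rather than an option on To, and headers can be attached the same way. A minimal sketch (values are arbitrary):

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.api.Record;

    class ForwardSketch {
        static Record<String, String> example() {
            // Old API: context.forward(key, value, To.all().withTimestamp(73L));
            // New API: context.forward(new Record<>(key, value, 73L));
            // A Record can also carry explicit headers:
            return new Record<>("key", "value", 73L, new RecordHeaders());
        }
    }
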
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
index bad255a..ac475a4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
@@ -32,13 +32,13 @@ import 
org.apache.kafka.streams.state.internals.WrappedStateStore;
  * @param <V>
  */
 class SessionTupleForwarder<K, V> {
-    private final ProcessorContext context;
+    private final ProcessorContext<Windowed<K>, Change<V>> context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
     SessionTupleForwarder(final StateStore store,
-                          final ProcessorContext context,
+                          final ProcessorContext<Windowed<K>, Change<V>> 
context,
                           final CacheFlushListener<Windowed<K>, V> 
flushListener,
                           final boolean sendOldValues) {
         this.context = context;
@@ -50,7 +50,10 @@ class SessionTupleForwarder<K, V> {
                              final V newValue,
                              final V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? 
oldValue : null), To.all().withTimestamp(key.window().end()));
+            context.forward(new Record<>(
+                key,
+                new Change<>(newValue, sendOldValues ? oldValue : null),
+                key.window().end()));
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 4b39219..66ffdc0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windows;
 import 
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
 import 
org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 
 public final class GraphGraceSearchUtil {
     private GraphGraceSearchUtil() {}
@@ -69,10 +70,10 @@ public final class GraphGraceSearchUtil {
         return inheritedGrace;
     }
 
+    @SuppressWarnings("rawtypes")
     private static Long extractGracePeriod(final GraphNode node) {
         if (node instanceof StatefulProcessorNode) {
-            @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-            final org.apache.kafka.streams.processor.ProcessorSupplier 
processorSupplier = ((StatefulProcessorNode) 
node).processorParameters().oldProcessorSupplier();
+            final ProcessorSupplier processorSupplier = 
((StatefulProcessorNode) node).processorParameters().processorSupplier();
             if (processorSupplier instanceof KStreamWindowAggregate) {
                 final KStreamWindowAggregate kStreamWindowAggregate = 
(KStreamWindowAggregate) processorSupplier;
                 final Windows windows = kStreamWindowAggregate.windows();
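
With the aggregate suppliers migrated, the grace-period search can read the graph node's primary processorSupplier() instead of the deprecated old-PAPI accessor; the instanceof dispatch itself is unchanged. A simplified restatement of the lookup (the real method also handles session and sliding windows):

    import org.apache.kafka.streams.kstream.Windows;
    import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;

    final class GraceLookupSketch {
        private GraceLookupSketch() {}

        @SuppressWarnings("rawtypes")
        static Long gracePeriodOf(final ProcessorSupplier processorSupplier) {
            if (processorSupplier instanceof KStreamWindowAggregate) {
                final Windows windows = ((KStreamWindowAggregate) processorSupplier).windows();
                return windows.gracePeriodMs();
            }
            return null;  // other window aggregate types handled similarly
        }
    }
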
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
index d2522e3..96cc278 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
@@ -27,7 +27,7 @@ package org.apache.kafka.streams.processor.api;
  */
 public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {
 
-    protected ProcessorContext<KOut, VOut> context;
+    private ProcessorContext<KOut, VOut> context;
 
     protected ContextualProcessor() {}
 
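
Making the context field private is a small but deliberate tightening: subclasses now reach the processor context only through the protected context() accessor, which is exactly how the migrated processors in this patch use it. A hypothetical subclass for illustration:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    class PassThrough extends ContextualProcessor<String, String, String, String> {
        @Override
        public void process(final Record<String, String> record) {
            // direct field access (this.context) no longer compiles; use the accessor
            context().forward(record);
        }
    }
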
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index eba39a7..333884e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -590,7 +590,7 @@ public class KGroupedStreamImplTest {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("Skipping record due to null key or value. key=[3] 
value=[null] topic=[topic] partition=[0] "
+                hasItem("Skipping record due to null key or value. 
topic=[topic] partition=[0] "
                     + "offset=[6]")
             );
         }
@@ -640,7 +640,7 @@ public class KGroupedStreamImplTest {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("Skipping record due to null key or value. key=[3] 
value=[null] topic=[topic] partition=[0] "
+                hasItem("Skipping record due to null key or value. 
topic=[topic] partition=[0] "
                     + "offset=[6]")
             );
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index a897d11..ab92b70 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.ToInternal;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -70,29 +70,27 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamSessionWindowAggregateProcessorTest {
 
     private static final long GAP_MS = 5 * 60 * 1000L;
     private static final String STORE_NAME = "session-store";
 
     private final String threadId = Thread.currentThread().getName();
-    private final ToInternal toInternal = new ToInternal();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
     private final Merger<String, Long> sessionMerger = (aggKey, aggOne, 
aggTwo) -> aggOne + aggTwo;
     private final KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator =
         new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(ofMillis(GAP_MS)),
+            SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
             STORE_NAME,
             initializer,
             aggregator,
             sessionMerger);
 
     private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> 
results = new ArrayList<>();
-    private final org.apache.kafka.streams.processor.Processor<String, String> 
processor = sessionAggregator.get();
+    private final Processor<String, String, Windowed<String>, Change<Long>> 
processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
-    private InternalMockProcessorContext context;
+    private InternalMockProcessorContext<Windowed<String>, Change<Long>> 
context;
     private final Metrics metrics = new Metrics();
 
     @Before
@@ -103,7 +101,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private void setup(final boolean enableCache) {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, new MockTime());
-        context = new InternalMockProcessorContext<Object, Object>(
+        context = new InternalMockProcessorContext<Windowed<String>, 
Change<Long>>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
@@ -113,11 +111,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
             new ThreadCache(new LogContext("testCache "), 100000, 
streamsMetrics),
             Time.SYSTEM
         ) {
-            @SuppressWarnings("unchecked")
             @Override
-            public void forward(final Object key, final Object value, final To 
to) {
-                toInternal.update(to);
-                results.add(new KeyValueTimestamp<>((Windowed<String>) key, 
(Change<Long>) value, toInternal.timestamp()));
+            public <K extends Windowed<String>, V extends Change<Long>> void 
forward(final Record<K, V> record) {
+                results.add(new KeyValueTimestamp<>(record.key(), 
record.value(), record.timestamp()));
             }
         };
         TaskMetrics.droppedRecordsSensor(threadId, 
context.taskId().toString(), streamsMetrics);
@@ -152,10 +148,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldCreateSingleSessionWhenWithinGap() {
-        context.setTime(0);
-        processor.process("john", "first");
-        context.setTime(500);
-        processor.process("john", "second");
+        processor.process(new Record<>("john", "first", 0L));
+        processor.process(new Record<>("john", "second", 500L));
 
         try (final KeyValueIterator<Windowed<String>, Long> values =
                  sessionStore.findSessions("john", 0, 2000)) {
@@ -166,20 +160,17 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldMergeSessions() {
-        context.setTime(0);
         final String sessionId = "mel";
-        processor.process(sessionId, "first");
+        processor.process(new Record<>(sessionId, "first", 0L));
         assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
 
         // move time beyond gap
-        context.setTime(GAP_MS + 1);
-        processor.process(sessionId, "second");
+        processor.process(new Record<>(sessionId, "second", GAP_MS + 1));
         assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 
1).hasNext());
         // should still exist as not within gap
         assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
         // move time back
-        context.setTime(GAP_MS / 2);
-        processor.process(sessionId, "third");
+        processor.process(new Record<>(sessionId, "third", GAP_MS / 2));
 
         try (final KeyValueIterator<Windowed<String>, Long> iterator =
                  sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) {
@@ -192,9 +183,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldUpdateSessionIfTheSameTime() {
-        context.setTime(0);
-        processor.process("mel", "first");
-        processor.process("mel", "second");
+        processor.process(new Record<>("mel", "first", 0L));
+        processor.process(new Record<>("mel", "second", 0L));
         try (final KeyValueIterator<Windowed<String>, Long> iterator =
                  sessionStore.findSessions("mel", 0, 0)) {
             assertEquals(Long.valueOf(2L), iterator.next().value);
@@ -206,15 +196,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
     public void 
shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
         final String sessionId = "mel";
         long time = 0;
-        context.setTime(time);
-        processor.process(sessionId, "first");
-        context.setTime(time += GAP_MS + 1);
-        processor.process(sessionId, "second");
-        processor.process(sessionId, "second");
-        context.setTime(time += GAP_MS + 1);
-        processor.process(sessionId, "third");
-        processor.process(sessionId, "third");
-        processor.process(sessionId, "third");
+        processor.process(new Record<>(sessionId, "first", time));
+        final long time1 = time + GAP_MS + 1;
+        processor.process(new Record<>(sessionId, "second", time1));
+        processor.process(new Record<>(sessionId, "second", time1));
+        final long time2 = time1 + GAP_MS + 1;
+        processor.process(new Record<>(sessionId, "third", time2));
+        processor.process(new Record<>(sessionId, "third", time2));
+        processor.process(new Record<>(sessionId, "third", time2));
 
         sessionStore.flush();
         assertEquals(
@@ -239,8 +228,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldRemoveMergedSessionsFromStateStore() {
-        context.setTime(0);
-        processor.process("a", "1");
+        processor.process(new Record<>("a", "1", 0L));
 
         // first ensure it is in the store
         try (final KeyValueIterator<Windowed<String>, Long> a1 =
@@ -249,8 +237,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         }
 
 
-        context.setTime(100);
-        processor.process("a", "2");
+        processor.process(new Record<>("a", "2", 100L));
         // a1 from above should have been removed
         // should have merged session in store
         try (final KeyValueIterator<Windowed<String>, Long> a2 =
@@ -262,19 +249,15 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldHandleMultipleSessionsAndMerging() {
-        context.setTime(0);
-        processor.process("a", "1");
-        processor.process("b", "1");
-        processor.process("c", "1");
-        processor.process("d", "1");
-        context.setTime(GAP_MS / 2);
-        processor.process("d", "2");
-        context.setTime(GAP_MS + 1);
-        processor.process("a", "2");
-        processor.process("b", "2");
-        context.setTime(GAP_MS + 1 + GAP_MS / 2);
-        processor.process("a", "3");
-        processor.process("c", "3");
+        processor.process(new Record<>("a", "1", 0L));
+        processor.process(new Record<>("b", "1", 0L));
+        processor.process(new Record<>("c", "1", 0L));
+        processor.process(new Record<>("d", "1", 0L));
+        processor.process(new Record<>("d", "2", GAP_MS / 2));
+        processor.process(new Record<>("a", "2", GAP_MS + 1));
+        processor.process(new Record<>("b", "2", GAP_MS + 1));
+        processor.process(new Record<>("a", "3", GAP_MS + 1 + GAP_MS / 2));
+        processor.process(new Record<>("c", "3", GAP_MS + 1 + GAP_MS / 2));
 
         sessionStore.flush();
 
@@ -317,11 +300,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
     public void shouldGetAggregatedValuesFromValueGetter() {
         final KTableValueGetter<Windowed<String>, Long> getter = 
sessionAggregator.view().get();
         getter.init(context);
-        context.setTime(0);
-        processor.process("a", "1");
-        context.setTime(GAP_MS + 1);
-        processor.process("a", "1");
-        processor.process("a", "2");
+        processor.process(new Record<>("a", "1", 0L));
+        processor.process(new Record<>("a", "1", GAP_MS + 1));
+        processor.process(new Record<>("a", "2", GAP_MS + 1));
         final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 
0))).value();
         final long t1 = getter.get(new Windowed<>("a", new 
SessionWindow(GAP_MS + 1, GAP_MS + 1))).value();
         assertEquals(1L, t0);
@@ -333,10 +314,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
         initStore(false);
         processor.init(context);
 
-        context.setTime(0);
-        processor.process("a", "1");
-        processor.process("b", "1");
-        processor.process("c", "1");
+        processor.process(new Record<>("a", "1", 0L));
+        processor.process(new Record<>("b", "1", 0L));
+        processor.process(new Record<>("c", "1", 0L));
 
         assertEquals(
             Arrays.asList(
@@ -362,10 +342,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         initStore(false);
         processor.init(context);
 
-        context.setTime(0);
-        processor.process("a", "1");
-        context.setTime(5);
-        processor.process("a", "1");
+        processor.process(new Record<>("a", "1", 0L));
+        processor.process(new Record<>("a", "1", 5L));
         assertEquals(
             Arrays.asList(
                 new KeyValueTimestamp<>(
@@ -396,14 +374,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
         try (final LogCaptureAppender appender =
                  
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
 
-            processor.process(null, "1");
+            processor.process(new Record<>(null, "1", 0L));
 
             assertThat(
                 appender.getEvents().stream()
                     .filter(e -> e.getLevel().equals("WARN"))
                     .map(Event::getMessage)
                     .collect(Collectors.toList()),
-                hasItem("Skipping record due to null key. value=[1] 
topic=[topic] partition=[-3] offset=[-2]")
+                hasItem("Skipping record due to null key. topic=[topic] 
partition=[-3] offset=[-2]")
             );
         }
 
@@ -416,8 +394,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Test
     public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         setup(false);
-        final org.apache.kafka.streams.processor.Processor<String, String> 
processor = new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
+        final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(0L)),
             STORE_NAME,
             initializer,
             aggregator,
@@ -427,27 +405,27 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         // dummy record to establish stream time = 0
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-        processor.process("dummy", "dummy");
+        processor.process(new Record<>("dummy", "dummy", 0L));
 
         // record arrives on time, should not be skipped
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-        processor.process("OnTime1", "1");
+        processor.process(new Record<>("OnTime1", "1", 0L));
 
         // dummy record to advance stream time = 11, 10 for gap time plus 1 to 
place outside window
         context.setRecordContext(new ProcessorRecordContext(11, -2, -3, 
"topic", new RecordHeaders()));
-        processor.process("dummy", "dummy");
+        processor.process(new Record<>("dummy", "dummy", 11L));
 
         try (final LogCaptureAppender appender =
                  
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
 
             // record is late
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("Late1", "1");
+            processor.process(new Record<>("Late1", "1", 0L));
 
             assertThat(
                 appender.getMessages(),
                 hasItem("Skipping record for expired window." +
-                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] 
timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
+                    " topic=[topic] partition=[-3] offset=[-2] timestamp=[0] 
window=[0,0] expiration=[1] streamTime=[11]")
             );
         }
 
@@ -481,8 +459,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Test
     public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
         setup(false);
-        final org.apache.kafka.streams.processor.Processor<String, String> 
processor = new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
+        final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(1L)),
             STORE_NAME,
             initializer,
             aggregator,
@@ -495,32 +473,32 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
             // dummy record to establish stream time = 0
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("dummy", "dummy");
+            processor.process(new Record<>("dummy", "dummy", 0L));
 
             // record arrives on time, should not be skipped
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("OnTime1", "1");
+            processor.process(new Record<>("OnTime1", "1", 0L));
 
             // dummy record to advance stream time = 11, 10 for gap time plus 
1 to place at edge of window
             context.setRecordContext(new ProcessorRecordContext(11, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("dummy", "dummy");
+            processor.process(new Record<>("dummy", "dummy", 11L));
 
             // delayed record arrives on time, should not be skipped
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("OnTime2", "1");
+            processor.process(new Record<>("OnTime2", "1", 0L));
 
             // dummy record to advance stream time = 12, 10 for gap time plus 
2 to place outside window
             context.setRecordContext(new ProcessorRecordContext(12, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("dummy", "dummy");
+            processor.process(new Record<>("dummy", "dummy", 12L));
 
             // delayed record arrives late
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", new RecordHeaders()));
-            processor.process("Late1", "1");
+            processor.process(new Record<>("Late1", "1", 0L));
 
             assertThat(
                 appender.getMessages(),
                 hasItem("Skipping record for expired window." +
-                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] 
timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
+                    " topic=[topic] partition=[-3] offset=[-2] timestamp=[0] 
window=[0,0] expiration=[1] streamTime=[12]")
             );
         }
 
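
The test rewrites above all follow one pattern: instead of mutating the mock context's stream time and then calling process(key, value), the timestamp is attached to the Record under test. A minimal sketch (the processor parameter stands in for any new-PAPI processor):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.Record;

    final class TestDrivingSketch {
        private TestDrivingSketch() {}

        static void drive(final Processor<String, String, ?, ?> processor) {
            // Old PAPI test: context.setTime(500); processor.process("john", "second");
            processor.process(new Record<>("john", "second", 500L));
        }
    }
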
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 798159d..38c3fa7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -726,7 +726,7 @@ public class KStreamSlidingWindowAggregateTest {
                     .filter(e -> e.getLevel().equals("WARN"))
                     .map(Event::getMessage)
                     .collect(Collectors.toList()),
-                hasItem("Skipping record due to null key or value. value=[1] 
topic=[topic] partition=[0] offset=[0]")
+                hasItem("Skipping record due to null key or value. 
topic=[topic] partition=[0] offset=[0]")
             );
         }
     }
@@ -772,19 +772,19 @@ public class KStreamSlidingWindowAggregateTest {
 
             assertThat(appender.getMessages(), hasItems(
                     // left window for k@100
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] 
streamTime=[200]",
                     // left window for k@101
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] 
streamTime=[200]",
                     // left window for k@102
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] 
streamTime=[200]",
                     // left window for k@103
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] 
streamTime=[200]",
                     // left window for k@104
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] 
streamTime=[200]",
                     // left window for k@105
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] 
streamTime=[200]",
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] 
streamTime=[200]",
                     // left window for k@15
-                    "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] 
streamTime=[200]"
+                    "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] 
streamTime=[200]"
             ));
             final TestOutputTopic<Windowed<String>, String> outputTopic =
                     driver.createOutputTopic("output", new 
TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new 
StringDeserializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index b7759bb..df40c94 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -288,7 +288,7 @@ public class KStreamWindowAggregateTest {
                 driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
             inputTopic.pipeInput(null, "1");
 
-            assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+            assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. topic=[topic] partition=[0] offset=[0]"));
         }
     }
 
@@ -335,13 +335,13 @@ public class KStreamWindowAggregateTest {
             );
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] 
streamTime=[100]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] 
streamTime=[100]"
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] 
streamTime=[100]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] 
streamTime=[100]"
             ));
 
             final TestOutputTopic<String, String> outputTopic =
@@ -389,13 +389,13 @@ public class KStreamWindowAggregateTest {
             assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] 
streamTime=[200]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] 
streamTime=[200]"
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] 
streamTime=[200]",
+                "Skipping record for expired window. topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] 
streamTime=[200]"
             ));
 
             final TestOutputTopic<String, String> outputTopic =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index c60bcf4..da71149 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.junit.Test;
 
@@ -35,9 +35,10 @@ public class SessionCacheFlushListenerTest {
         context.setCurrentNode(null);
         context.setCurrentNode(null);
         context.forward(
-            new Windowed<>("key", new SessionWindow(21L, 73L)),
-            new Change<>("newValue", "oldValue"),
-            To.all().withTimestamp(73L));
+            new Record<>(
+                new Windowed<>("key", new SessionWindow(21L, 73L)),
+                new Change<>("newValue", "oldValue"),
+                73L));
         expectLastCall();
         replay(context);
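
The change above captures the core of the API migration exercised by this test: the old org.apache.kafka.streams.processor API passed key, value, and timestamp to forward() separately (the timestamp out-of-band via To), whereas the new org.apache.kafka.streams.processor.api API carries all three in a single immutable Record. A minimal side-by-side sketch, assuming an oldContext and a newContext of the respective ProcessorContext types:

        // Old API: key and value as separate arguments, timestamp via To
        oldContext.forward(
            new Windowed<>("key", new SessionWindow(21L, 73L)),
            new Change<>("newValue", "oldValue"),
            To.all().withTimestamp(73L));

        // New API: key, value, and timestamp bundled in one Record
        newContext.forward(
            new Record<>(
                new Windowed<>("key", new SessionWindow(21L, 73L)),
                new Change<>("newValue", "oldValue"),
                73L));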
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
index e99c684..f0a963d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
 
@@ -57,25 +57,32 @@ public class SessionTupleForwarderTest {
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final ProcessorContext<Windowed<String>, Change<String>> context = mock(
+            ProcessorContext.class);
 
         expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
         if (sendOldValued) {
             context.forward(
-                new Windowed<>("key", new SessionWindow(21L, 42L)),
-                new Change<>("value", "oldValue"),
-                To.all().withTimestamp(42L));
+                new Record<>(
+                    new Windowed<>("key", new SessionWindow(21L, 42L)),
+                    new Change<>("value", "oldValue"),
+                    42L));
         } else {
             context.forward(
-                new Windowed<>("key", new SessionWindow(21L, 42L)),
-                new Change<>("value", null),
-                To.all().withTimestamp(42L));
+                new Record<>(
+                    new Windowed<>("key", new SessionWindow(21L, 42L)),
+                    new Change<>("value", null),
+                    42L));
         }
         expectLastCall();
         replay(store, context);
 
-        new SessionTupleForwarder<>(store, context, null, sendOldValued)
-            .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
+        new SessionTupleForwarder<>(store, context, null,
+            sendOldValued)
+            .maybeForward(
+                new Windowed<>("key", new SessionWindow(21L, 42L)),
+                "value",
+                "oldValue");
 
         verify(store, context);
     }
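
The expectations above also document maybeForward's contract: the old value is only propagated when sendOldValues is set, and is nulled out otherwise, with the forwarded timestamp taken from the window end (42L for the window [21,42]). A rough sketch of that logic against the new API, assuming hypothetical field names (cachingEnabled, sendOldValues) for illustration:

        public void maybeForward(final Windowed<K> key, final V newValue, final V oldValue) {
            // Only forward directly if the wrapped store does not cache;
            // a caching store forwards on cache flush instead.
            if (!cachingEnabled) {
                context.forward(new Record<>(
                    key,
                    new Change<>(newValue, sendOldValues ? oldValue : null),
                    key.window().end()));
            }
        }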
@@ -83,7 +90,7 @@ public class SessionTupleForwarderTest {
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final ProcessorContext<Windowed<String>, Change<String>> context = mock(ProcessorContext.class);
 
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
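
One more detail visible in these hunks: the new ProcessorContext is generic in the forwarded key and value types, so the mocked contexts gain type parameters. Since EasyMock's mock(Class) cannot express those parameters, a declaration like this typically needs an unchecked-cast suppression; a sketch, not part of this patch:

        @SuppressWarnings("unchecked")
        final ProcessorContext<Windowed<String>, Change<String>> context =
            mock(ProcessorContext.class);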
