This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 35e59427434 Revert "KAFKA-13722: remove usage of old ProcessorContext  (#18292)" (#20398)
35e59427434 is described below

commit 35e594274342eb10035eeec39bf82620637a6825
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Aug 25 03:43:39 2025 -0700

    Revert "KAFKA-13722: remove usage of old ProcessorContext  (#18292)" (#20398)
    
    This reverts commit f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Eduwer Camacaro
    <eduw...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com>
---
 .../kstream/internals/KTableRepartitionMap.java       |  2 +-
 .../internals/GlobalProcessorContextImpl.java         |  2 +-
 .../processor/internals/GlobalStateUpdateTask.java    |  4 ++--
 .../processor/internals/ProcessorContextImpl.java     |  8 ++++----
 .../streams/processor/internals/ProcessorNode.java    | 10 +++++-----
 .../kafka/streams/processor/internals/SinkNode.java   |  6 +++---
 .../kafka/streams/processor/internals/StreamTask.java |  4 ++--
 .../streams/state/internals/CachingKeyValueStore.java | 10 +++++-----
 .../streams/state/internals/CachingSessionStore.java  | 10 +++++-----
 .../streams/state/internals/CachingWindowStore.java   | 10 +++++-----
 .../internals/ChangeLoggingKeyValueBytesStore.java    | 10 +++++-----
 .../internals/ChangeLoggingListValueBytesStore.java   |  4 ++--
 .../internals/ChangeLoggingSessionBytesStore.java     |  4 ++--
 .../ChangeLoggingTimestampedKeyValueBytesStore.java   |  6 +++---
 .../ChangeLoggingTimestampedWindowBytesStore.java     |  2 +-
 .../internals/ChangeLoggingWindowBytesStore.java      |  2 +-
 .../streams/state/internals/MeteredKeyValueStore.java |  2 +-
 .../streams/state/internals/MeteredSessionStore.java  |  2 +-
 .../streams/state/internals/MeteredWindowStore.java   |  2 +-
 .../internals/TimeOrderedCachingWindowStore.java      | 19 +++++++++----------
 .../processor/internals/ProcessorContextImplTest.java |  4 ----
 .../processor/internals/ProcessorNodeTest.java        |  2 +-
 .../internals/ChangeLoggingSessionBytesStoreTest.java |  5 -----
 .../ChangeLoggingTimestampedWindowBytesStoreTest.java | 13 ++++---------
 .../internals/ChangeLoggingWindowBytesStoreTest.java  | 13 ++++---------
 .../state/internals/MeteredWindowStoreTest.java       |  4 ++--
 .../TimeOrderedCachingPersistentWindowStoreTest.java  |  6 +++---
 .../state/internals/TimeOrderedWindowStoreTest.java   |  6 +++---
 28 files changed, 76 insertions(+), 96 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index a686692b40a..567d9a2947f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -213,7 +213,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements 
KTableRepartitionMapS
         private ValueAndTimestamp<KeyValue<? extends K1, ? extends V1>> 
mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
             return ValueAndTimestamp.make(
                 mapper.apply(key, getValueOrNull(valueAndTimestamp)),
-                valueAndTimestamp == null ? 
context.recordContext().timestamp() : valueAndTimestamp.timestamp()
+                valueAndTimestamp == null ? context.timestamp() : 
valueAndTimestamp.timestamp()
             );
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 01b694863fd..828ae3a0a79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object,
 
     @Override
     public <KIn, VIn> void forward(final KIn key, final VIn value) {
-        forward(new Record<>(key, value, recordContext().timestamp(), 
headers()));
+        forward(new Record<>(key, value, timestamp(), headers()));
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index bfab9a770f6..1d8f18d326d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -120,8 +120,8 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
             final Record<Object, Object> toProcess = new Record<>(
                 deserialized.key(),
                 deserialized.value(),
-                processorContext.recordContext().timestamp(),
-                processorContext.recordContext().headers()
+                processorContext.timestamp(),
+                processorContext.headers()
             );
             ((SourceNode<Object, Object>) 
sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 93961daf97b..b5e0515522a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            recordContext.timestamp(),
+            timestamp(),
             headers()
         );
         forward(toForward);
@@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            toInternal.hasTimestamp() ? toInternal.timestamp() : 
recordContext.timestamp(),
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
             headers()
         );
         forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
             // old API processors wouldn't see the timestamps or headers of 
upstream
             // new API processors. But then again, from the perspective of 
those old-API
             // processors, even consulting the timestamp or headers when the 
record context
-            // is undefined is itself not well-defined. Plus, I don't think we 
need to worry
+            // is undefined is itself not well defined. Plus, I don't think we 
need to worry
             // too much about heterogeneous applications, in which the 
upstream processor is
             // implementing the new API and the downstream one is implementing 
the old API.
             // So, this seems like a fine compromise for now.
-            if (recordContext != null && (record.timestamp() != 
recordContext.timestamp() || record.headers() != recordContext.headers())) {
+            if (recordContext != null && (record.timestamp() != timestamp() || 
record.headers() != headers())) {
                 recordContext = new ProcessorRecordContext(
                     record.timestamp(),
                     recordContext.offset(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 1dddc55ca3c..62173e807fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -209,13 +209,13 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             // (instead of `RuntimeException`) to work well with those 
languages
             final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
                 null, // only required to pass for 
DeserializationExceptionHandler
-                internalProcessorContext.recordContext().topic(),
-                internalProcessorContext.recordContext().partition(),
-                internalProcessorContext.recordContext().offset(),
-                internalProcessorContext.recordContext().headers(),
+                internalProcessorContext.topic(),
+                internalProcessorContext.partition(),
+                internalProcessorContext.offset(),
+                internalProcessorContext.headers(),
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId(),
-                internalProcessorContext.recordContext().timestamp(),
+                internalProcessorContext.timestamp(),
                 internalProcessorContext.recordContext().sourceRawKey(),
                 internalProcessorContext.recordContext().sourceRawValue()
             );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index d32cf2523e0..8cb82a6bfa5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -85,9 +85,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, 
VIn, Void, Void> {
         final ProcessorRecordContext contextForExtraction =
             new ProcessorRecordContext(
                 timestamp,
-                context.recordContext().offset(),
-                context.recordContext().partition(),
-                context.recordContext().topic(),
+                context.offset(),
+                context.partition(),
+                context.topic(),
                 record.headers()
             );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 82e9c8d7fb1..8c3b6cc506b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -866,8 +866,8 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         final Record<Object, Object> toProcess = new Record<>(
             record.key(),
             record.value(),
-            processorContext.recordContext().timestamp(),
-            processorContext.recordContext().headers()
+            processorContext.timestamp(),
+            processorContext.headers()
         );
         maybeMeasureLatency(() -> currNode.process(toProcess), time, 
processLatencySensor);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 83343d04494..389cf688f4a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -272,12 +272,12 @@ public class CachingKeyValueStore
                 key,
                 new LRUCacheEntry(
                     value,
-                    internalContext.recordContext().headers(),
+                    internalContext.headers(),
                     true,
-                    internalContext.recordContext().offset(),
-                    internalContext.recordContext().timestamp(),
-                    internalContext.recordContext().partition(),
-                    internalContext.recordContext().topic(),
+                    internalContext.offset(),
+                    internalContext.timestamp(),
+                    internalContext.partition(),
+                    internalContext.topic(),
                     internalContext.recordContext().sourceRawKey(),
                     internalContext.recordContext().sourceRawValue()
                 )
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index ec0c1bd077d..7bb615ea4ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -135,12 +135,12 @@ class CachingSessionStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.recordContext().headers(),
+                internalContext.headers(),
                 true,
-                internalContext.recordContext().offset(),
-                internalContext.recordContext().timestamp(),
-                internalContext.recordContext().partition(),
-                internalContext.recordContext().topic(),
+                internalContext.offset(),
+                internalContext.timestamp(),
+                internalContext.partition(),
+                internalContext.topic(),
                 internalContext.recordContext().sourceRawKey(),
                 internalContext.recordContext().sourceRawValue()
             );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0432c1726cb..38d98b58d7e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -153,12 +153,12 @@ class CachingWindowStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.recordContext().headers(),
+                internalContext.headers(),
                 true,
-                internalContext.recordContext().offset(),
-                internalContext.recordContext().timestamp(),
-                internalContext.recordContext().partition(),
-                internalContext.recordContext().topic(),
+                internalContext.offset(),
+                internalContext.timestamp(),
+                internalContext.partition(),
+                internalContext.topic(),
                 internalContext.recordContext().sourceRawKey(),
                 internalContext.recordContext().sourceRawValue()
             );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 9c1c3f9ae76..b21b102cdfc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
-                log(key, null, internalContext.recordContext().timestamp());
+                log(key, null, internalContext.timestamp());
             });
         }
     }
@@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
-        log(key, value, internalContext.recordContext().timestamp());
+        log(key, value, internalContext.timestamp());
     }
 
     @Override
@@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore
         final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
             // then it was absent
-            log(key, value, internalContext.recordContext().timestamp());
+            log(key, value, internalContext.timestamp());
         }
         return previous;
     }
@@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            log(entry.key, entry.value, 
internalContext.recordContext().timestamp());
+            log(entry.key, entry.value, internalContext.timestamp());
         }
     }
 
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
-        log(key, null, internalContext.recordContext().timestamp());
+        log(key, null, internalContext.timestamp());
         return oldValue;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index ba43ba30b17..9070fc8da5f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends 
ChangeLoggingKeyValueBytes
         // we need to log the full new list and thus call get() on the inner 
store below
         // if the value is a tombstone, we delete the whole list and thus can 
save the get call
         if (value == null) {
-            log(key, null, internalContext.recordContext().timestamp());
+            log(key, null, internalContext.timestamp());
         } else {
-            log(key, wrapped().get(key), 
internalContext.recordContext().timestamp());
+            log(key, wrapped().get(key), internalContext.timestamp());
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 248889211c3..06097aa7a71 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        internalContext.logChange(name(), 
SessionKeySchema.toBinary(sessionKey), null, 
internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), 
SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), 
wrapped().getPosition());
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         wrapped().put(sessionKey, aggregate);
-        internalContext.logChange(name(), 
SessionKeySchema.toBinary(sessionKey), aggregate, 
internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), 
SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), 
wrapped().getPosition());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index b95ede1bba8..916c9547184 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore 
extends ChangeLoggingKey
     public void put(final Bytes key,
                     final byte[] valueAndTimestamp) {
         wrapped().put(key, valueAndTimestamp);
-        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? 
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? 
internalContext.timestamp() : timestamp(valueAndTimestamp));
     }
 
     @Override
@@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore 
extends ChangeLoggingKey
         final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
         if (previous == null) {
             // then it was absent
-            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? 
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? 
internalContext.timestamp() : timestamp(valueAndTimestamp));
         }
         return previous;
     }
@@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore 
extends ChangeLoggingKey
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             final byte[] valueAndTimestamp = entry.value;
-            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == 
null ? internalContext.recordContext().timestamp() : 
timestamp(valueAndTimestamp));
+            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == 
null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
         }
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 5ae334f95cc..2bf87f9d2a8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends 
ChangeLoggingWindowBytesS
             name(),
             key,
             rawValue(valueAndTimestamp),
-            valueAndTimestamp != null ? timestamp(valueAndTimestamp) : 
internalContext.recordContext().timestamp(),
+            valueAndTimestamp != null ? timestamp(valueAndTimestamp) : 
internalContext.timestamp(),
             wrapped().getPosition()
         );
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 0d0f378af75..d5857d0456e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore
     }
 
     void log(final Bytes key, final byte[] value) {
-        internalContext.logChange(name(), key, value, 
internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), key, value, 
internalContext.timestamp(), wrapped().getPosition());
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0962033b7ef..8678111f989 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -417,7 +417,7 @@ public class MeteredKeyValueStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't 
record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency =  currentTime - 
internalContext.recordContext().timestamp();
+            final long e2eLatency =  currentTime - internalContext.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 234ac1220f7..546959a9269 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -494,7 +494,7 @@ public class MeteredSessionStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't 
record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency =  currentTime - 
internalContext.recordContext().timestamp();
+            final long e2eLatency =  currentTime - internalContext.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2da877453ce..783c16b2f4f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -510,7 +510,7 @@ public class MeteredWindowStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't 
record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency =  currentTime - 
internalContext.recordContext().timestamp();
+            final long e2eLatency =  currentTime - internalContext.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 646cbf2ca35..15a728ba0d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -97,13 +97,12 @@ class TimeOrderedCachingWindowStore
         hasIndex = timeOrderedWindowStore.hasIndex();
     }
 
-    @SuppressWarnings("unchecked")
     private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore 
wrapped) {
         if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
             return (RocksDBTimeOrderedWindowStore) wrapped;
         }
         if (wrapped instanceof WrappedStateStore) {
-            return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) 
wrapped).wrapped());
+            return getWrappedStore(((WrappedStateStore<?, ?, ?>) 
wrapped).wrapped());
         }
         return null;
     }
@@ -256,12 +255,12 @@ class TimeOrderedCachingWindowStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.recordContext().headers(),
+                internalContext.headers(),
                 true,
-                internalContext.recordContext().offset(),
-                internalContext.recordContext().timestamp(),
-                internalContext.recordContext().partition(),
-                internalContext.recordContext().topic(),
+                internalContext.offset(),
+                internalContext.timestamp(),
+                internalContext.partition(),
+                internalContext.topic(),
                 internalContext.recordContext().sourceRawKey(),
                 internalContext.recordContext().sourceRawValue()
             );
@@ -278,9 +277,9 @@ class TimeOrderedCachingWindowStore
                     new byte[0],
                     new RecordHeaders(),
                     true,
-                    internalContext.recordContext().offset(),
-                    internalContext.recordContext().timestamp(),
-                    internalContext.recordContext().partition(),
+                    internalContext.offset(),
+                    internalContext.timestamp(),
+                    internalContext.partition(),
                     "",
                     internalContext.recordContext().sourceRawKey(),
                     internalContext.recordContext().sourceRawValue()
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9410ca5a978..42c466c2120 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -593,8 +593,6 @@ public class ProcessorContextImplTest {
     @Test
     public void shouldThrowUnsupportedOperationExceptionOnForward() {
         context = getStandbyContext();
-        context.recordContext = mock(ProcessorRecordContext.class);
-
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value")
@@ -604,8 +602,6 @@ public class ProcessorContextImplTest {
     @Test
     public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
         context = getStandbyContext();
-        context.recordContext = mock(ProcessorRecordContext.class);
-
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value", To.child("child-name"))
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 5341cd25f0d..ce5fddb870a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -362,7 +362,7 @@ public class ProcessorNodeTest {
             assertEquals(internalProcessorContext.offset(), context.offset());
             assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
             assertEquals(internalProcessorContext.taskId(), context.taskId());
-            assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp());
+            assertEquals(internalProcessorContext.timestamp(), context.timestamp());
             assertEquals(internalProcessorContext.recordContext().sourceRawKey(), context.sourceRawKey());
             assertEquals(internalProcessorContext.recordContext().sourceRawValue(), context.sourceRawValue());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 9a23e657600..d3243ef2fc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -77,7 +75,6 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
         store.put(key1, value1);
 
@@ -89,7 +86,6 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(POSITION);
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
         store.put(key1, value1);
 
@@ -101,7 +97,6 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogRemoves() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
         store.remove(key1);
         store.remove(key1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 1c1b713ce21..03701bdcb00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -17,11 +17,9 @@
 
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
@@ -79,9 +77,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
-        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
 
         verify(inner).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key, value, 42, Position.emptyPosition());
@@ -91,9 +88,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(POSITION);
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
-        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
 
         verify(inner).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key, value, 42, POSITION);
@@ -122,10 +118,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
-        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
-        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
 
         verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key1, value, 42L, Position.emptyPosition());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index e80a2325a2a..2607e56ad9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,11 +17,9 @@
 
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
@@ -78,9 +76,8 @@ public class ChangeLoggingWindowBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
-        store.put(bytesKey, value, context.recordContext().timestamp());
+        store.put(bytesKey, value, context.timestamp());
 
         verify(inner).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key, value, 0L, Position.emptyPosition());
@@ -90,9 +87,8 @@ public class ChangeLoggingWindowBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(POSITION);
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
-        store.put(bytesKey, value, context.recordContext().timestamp());
+        store.put(bytesKey, value, context.timestamp());
 
         verify(inner).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key, value, 0L, POSITION);
@@ -135,13 +131,12 @@ public class ChangeLoggingWindowBytesStoreTest {
         store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary);
         store.init(context, store);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
-        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
 
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
 
-        store.put(bytesKey, value, context.recordContext().timestamp());
-        store.put(bytesKey, value, context.recordContext().timestamp());
+        store.put(bytesKey, value, context.timestamp());
+        store.put(bytesKey, value, context.timestamp());
 
         verify(inner, times(2)).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key1, value, 0L, Position.emptyPosition());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1c8935d1e1c..ba557104ebd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -223,10 +223,10 @@ public class MeteredWindowStoreTest {
     @Test
     public void shouldPutToInnerStoreAndRecordPutMetrics() {
         final byte[] bytes = "a".getBytes();
-        doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.recordContext().timestamp()));
+        doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp()));
 
         store.init(context, store);
-        store.put("a", "a", context.recordContext().timestamp());
+        store.put("a", "a", context.timestamp());
 
         // it suffices to verify one put metric since all put metrics are recorded by the same sensor
         // and the sensor is tested elsewhere
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index ffa509d5188..21d16b09be4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
                 new byte[0],
                 new RecordHeaders(),
                 true,
-                context.recordContext().offset(),
-                context.recordContext().timestamp(),
-                context.recordContext().partition(),
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
                 "",
                 context.recordContext().sourceRawKey(),
                 context.recordContext().sourceRawValue()
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 9d0db9bae0f..f4ff30002ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -941,9 +941,9 @@ public class TimeOrderedWindowStoreTest {
                 new byte[0],
                 new RecordHeaders(),
                 true,
-                context.recordContext().offset(),
-                context.recordContext().timestamp(),
-                context.recordContext().partition(),
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
                 "",
                 context.recordContext().sourceRawKey(),
                 context.recordContext().sourceRawValue()


Reply via email to