This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 35e59427434 Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)" (#20398) 35e59427434 is described below commit 35e594274342eb10035eeec39bf82620637a6825 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Mon Aug 25 03:43:39 2025 -0700 Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)" (#20398) This reverts commit f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d. Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Eduwer Camacaro <eduw...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com>, --- .../kstream/internals/KTableRepartitionMap.java | 2 +- .../internals/GlobalProcessorContextImpl.java | 2 +- .../processor/internals/GlobalStateUpdateTask.java | 4 ++-- .../processor/internals/ProcessorContextImpl.java | 8 ++++---- .../streams/processor/internals/ProcessorNode.java | 10 +++++----- .../kafka/streams/processor/internals/SinkNode.java | 6 +++--- .../kafka/streams/processor/internals/StreamTask.java | 4 ++-- .../streams/state/internals/CachingKeyValueStore.java | 10 +++++----- .../streams/state/internals/CachingSessionStore.java | 10 +++++----- .../streams/state/internals/CachingWindowStore.java | 10 +++++----- .../internals/ChangeLoggingKeyValueBytesStore.java | 10 +++++----- .../internals/ChangeLoggingListValueBytesStore.java | 4 ++-- .../internals/ChangeLoggingSessionBytesStore.java | 4 ++-- .../ChangeLoggingTimestampedKeyValueBytesStore.java | 6 +++--- .../ChangeLoggingTimestampedWindowBytesStore.java | 2 +- .../internals/ChangeLoggingWindowBytesStore.java | 2 +- .../streams/state/internals/MeteredKeyValueStore.java | 2 +- .../streams/state/internals/MeteredSessionStore.java | 2 +- .../streams/state/internals/MeteredWindowStore.java | 2 +- .../internals/TimeOrderedCachingWindowStore.java | 19 +++++++++---------- .../processor/internals/ProcessorContextImplTest.java | 4 ---- .../processor/internals/ProcessorNodeTest.java | 2 +- .../internals/ChangeLoggingSessionBytesStoreTest.java | 5 ----- .../ChangeLoggingTimestampedWindowBytesStoreTest.java | 13 ++++--------- .../internals/ChangeLoggingWindowBytesStoreTest.java | 13 ++++--------- .../state/internals/MeteredWindowStoreTest.java | 4 ++-- .../TimeOrderedCachingPersistentWindowStoreTest.java | 6 +++--- .../state/internals/TimeOrderedWindowStoreTest.java | 6 +++--- 28 files changed, 76 insertions(+), 96 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index a686692b40a..567d9a2947f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -213,7 +213,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS private ValueAndTimestamp<KeyValue<? extends K1, ? extends V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) { return ValueAndTimestamp.make( mapper.apply(key, getValueOrNull(valueAndTimestamp)), - valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp() + valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp() ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 01b694863fd..828ae3a0a79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, @Override public <KIn, VIn> void forward(final KIn key, final VIn value) { - forward(new Record<>(key, value, recordContext().timestamp(), headers())); + forward(new Record<>(key, value, timestamp(), headers())); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index bfab9a770f6..1d8f18d326d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -120,8 +120,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { final Record<Object, Object> toProcess = new Record<>( deserialized.key(), deserialized.value(), - processorContext.recordContext().timestamp(), - processorContext.recordContext().headers() + processorContext.timestamp(), + processorContext.headers() ); ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 93961daf97b..b5e0515522a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object, final Record<K, V> toForward = new Record<>( key, value, - recordContext.timestamp(), + timestamp(), headers() ); forward(toForward); @@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object, final Record<K, V> toForward = new Record<>( key, value, - toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(), + toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(), headers() ); forward(toForward, toInternal.child()); @@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object, // old API processors wouldn't see the timestamps or headers of upstream // new API processors. But then again, from the perspective of those old-API // processors, even consulting the timestamp or headers when the record context - // is undefined is itself not well-defined. Plus, I don't think we need to worry + // is undefined is itself not well defined. Plus, I don't think we need to worry // too much about heterogeneous applications, in which the upstream processor is // implementing the new API and the downstream one is implementing the old API. // So, this seems like a fine compromise for now. - if (recordContext != null && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) { + if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) { recordContext = new ProcessorRecordContext( record.timestamp(), recordContext.offset(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 1dddc55ca3c..62173e807fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -209,13 +209,13 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { // (instead of `RuntimeException`) to work well with those languages final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler - internalProcessorContext.recordContext().topic(), - internalProcessorContext.recordContext().partition(), - internalProcessorContext.recordContext().offset(), - internalProcessorContext.recordContext().headers(), + internalProcessorContext.topic(), + internalProcessorContext.partition(), + internalProcessorContext.offset(), + internalProcessorContext.headers(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId(), - internalProcessorContext.recordContext().timestamp(), + internalProcessorContext.timestamp(), internalProcessorContext.recordContext().sourceRawKey(), internalProcessorContext.recordContext().sourceRawValue() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index d32cf2523e0..8cb82a6bfa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -85,9 +85,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> { final ProcessorRecordContext contextForExtraction = new ProcessorRecordContext( timestamp, - context.recordContext().offset(), - context.recordContext().partition(), - context.recordContext().topic(), + context.offset(), + context.partition(), + context.topic(), record.headers() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 82e9c8d7fb1..8c3b6cc506b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -866,8 +866,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final Record<Object, Object> toProcess = new Record<>( record.key(), record.value(), - processorContext.recordContext().timestamp(), - processorContext.recordContext().headers() + processorContext.timestamp(), + processorContext.headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 83343d04494..389cf688f4a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -272,12 +272,12 @@ public class CachingKeyValueStore key, new LRUCacheEntry( value, - internalContext.recordContext().headers(), + internalContext.headers(), true, - internalContext.recordContext().offset(), - internalContext.recordContext().timestamp(), - internalContext.recordContext().partition(), - internalContext.recordContext().topic(), + internalContext.offset(), + internalContext.timestamp(), + internalContext.partition(), + internalContext.topic(), internalContext.recordContext().sourceRawKey(), internalContext.recordContext().sourceRawValue() ) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ec0c1bd077d..7bb615ea4ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -135,12 +135,12 @@ class CachingSessionStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.recordContext().headers(), + internalContext.headers(), true, - internalContext.recordContext().offset(), - internalContext.recordContext().timestamp(), - internalContext.recordContext().partition(), - internalContext.recordContext().topic(), + internalContext.offset(), + internalContext.timestamp(), + internalContext.partition(), + internalContext.topic(), internalContext.recordContext().sourceRawKey(), internalContext.recordContext().sourceRawValue() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0432c1726cb..38d98b58d7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -153,12 +153,12 @@ class CachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.recordContext().headers(), + internalContext.headers(), true, - internalContext.recordContext().offset(), - internalContext.recordContext().timestamp(), - internalContext.recordContext().partition(), - internalContext.recordContext().topic(), + internalContext.offset(), + internalContext.timestamp(), + internalContext.partition(), + internalContext.topic(), internalContext.recordContext().sourceRawKey(), internalContext.recordContext().sourceRawValue() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 9c1c3f9ae76..b21b102cdfc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore if (wrapped() instanceof MemoryLRUCache) { ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> { // pass null to indicate removal - log(key, null, internalContext.recordContext().timestamp()); + log(key, null, internalContext.timestamp()); }); } } @@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore public void put(final Bytes key, final byte[] value) { wrapped().put(key, value); - log(key, value, internalContext.recordContext().timestamp()); + log(key, value, internalContext.timestamp()); } @Override @@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore final byte[] previous = wrapped().putIfAbsent(key, value); if (previous == null) { // then it was absent - log(key, value, internalContext.recordContext().timestamp()); + log(key, value, internalContext.timestamp()); } return previous; } @@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { wrapped().putAll(entries); for (final KeyValue<Bytes, byte[]> entry : entries) { - log(entry.key, entry.value, internalContext.recordContext().timestamp()); + log(entry.key, entry.value, internalContext.timestamp()); } } @@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore @Override public byte[] delete(final Bytes key) { final byte[] oldValue = wrapped().delete(key); - log(key, null, internalContext.recordContext().timestamp()); + log(key, null, internalContext.timestamp()); return oldValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java index ba43ba30b17..9070fc8da5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java @@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes // we need to log the full new list and thus call get() on the inner store below // if the value is a tombstone, we delete the whole list and thus can save the get call if (value == null) { - log(key, null, internalContext.recordContext().timestamp()); + log(key, null, internalContext.timestamp()); } else { - log(key, wrapped().get(key), internalContext.recordContext().timestamp()); + log(key, wrapped().get(key), internalContext.timestamp()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 248889211c3..06097aa7a71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore @Override public void remove(final Windowed<Bytes> sessionKey) { wrapped().remove(sessionKey); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), wrapped().getPosition()); } @Override public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { wrapped().put(sessionKey, aggregate); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), wrapped().getPosition()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java index b95ede1bba8..916c9547184 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java @@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey public void put(final Bytes key, final byte[] valueAndTimestamp) { wrapped().put(key, valueAndTimestamp); - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); } @Override @@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp); if (previous == null) { // then it was absent - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); } return previous; } @@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey wrapped().putAll(entries); for (final KeyValue<Bytes, byte[]> entry : entries) { final byte[] valueAndTimestamp = entry.value; - log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); + log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); } } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java index 5ae334f95cc..2bf87f9d2a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java @@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS name(), key, rawValue(valueAndTimestamp), - valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(), + valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.timestamp(), wrapped().getPosition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 0d0f378af75..d5857d0456e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore } void log(final Bytes key, final byte[] value) { - internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), key, value, internalContext.timestamp(), wrapped().getPosition()); } private int maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 0962033b7ef..8678111f989 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -417,7 +417,7 @@ public class MeteredKeyValueStore<K, V> // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); + final long e2eLatency = currentTime - internalContext.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 234ac1220f7..546959a9269 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -494,7 +494,7 @@ public class MeteredSessionStore<K, V> // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); + final long e2eLatency = currentTime - internalContext.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 2da877453ce..783c16b2f4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -510,7 +510,7 @@ public class MeteredWindowStore<K, V> // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); + final long e2eLatency = currentTime - internalContext.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java index 646cbf2ca35..15a728ba0d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java @@ -97,13 +97,12 @@ class TimeOrderedCachingWindowStore hasIndex = timeOrderedWindowStore.hasIndex(); } - @SuppressWarnings("unchecked") private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore wrapped) { if (wrapped instanceof RocksDBTimeOrderedWindowStore) { return (RocksDBTimeOrderedWindowStore) wrapped; } if (wrapped instanceof WrappedStateStore) { - return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) wrapped).wrapped()); + return getWrappedStore(((WrappedStateStore<?, ?, ?>) wrapped).wrapped()); } return null; } @@ -256,12 +255,12 @@ class TimeOrderedCachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.recordContext().headers(), + internalContext.headers(), true, - internalContext.recordContext().offset(), - internalContext.recordContext().timestamp(), - internalContext.recordContext().partition(), - internalContext.recordContext().topic(), + internalContext.offset(), + internalContext.timestamp(), + internalContext.partition(), + internalContext.topic(), internalContext.recordContext().sourceRawKey(), internalContext.recordContext().sourceRawValue() ); @@ -278,9 +277,9 @@ class TimeOrderedCachingWindowStore new byte[0], new RecordHeaders(), true, - internalContext.recordContext().offset(), - internalContext.recordContext().timestamp(), - internalContext.recordContext().partition(), + internalContext.offset(), + internalContext.timestamp(), + internalContext.partition(), "", internalContext.recordContext().sourceRawKey(), internalContext.recordContext().sourceRawValue() diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 9410ca5a978..42c466c2120 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -593,8 +593,6 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForward() { context = getStandbyContext(); - context.recordContext = mock(ProcessorRecordContext.class); - assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value") @@ -604,8 +602,6 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { context = getStandbyContext(); - context.recordContext = mock(ProcessorRecordContext.class); - assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value", To.child("child-name")) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 5341cd25f0d..ce5fddb870a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -362,7 +362,7 @@ public class ProcessorNodeTest { assertEquals(internalProcessorContext.offset(), context.offset()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); - assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp()); + assertEquals(internalProcessorContext.timestamp(), context.timestamp()); assertEquals(internalProcessorContext.recordContext().sourceRawKey(), context.sourceRawKey()); assertEquals(internalProcessorContext.recordContext().sourceRawValue(), context.sourceRawValue()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 9a23e657600..d3243ef2fc6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -77,7 +75,6 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPuts() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -89,7 +86,6 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(POSITION); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -101,7 +97,6 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogRemoves() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.remove(key1); store.remove(key1); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index 1c1b713ce21..03701bdcb00 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -17,11 +17,9 @@ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -79,9 +77,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); + store.put(bytesKey, valueAndTimestamp, context.timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, Position.emptyPosition()); @@ -91,9 +88,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); + store.put(bytesKey, valueAndTimestamp, context.timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, POSITION); @@ -122,10 +118,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); - store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); + store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.timestamp()); verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key1, value, 42L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index e80a2325a2a..2607e56ad9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -17,11 +17,9 @@ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -78,9 +76,8 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.recordContext().timestamp()); + store.put(bytesKey, value, context.timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, Position.emptyPosition()); @@ -90,9 +87,8 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.recordContext().timestamp()); + store.put(bytesKey, value, context.timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, POSITION); @@ -135,13 +131,12 @@ public class ChangeLoggingWindowBytesStoreTest { store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary); store.init(context, store); when(inner.getPosition()).thenReturn(Position.emptyPosition()); - when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); - store.put(bytesKey, value, context.recordContext().timestamp()); - store.put(bytesKey, value, context.recordContext().timestamp()); + store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.timestamp()); verify(inner, times(2)).put(bytesKey, value, 0); verify(context).logChange(store.name(), key1, value, 0L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 1c8935d1e1c..ba557104ebd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -223,10 +223,10 @@ public class MeteredWindowStoreTest { @Test public void shouldPutToInnerStoreAndRecordPutMetrics() { final byte[] bytes = "a".getBytes(); - doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.recordContext().timestamp())); + doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp())); store.init(context, store); - store.put("a", "a", context.recordContext().timestamp()); + store.put("a", "a", context.timestamp()); // it suffices to verify one put metric since all put metrics are recorded by the same sensor // and the sensor is tested elsewhere diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index ffa509d5188..21d16b09be4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.recordContext().offset(), - context.recordContext().timestamp(), - context.recordContext().partition(), + context.offset(), + context.timestamp(), + context.partition(), "", context.recordContext().sourceRawKey(), context.recordContext().sourceRawValue() diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index 9d0db9bae0f..f4ff30002ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -941,9 +941,9 @@ public class TimeOrderedWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.recordContext().offset(), - context.recordContext().timestamp(), - context.recordContext().partition(), + context.offset(), + context.timestamp(), + context.partition(), "", context.recordContext().sourceRawKey(), context.recordContext().sourceRawValue()