This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f13a22af0b3 KAFKA-13722: remove usage of old ProcessorContext (#18292)
f13a22af0b3 is described below
commit f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jan 24 10:31:31 2025 -0800
KAFKA-13722: remove usage of old ProcessorContext (#18292)
We want to deprecate an remove the old ProcessorContext. Thus, we need
to refactor Kafka Streams runtime code, to not make calls into the old
ProcessorContext but only into new code path.
Reviewers: Bill Bejeck <[email protected]>
---
.../kstream/internals/KTableRepartitionMap.java | 2 +-
.../internals/GlobalProcessorContextImpl.java | 2 +-
.../processor/internals/GlobalStateUpdateTask.java | 4 ++--
.../processor/internals/ProcessorContextImpl.java | 8 ++++----
.../processor/internals/ProcessorContextUtils.java | 12 +++--------
.../streams/processor/internals/ProcessorNode.java | 11 ++++++-----
.../streams/processor/internals/SinkNode.java | 6 +++---
.../streams/processor/internals/StreamTask.java | 4 ++--
...stractDualSchemaRocksDBSegmentedBytesStore.java | 2 +-
.../state/internals/CachingKeyValueStore.java | 12 ++++++-----
.../state/internals/CachingSessionStore.java | 11 ++++++-----
.../state/internals/CachingWindowStore.java | 11 ++++++-----
.../internals/ChangeLoggingKeyValueBytesStore.java | 10 +++++-----
.../ChangeLoggingListValueBytesStore.java | 4 ++--
.../internals/ChangeLoggingSessionBytesStore.java | 4 ++--
...ChangeLoggingTimestampedKeyValueBytesStore.java | 6 +++---
.../ChangeLoggingTimestampedWindowBytesStore.java | 2 +-
.../internals/ChangeLoggingWindowBytesStore.java | 2 +-
.../state/internals/MeteredKeyValueStore.java | 2 +-
.../state/internals/MeteredSessionStore.java | 2 +-
.../state/internals/MeteredWindowStore.java | 2 +-
.../internals/TimeOrderedCachingWindowStore.java | 23 ++++++++++++----------
.../internals/AbstractProcessorContextTest.java | 2 +-
.../internals/ProcessorContextImplTest.java | 4 ++++
.../processor/internals/ProcessorNodeTest.java | 2 +-
.../ChangeLoggingSessionBytesStoreTest.java | 5 +++++
...angeLoggingTimestampedWindowBytesStoreTest.java | 13 ++++++++----
.../ChangeLoggingWindowBytesStoreTest.java | 13 ++++++++----
.../internals/GlobalStateStoreProviderTest.java | 4 ++--
.../state/internals/MeteredWindowStoreTest.java | 4 ++--
...imeOrderedCachingPersistentWindowStoreTest.java | 6 +++---
.../internals/TimeOrderedWindowStoreTest.java | 9 +++++----
.../apache/kafka/test/NoOpProcessorContext.java | 6 +++---
.../apache/kafka/streams/TopologyTestDriver.java | 5 +++--
34 files changed, 119 insertions(+), 96 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index f5cc449db7f..7c2361565da 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -213,7 +213,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements
KTableRepartitionMapS
private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
return ValueAndTimestamp.make(
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
- valueAndTimestamp == null ? context.timestamp() :
valueAndTimestamp.timestamp()
+ valueAndTimestamp == null ?
context.recordContext().timestamp() : valueAndTimestamp.timestamp()
);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 828ae3a0a79..01b694863fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends
AbstractProcessorContext<Object,
@Override
public <KIn, VIn> void forward(final KIn key, final VIn value) {
- forward(new Record<>(key, value, timestamp(), headers()));
+ forward(new Record<>(key, value, recordContext().timestamp(),
headers()));
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 12a6beedbcd..f417ce76b5b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -119,8 +119,8 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
final Record<Object, Object> toProcess = new Record<>(
deserialized.key(),
deserialized.value(),
- processorContext.timestamp(),
- processorContext.headers()
+ processorContext.recordContext().timestamp(),
+ processorContext.recordContext().headers()
);
((SourceNode<Object, Object>)
sourceNodeAndDeserializer.sourceNode()).process(toProcess);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 6a53afd07b3..8f739d0c056 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
final Record<K, V> toForward = new Record<>(
key,
value,
- timestamp(),
+ recordContext.timestamp(),
headers()
);
forward(toForward);
@@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
final Record<K, V> toForward = new Record<>(
key,
value,
- toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+ toInternal.hasTimestamp() ? toInternal.timestamp() :
recordContext.timestamp(),
headers()
);
forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
// old API processors wouldn't see the timestamps or headers of
upstream
// new API processors. But then again, from the perspective of
those old-API
// processors, even consulting the timestamp or headers when the
record context
- // is undefined is itself not well defined. Plus, I don't think we
need to worry
+ // is undefined is itself not well-defined. Plus, I don't think we
need to worry
// too much about heterogeneous applications, in which the
upstream processor is
// implementing the new API and the downstream one is implementing
the old API.
// So, this seems like a fine compromise for now.
- if (recordContext != null && (record.timestamp() != timestamp() ||
record.headers() != headers())) {
+ if (recordContext != null && (record.timestamp() !=
recordContext.timestamp() || record.headers() != recordContext.headers())) {
recordContext = new ProcessorRecordContext(
record.timestamp(),
recordContext.offset(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 906d129aa49..20890088999 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -34,13 +34,6 @@ public final class ProcessorContextUtils {
private ProcessorContextUtils() {}
- /**
- * Should be removed as part of KAFKA-10217
- */
- public static StreamsMetricsImpl metricsImpl(final ProcessorContext
context) {
- return (StreamsMetricsImpl) context.metrics();
- }
-
/**
* Should be removed as part of KAFKA-10217
*/
@@ -71,9 +64,10 @@ public final class ProcessorContextUtils {
}
}
- public static InternalProcessorContext asInternalProcessorContext(final
ProcessorContext context) {
+ @SuppressWarnings("unchecked")
+ public static <K, V> InternalProcessorContext<K, V>
asInternalProcessorContext(final ProcessorContext context) {
if (context instanceof InternalProcessorContext) {
- return (InternalProcessorContext) context;
+ return (InternalProcessorContext<K, V>) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams
and must be disabled for unit tests."
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2bb58eb6b82..5d245ef5f30 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -209,13 +209,14 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
// (instead of `RuntimeException`) to work well with those
languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for
DeserializationExceptionHandler
- internalProcessorContext.topic(),
- internalProcessorContext.partition(),
- internalProcessorContext.offset(),
- internalProcessorContext.headers(),
+ internalProcessorContext.recordContext().topic(),
+ internalProcessorContext.recordContext().partition(),
+ internalProcessorContext.recordContext().offset(),
+ internalProcessorContext.recordContext().headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
- internalProcessorContext.timestamp());
+ internalProcessorContext.recordContext().timestamp()
+ );
final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e95e098c26c..c7dcf135eaa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -85,9 +85,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn,
VIn, Void, Void> {
final ProcessorRecordContext contextForExtraction =
new ProcessorRecordContext(
timestamp,
- context.offset(),
- context.partition(),
- context.topic(),
+ context.recordContext().offset(),
+ context.recordContext().partition(),
+ context.recordContext().topic(),
record.headers()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5000522ed0d..b612223197e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -861,8 +861,8 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
final Record<Object, Object> toProcess = new Record<>(
record.key(),
record.value(),
- processorContext.timestamp(),
- processorContext.headers()
+ processorContext.recordContext().timestamp(),
+ processorContext.recordContext().headers()
);
maybeMeasureLatency(() -> currNode.process(toProcess), time,
processLatencySensor);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index e55d8452fae..e05b6328ec8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -55,7 +55,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final Optional<KeySchema> indexKeySchema;
private final long retentionPeriod;
- protected InternalProcessorContext internalProcessorContext;
+ protected InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
protected boolean consistencyEnabled = false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index b5f05b5c475..f59271920f5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -272,12 +272,14 @@ public class CachingKeyValueStore
key,
new LRUCacheEntry(
value,
- internalContext.headers(),
+ internalContext.recordContext().headers(),
true,
- internalContext.offset(),
- internalContext.timestamp(),
- internalContext.partition(),
- internalContext.topic()));
+ internalContext.recordContext().offset(),
+ internalContext.recordContext().timestamp(),
+ internalContext.recordContext().partition(),
+ internalContext.recordContext().topic()
+ )
+ );
StoreQueryUtils.updatePosition(position, internalContext);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index c863050f94d..00dbaa5589b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -135,12 +135,13 @@ class CachingSessionStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.headers(),
+ internalContext.recordContext().headers(),
true,
- internalContext.offset(),
- internalContext.timestamp(),
- internalContext.partition(),
- internalContext.topic());
+ internalContext.recordContext().offset(),
+ internalContext.recordContext().timestamp(),
+ internalContext.recordContext().partition(),
+ internalContext.recordContext().topic()
+ );
internalContext.cache().put(cacheName,
cacheFunction.cacheKey(binaryKey), entry);
maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey),
maxObservedTimestamp);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index dff0ac70749..f138ff9202a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -153,12 +153,13 @@ class CachingWindowStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.headers(),
+ internalContext.recordContext().headers(),
true,
- internalContext.offset(),
- internalContext.timestamp(),
- internalContext.partition(),
- internalContext.topic());
+ internalContext.recordContext().offset(),
+ internalContext.recordContext().timestamp(),
+ internalContext.recordContext().partition(),
+ internalContext.recordContext().topic()
+ );
internalContext.cache().put(cacheName,
cacheFunction.cacheKey(keyBytes), entry);
maxObservedTimestamp.set(Math.max(keySchema.segmentTimestamp(keyBytes),
maxObservedTimestamp.get()));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 78bcbd83a0b..5405ad9a71c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
- log(key, null, internalContext.timestamp());
+ log(key, null, internalContext.recordContext().timestamp());
});
}
}
@@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore
public void put(final Bytes key,
final byte[] value) {
wrapped().put(key, value);
- log(key, value, internalContext.timestamp());
+ log(key, value, internalContext.recordContext().timestamp());
}
@Override
@@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore
final byte[] previous = wrapped().putIfAbsent(key, value);
if (previous == null) {
// then it was absent
- log(key, value, internalContext.timestamp());
+ log(key, value, internalContext.recordContext().timestamp());
}
return previous;
}
@@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
- log(entry.key, entry.value, internalContext.timestamp());
+ log(entry.key, entry.value,
internalContext.recordContext().timestamp());
}
}
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore
@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
- log(key, null, internalContext.timestamp());
+ log(key, null, internalContext.recordContext().timestamp());
return oldValue;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index 9070fc8da5f..ba43ba30b17 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends
ChangeLoggingKeyValueBytes
// we need to log the full new list and thus call get() on the inner
store below
// if the value is a tombstone, we delete the whole list and thus can
save the get call
if (value == null) {
- log(key, null, internalContext.timestamp());
+ log(key, null, internalContext.recordContext().timestamp());
} else {
- log(key, wrapped().get(key), internalContext.timestamp());
+ log(key, wrapped().get(key),
internalContext.recordContext().timestamp());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 06097aa7a71..248889211c3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore
@Override
public void remove(final Windowed<Bytes> sessionKey) {
wrapped().remove(sessionKey);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(),
wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null,
internalContext.recordContext().timestamp(), wrapped().getPosition());
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
wrapped().put(sessionKey, aggregate);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(),
wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate,
internalContext.recordContext().timestamp(), wrapped().getPosition());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 916c9547184..b95ede1bba8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
public void put(final Bytes key,
final byte[] valueAndTimestamp) {
wrapped().put(key, valueAndTimestamp);
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
}
@Override
@@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
if (previous == null) {
// then it was absent
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
}
return previous;
}
@@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueAndTimestamp = entry.value;
- log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+ log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.recordContext().timestamp() :
timestamp(valueAndTimestamp));
}
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 2bf87f9d2a8..5ae334f95cc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends
ChangeLoggingWindowBytesS
name(),
key,
rawValue(valueAndTimestamp),
- valueAndTimestamp != null ? timestamp(valueAndTimestamp) :
internalContext.timestamp(),
+ valueAndTimestamp != null ? timestamp(valueAndTimestamp) :
internalContext.recordContext().timestamp(),
wrapped().getPosition()
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d5857d0456e..0d0f378af75 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore
}
void log(final Bytes key, final byte[] value) {
- internalContext.logChange(name(), key, value,
internalContext.timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(), key, value,
internalContext.recordContext().timestamp(), wrapped().getPosition());
}
private int maybeUpdateSeqnumForDups() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 991b9b365d7..b22da77441c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -430,7 +430,7 @@ public class MeteredKeyValueStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime - internalContext.timestamp();
+ final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 7fb7276bcfc..20a32bbb1f7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -497,7 +497,7 @@ public class MeteredSessionStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime - internalContext.timestamp();
+ final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index ed3d31e86d0..923728fc409 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -513,7 +513,7 @@ public class MeteredWindowStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime - internalContext.timestamp();
+ final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 47cbfde4c40..7f443c3e32c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -97,12 +97,13 @@ class TimeOrderedCachingWindowStore
hasIndex = timeOrderedWindowStore.hasIndex();
}
+ @SuppressWarnings("unchecked")
private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore
wrapped) {
if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
return (RocksDBTimeOrderedWindowStore) wrapped;
}
if (wrapped instanceof WrappedStateStore) {
- return getWrappedStore(((WrappedStateStore<?, ?, ?>)
wrapped).wrapped());
+ return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>)
wrapped).wrapped());
}
return null;
}
@@ -255,12 +256,13 @@ class TimeOrderedCachingWindowStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.headers(),
+ internalContext.recordContext().headers(),
true,
- internalContext.offset(),
- internalContext.timestamp(),
- internalContext.partition(),
- internalContext.topic());
+ internalContext.recordContext().offset(),
+ internalContext.recordContext().timestamp(),
+ internalContext.recordContext().partition(),
+ internalContext.recordContext().topic()
+ );
// Put to index first so that base can be evicted later
if (hasIndex) {
@@ -274,10 +276,11 @@ class TimeOrderedCachingWindowStore
new byte[0],
new RecordHeaders(),
true,
- internalContext.offset(),
- internalContext.timestamp(),
- internalContext.partition(),
- "");
+ internalContext.recordContext().offset(),
+ internalContext.recordContext().timestamp(),
+ internalContext.recordContext().partition(),
+ ""
+ );
final Bytes indexKey =
KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
internalContext.cache().put(cacheName,
indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);
} else {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index b4d5c994265..26b8d36f3b0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -58,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class AbstractProcessorContextTest {
private final MockStreamsMetrics metrics = new MockStreamsMetrics(new
Metrics());
- private final AbstractProcessorContext context = new
TestProcessorContext(metrics);
+ private final AbstractProcessorContext<?, ?> context = new
TestProcessorContext(metrics);
private final MockKeyValueStore stateStore = new
MockKeyValueStore("store", false);
private final Headers headers = new RecordHeaders(new Header[]{new
RecordHeader("key", "value".getBytes())});
private final ProcessorRecordContext recordContext = new
ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 42c466c2120..9410ca5a978 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -593,6 +593,8 @@ public class ProcessorContextImplTest {
@Test
public void shouldThrowUnsupportedOperationExceptionOnForward() {
context = getStandbyContext();
+ context.recordContext = mock(ProcessorRecordContext.class);
+
assertThrows(
UnsupportedOperationException.class,
() -> context.forward("key", "value")
@@ -602,6 +604,8 @@ public class ProcessorContextImplTest {
@Test
public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
context = getStandbyContext();
+ context.recordContext = mock(ProcessorRecordContext.class);
+
assertThrows(
UnsupportedOperationException.class,
() -> context.forward("key", "value", To.child("child-name"))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a16315d363b..5b4303a1695 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -358,7 +358,7 @@ public class ProcessorNodeTest {
assertEquals(internalProcessorContext.offset(), context.offset());
assertEquals(internalProcessorContext.currentNode().name(),
context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
- assertEquals(internalProcessorContext.timestamp(),
context.timestamp());
+ assertEquals(internalProcessorContext.recordContext().timestamp(),
context.timestamp());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertInstanceOf(RuntimeException.class, exception);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index d3243ef2fc6..9a23e657600 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -75,6 +77,7 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogPuts() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.put(key1, value1);
@@ -86,6 +89,7 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(POSITION);
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.put(key1, value1);
@@ -97,6 +101,7 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogRemoves() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.remove(key1);
store.remove(key1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 03701bdcb00..1c1b713ce21 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -17,9 +17,11 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
@@ -77,8 +79,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
public void shouldLogPuts() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp, context.timestamp());
+ store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key, value, 42,
Position.emptyPosition());
@@ -88,8 +91,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(POSITION);
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp, context.timestamp());
+ store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key, value, 42, POSITION);
@@ -118,9 +122,10 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp, context.timestamp());
- store.put(bytesKey, valueAndTimestamp, context.timestamp());
+ store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
+ store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key1, value, 42L,
Position.emptyPosition());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 2607e56ad9f..e80a2325a2a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,9 +17,11 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
@@ -76,8 +78,9 @@ public class ChangeLoggingWindowBytesStoreTest {
public void shouldLogPuts() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, value, context.timestamp());
+ store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key, value, 0L,
Position.emptyPosition());
@@ -87,8 +90,9 @@ public class ChangeLoggingWindowBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(POSITION);
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, value, context.timestamp());
+ store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key, value, 0L, POSITION);
@@ -131,12 +135,13 @@ public class ChangeLoggingWindowBytesStoreTest {
store = new ChangeLoggingWindowBytesStore(inner, true,
WindowKeySchema::toStoreKeyBinary);
store.init(context, store);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
+ when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
- store.put(bytesKey, value, context.timestamp());
- store.put(bytesKey, value, context.timestamp());
+ store.put(bytesKey, value, context.recordContext().timestamp());
+ store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner, times(2)).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key1, value, 0L,
Position.emptyPosition());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 8c28a9eabec..4239e3e5000 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -108,7 +108,7 @@ public class GlobalStateStoreProviderTest {
Serdes.String(),
Serdes.String()).build());
- final ProcessorContextImpl mockContext =
mock(ProcessorContextImpl.class);
+ final InternalProcessorContext<?, ?> mockContext =
mock(InternalProcessorContext.class);
when(mockContext.applicationId()).thenReturn("appId");
when(mockContext.metrics())
.thenReturn(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ba557104ebd..1c8935d1e1c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -223,10 +223,10 @@ public class MeteredWindowStoreTest {
@Test
public void shouldPutToInnerStoreAndRecordPutMetrics() {
final byte[] bytes = "a".getBytes();
- doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(),
eq(context.timestamp()));
+ doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(),
eq(context.recordContext().timestamp()));
store.init(context, store);
- store.put("a", "a", context.timestamp());
+ store.put("a", "a", context.recordContext().timestamp());
// it suffices to verify one put metric since all put metrics are
recorded by the same sensor
// and the sensor is tested elsewhere
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index f2f1d513704..7aba2434457 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
new byte[0],
new RecordHeaders(),
true,
- context.offset(),
- context.timestamp(),
- context.partition(),
+ context.recordContext().offset(),
+ context.recordContext().timestamp(),
+ context.recordContext().partition(),
"")
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index a82ca8e7300..9eb9ec21b5e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -941,10 +941,11 @@ public class TimeOrderedWindowStoreTest {
new byte[0],
new RecordHeaders(),
true,
- context.offset(),
- context.timestamp(),
- context.partition(),
- "")
+ context.recordContext().offset(),
+ context.recordContext().timestamp(),
+ context.recordContext().partition(),
+ ""
+ )
);
underlyingStore.put(key, value, 1);
diff --git
a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 47ebe4bdb44..734f744f20f 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -81,12 +81,12 @@ public class NoOpProcessorContext extends
AbstractProcessorContext<Object, Objec
@Override
public <K, V> void forward(final Record<K, V> record) {
- forward(record.key(), record.value());
+ forwardedValues.put(record.key(), record.value());
}
@Override
public <K, V> void forward(final Record<K, V> record, final String
childName) {
- forward(record.key(), record.value());
+ forwardedValues.put(record.key(), record.value());
}
@Override
@@ -96,7 +96,7 @@ public class NoOpProcessorContext extends
AbstractProcessorContext<Object, Objec
@Override
public <K, V> void forward(final K key, final V value, final To to) {
- forward(key, value);
+ forwardedValues.put(key, value);
}
@Override
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2fc8400239d..a4cee67ad5f 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -61,6 +62,7 @@ import
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -447,7 +449,6 @@ public class TopologyTestDriver implements Closeable {
streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
);
globalStateTask.initialize();
- globalProcessorContext.setRecordContext(null);
} else {
globalStateManager = null;
globalStateTask = null;
@@ -492,6 +493,7 @@ public class TopologyTestDriver implements Closeable {
streamsMetrics,
cache
);
+ context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1,
null, new RecordHeaders()));
task = new StreamTask(
TASK_ID,
@@ -511,7 +513,6 @@ public class TopologyTestDriver implements Closeable {
);
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
- task.processorContext().setRecordContext(null);
for (final TopicPartition tp: task.inputPartitions()) {
task.updateNextOffsets(tp, new OffsetAndMetadata(0,
Optional.empty(), ""));
}