This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c635a76480c KAFKA-19638: Set dummy ProcessorRecordContext for
processor init (#20403)
c635a76480c is described below
commit c635a76480c2d5f7d86af9e61325d41e0c2219ef
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Fri Nov 14 14:50:05 2025 -0500
KAFKA-19638: Set dummy ProcessorRecordContext for processor init (#20403)
This PR fixes a `NullPointerException` in `Processor#init` that occurs while
the topology is initializing. The issue was introduced via KAFKA-13722
(that commit was reverted for the 4.1.0 release).
As part of the testing strategy, I added a new test case to the
`ProcessorTopologyTest` class and also tested the fix manually.
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/AbstractTask.java | 2 ++
.../streams/processor/internals/StreamTask.java | 5 +--
.../processor/internals/ProcessorTopologyTest.java | 42 ++++++++++++++++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 2 +-
4 files changed, 48 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f33c4693b8..e9f182cf56d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -40,6 +40,8 @@ import static
org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
private static final long NO_DEADLINE = -1L;
+ protected static final long NO_OFFSET = -1;
+ protected static final int NO_PARTITION = -1;
private Task.State state = CREATED;
private long deadlineMs = NO_DEADLINE;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 443ad5b0e9e..4ab672fd0c0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1138,11 +1138,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// initialize the task by initializing all its processor nodes in the
topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
+ final ProcessorRecordContext initContext = new
ProcessorRecordContext(time.milliseconds(), NO_OFFSET, NO_PARTITION, null, new
RecordHeaders());
+ updateProcessorContext(node, time.milliseconds(), initContext);
try {
node.init(processorContext, processingExceptionHandler);
} finally {
- processorContext.setCurrentNode(null);
+ updateProcessorContext(null, ConsumerRecord.NO_TIMESTAMP,
null);
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a578e5b25f2..e77891bb531 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -867,6 +867,23 @@ public class ProcessorTopologyTest {
equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
}
+
+ @Test
+ public void testTopologyInitializationWithInitialKeyAndValue() {
+ final String initialKey = "key1";
+ final String initialValue = "value1";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME),
Serdes.String(), Serdes.String());
+ topology.addSource("source1", STRING_DESERIALIZER,
STRING_DESERIALIZER, INPUT_TOPIC_1);
+ topology.addProcessor("processor1", defineWithStores(() -> new
StatefulProcessorWithInitialization(DEFAULT_STORE_NAME, initialKey,
initialValue), Collections.singleton(storeBuilder)), "source1");
+ driver = new TopologyTestDriver(topology, props);
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+ final List<KeyValue<String, String>> results =
prefixScanResults(store, DEFAULT_PREFIX);
+ assertEquals(1, results.size());
+ assertEquals(initialValue, results.get(0).value);
+ assertEquals(initialKey, results.get(0).key);
+ }
+
@Test
public void shouldCreateStringWithSourceAndTopics() {
topology.addSource("source", "topic1", "topic2");
@@ -1264,6 +1281,31 @@ public class ProcessorTopologyTest {
}
}
+ private static class StatefulProcessorWithInitialization implements
Processor<String, String, Void, Void> {
+ private KeyValueStore<String, String> store;
+ private final String storeName;
+ private final String initialKey;
+ private final String initialValue;
+
+ public StatefulProcessorWithInitialization(final String storeName,
final String initialKey, final String initialValue) {
+ this.storeName = storeName;
+ this.initialKey = initialKey;
+ this.initialValue = initialValue;
+ }
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(storeName);
+ store.put(initialKey, initialValue);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ store.put(record.key(), record.value());
+ }
+ }
+
+
/**
* A processor that stores each key-value pair in an in-memory key-value
store registered with the context.
*/
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9b812b389fa..6488dfabfb7 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -494,7 +494,6 @@ public class TopologyTestDriver implements Closeable {
streamsMetrics,
cache
);
- context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1,
null, new RecordHeaders()));
task = new StreamTask(
TASK_ID,
@@ -920,6 +919,7 @@ public class TopologyTestDriver implements Closeable {
private StateStore getStateStore(final String name,
final boolean throwForBuiltInStores) {
if (task != null) {
+ task.processorContext().setRecordContext(new
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
final StateStore stateStore = ((ProcessorContextImpl)
task.processorContext()).stateManager().store(name);
if (stateStore != null) {
if (throwForBuiltInStores) {