This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c635a76480c KAFKA-19638: Set dummy ProcessorRecordContext for 
processor init (#20403)
c635a76480c is described below

commit c635a76480c2d5f7d86af9e61325d41e0c2219ef
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Fri Nov 14 14:50:05 2025 -0500

    KAFKA-19638: Set dummy ProcessorRecordContext for processor init (#20403)
    
    This PR fixes a `NullPointerException` in the `Processor#init` when the
    topology is initializing. This issue was introduced via KAFKA-13722
    (commit reverted for 4.1.0 release).
    
    As part of the testing strategy, I added a new test case for the
    `ProcessorTopologyTest` class, and also manually tested.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/processor/internals/AbstractTask.java  |  2 ++
 .../streams/processor/internals/StreamTask.java    |  5 +--
 .../processor/internals/ProcessorTopologyTest.java | 42 ++++++++++++++++++++++
 .../apache/kafka/streams/TopologyTestDriver.java   |  2 +-
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f33c4693b8..e9f182cf56d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -40,6 +40,8 @@ import static 
org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
     private static final long NO_DEADLINE = -1L;
+    protected static final long NO_OFFSET = -1;
+    protected static final int NO_PARTITION = -1;
 
     private Task.State state = CREATED;
     private long deadlineMs = NO_DEADLINE;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 443ad5b0e9e..4ab672fd0c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1138,11 +1138,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         // initialize the task by initializing all its processor nodes in the 
topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext initContext = new 
ProcessorRecordContext(time.milliseconds(), NO_OFFSET, NO_PARTITION, null, new 
RecordHeaders());
+            updateProcessorContext(node, time.milliseconds(), initContext);
             try {
                 node.init(processorContext, processingExceptionHandler);
             } finally {
-                processorContext.setCurrentNode(null);
+                updateProcessorContext(null, ConsumerRecord.NO_TIMESTAMP, 
null);
             }
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a578e5b25f2..e77891bb531 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -867,6 +867,23 @@ public class ProcessorTopologyTest {
                 equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
     }
 
+
+    @Test
+    public void testTopologyInitializationWithInitialKeyAndValue() {
+        final String initialKey = "key1";
+        final String initialValue = "value1";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+                
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), 
Serdes.String(), Serdes.String());
+        topology.addSource("source1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1);
+        topology.addProcessor("processor1", defineWithStores(() -> new 
StatefulProcessorWithInitialization(DEFAULT_STORE_NAME, initialKey, 
initialValue), Collections.singleton(storeBuilder)), "source1");
+        driver = new TopologyTestDriver(topology, props);
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(DEFAULT_STORE_NAME);
+        final List<KeyValue<String, String>> results = 
prefixScanResults(store, DEFAULT_PREFIX);
+        assertEquals(1, results.size());
+        assertEquals(initialValue, results.get(0).value);
+        assertEquals(initialKey, results.get(0).key);
+    }
+
     @Test
     public void shouldCreateStringWithSourceAndTopics() {
         topology.addSource("source", "topic1", "topic2");
@@ -1264,6 +1281,31 @@ public class ProcessorTopologyTest {
         }
     }
 
+    private static class StatefulProcessorWithInitialization implements 
Processor<String, String, Void, Void> {
+        private KeyValueStore<String, String> store;
+        private final String storeName;
+        private final String initialKey;
+        private final String initialValue;
+
+        public StatefulProcessorWithInitialization(final String storeName, 
final String initialKey, final String initialValue) {
+            this.storeName = storeName;
+            this.initialKey = initialKey;
+            this.initialValue = initialValue;
+        }
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(storeName);
+            store.put(initialKey, initialValue);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            store.put(record.key(), record.value());
+        }
+    }
+
+
     /**
      * A processor that stores each key-value pair in an in-memory key-value 
store registered with the context.
      */
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9b812b389fa..6488dfabfb7 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -494,7 +494,6 @@ public class TopologyTestDriver implements Closeable {
                 streamsMetrics,
                 cache
             );
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
null, new RecordHeaders()));
 
             task = new StreamTask(
                 TASK_ID,
@@ -920,6 +919,7 @@ public class TopologyTestDriver implements Closeable {
     private StateStore getStateStore(final String name,
                                      final boolean throwForBuiltInStores) {
         if (task != null) {
+            task.processorContext().setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
             final StateStore stateStore = ((ProcessorContextImpl) 
task.processorContext()).stateManager().store(name);
             if (stateStore != null) {
                 if (throwForBuiltInStores) {

Reply via email to