This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 9ef4d5ab867 KAFKA-18917: TransformValues throws NPE (#19089)
9ef4d5ab867 is described below

commit 9ef4d5ab867ca0dc2a9010770ea483e9724a40ba
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 4 17:49:47 2025 +0100

    KAFKA-18917: TransformValues throws NPE (#19089)
    
    When `transformValues` is used with a `Materialized` instance, but
    without a queryable name, a `NullPointerException` is thrown. To
    preserve the semantics present in 3.9, we need to avoid materialization
    when a queryable name is not present.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../streams/kstream/internals/KTableImpl.java      |  8 +++++--
 .../internals/KTableTransformValuesTest.java       | 26 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 06a6043ffd3..8a0e647de93 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -450,8 +450,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
             valueSerde = materializedInternal.valueSerde();
             queryableStoreName = materializedInternal.queryableStoreName();
             // only materialize if materialized is specified and it has 
queryable name
-            final StoreFactory storeFactory = queryableStoreName != null ? 
(new KeyValueStoreMaterializer<>(materializedInternal)) : null;
-            storeBuilder = Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+            if (queryableStoreName != null) {
+                final StoreFactory storeFactory = new 
KeyValueStoreMaterializer<>(materializedInternal);
+                storeBuilder = Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+            } else {
+                storeBuilder = null;
+            }
         } else {
             keySerde = this.keySerde;
             valueSerde = null;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 402a27e461a..d03bf99da93 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -436,6 +436,32 @@ public class KTableTransformValuesTest {
                 new KeyValueTimestamp<>("A", "3", 15))));
     }
 
+    @Test
+    public void 
shouldCalculateCorrectOldValuesIfNotStatefulEvenNotMaterializedNoQueryableName()
 {
+        builder
+            .table(INPUT_TOPIC, CONSUMED)
+            .transformValues(new StatelessTransformerSupplier(),
+                Materialized.with(Serdes.String(), Serdes.Integer())
+            )
+            .groupBy(toForceSendingOfOldValues(), 
Grouped.with(Serdes.String(), Serdes.Integer()))
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
+            .mapValues(mapBackToStrings())
+            .toStream()
+            .process(capture);
+
+        driver = new TopologyTestDriver(builder.build(), props());
+        final TestInputTopic<String, String> inputTopic =
+            driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new 
StringSerializer());
+
+        inputTopic.pipeInput("A", "a", 5L);
+        inputTopic.pipeInput("A", "aa", 15L);
+        inputTopic.pipeInput("A", "aaa", 10);
+
+        assertThat(output(), equalTo(Arrays.asList(new 
KeyValueTimestamp<>("A", "1", 5),
+            new KeyValueTimestamp<>("A", "2", 15),
+            new KeyValueTimestamp<>("A", "3", 15))));
+    }
+
     private ArrayList<KeyValueTimestamp<String, String>> output() {
         return capture.capturedProcessors(1).get(0).processed();
     }

Reply via email to