This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce6f0781929 MINOR: fix NPE in KS `Topology` for new `AutoOffsetReset` 
(#18780)
ce6f0781929 is described below

commit ce6f0781929c568b11893e934783ac39a1d474b1
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:24:47 2025 -0800

    MINOR: fix NPE in KS `Topology` for new `AutoOffsetReset` (#18780)
    
    Introduced via KIP-1106.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../java/org/apache/kafka/streams/Topology.java    | 72 +++++++++++++++++++---
 .../org/apache/kafka/streams/TopologyTest.java     | 25 ++++----
 2 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index fffca97400e..9a25afd4350 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -169,7 +169,14 @@ public class Topology {
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            null,
+            null,
+            null,
+            topics
+        );
         return this;
     }
 
@@ -215,7 +222,14 @@ public class Topology {
     public synchronized Topology addSource(final 
org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                            final String name,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            null,
+            null,
+            null,
+            topicPattern
+        );
         return this;
     }
 
@@ -304,7 +318,14 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, 
topics);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            timestampExtractor,
+            null,
+            null,
+            topics
+        );
         return this;
     }
 
@@ -351,7 +372,14 @@ public class Topology {
                                            final TimestampExtractor 
timestampExtractor,
                                            final String name,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, 
topicPattern);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            timestampExtractor,
+            null,
+            null,
+            topicPattern
+        );
         return this;
     }
 
@@ -457,7 +485,14 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, 
valueDeserializer, topics);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topics
+        );
         return this;
     }
 
@@ -514,7 +549,14 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, 
valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topicPattern
+        );
         return this;
     }
 
@@ -571,7 +613,14 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final String... topics) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, 
keyDeserializer, valueDeserializer, topics);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            timestampExtractor,
+            keyDeserializer,
+            valueDeserializer,
+            topics
+        );
         return this;
     }
 
@@ -634,7 +683,14 @@ public class Topology {
                                            final Deserializer<?> 
keyDeserializer,
                                            final Deserializer<?> 
valueDeserializer,
                                            final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(new 
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, 
keyDeserializer, valueDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(
+            offsetReset == null ? null : new 
AutoOffsetResetInternal(offsetReset),
+            name,
+            timestampExtractor,
+            keyDeserializer,
+            valueDeserializer,
+            topicPattern
+        );
         return this;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0dc0179c6e5..0cb91b12a58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -146,8 +146,7 @@ public class TopologyTest {
 
     @Test
     public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
-        assertThrows(NullPointerException.class, () -> 
topology.addProcessor("name",
-            (ProcessorSupplier<Object, Object, Object, Object>) null));
+        assertThrows(NullPointerException.class, () -> 
topology.addProcessor("name", null));
     }
 
     @Test
@@ -376,6 +375,7 @@ public class TopologyTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
@@ -411,7 +411,7 @@ public class TopologyTest {
 
         @Override
         public Processor<Object, Object, Object, Object> get() {
-            return new Processor<Object, Object, Object, Object>() {
+            return new Processor<>() {
                 @Override
                 public void init(final ProcessorContext<Object, Object> 
context) {
                     context.getStateStore(STORE_NAME);
@@ -1157,7 +1157,7 @@ public class TopologyTest {
     public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
         final StreamsBuilder builder  = new StreamsBuilder();
 
-        final TopicNameExtractor<Object, Object> topicNameExtractor = new 
TopicNameExtractor<Object, Object>() {
+        final TopicNameExtractor<Object, Object> topicNameExtractor = new 
TopicNameExtractor<>() {
             @Override
             public String extract(final Object key, final Object value, final 
RecordContext recordContext) {
                 return recordContext.topic() + "-" + key;
@@ -2257,16 +2257,16 @@ public class TopologyTest {
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
-        topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, 
null, null, sourceTopic);
-        final StringBuilder allSourceTopics = new 
StringBuilder(sourceTopic[0]);
-        for (int i = 1; i < sourceTopic.length; ++i) {
-            allSourceTopics.append(", ").append(sourceTopic[i]);
-        }
+        topology.addSource((AutoOffsetReset) null, sourceName, null, null, 
null, sourceTopic);
         return new InternalTopologyBuilder.Source(sourceName, new 
HashSet<>(Arrays.asList(sourceTopic)), null);
     }
 
+    @SuppressWarnings("deprecation")
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final Pattern sourcePattern) {
+        // we still test the old `Topology.AutoOffsetReset` here, to increase 
test coverage
+        // (cf `addSource` about which used the new one)
+        // When can rewrite this to the new one, when the old one is removed
         topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, 
null, null, sourcePattern);
         return new InternalTopologyBuilder.Source(sourceName, null, 
sourcePattern);
     }
@@ -2338,7 +2338,6 @@ public class TopologyTest {
         return expectedSinkNode;
     }
 
-    @Deprecated // testing old PAPI
     private void addGlobalStoreToTopologyAndExpectedDescription(final String 
globalStoreName,
                                                                 final String 
sourceName,
                                                                 final String 
globalTopicName,
@@ -2441,17 +2440,17 @@ public class TopologyTest {
         topology.addSource("source", "topic");
         topology.addProcessor(
             "p1",
-            () -> (Processor<Object, Object, Object, Object>) record -> 
System.out.println("Processing: " + random.nextInt()),
+            () -> record -> System.out.println("Processing: " + 
random.nextInt()),
             "source"
         );
         topology.addProcessor(
             "p2",
-            () -> (Processor<Object, Object, Object, Object>) record -> 
System.out.println("Processing: " + random.nextInt()),
+            () -> record -> System.out.println("Processing: " + 
random.nextInt()),
             "p1"
         );
         topology.addProcessor(
             "p3",
-            () -> (Processor<Object, Object, Object, Object>) record -> 
System.out.println("Processing: " + random.nextInt()),
+            () -> record -> System.out.println("Processing: " + 
random.nextInt()),
             "p2"
         );
         assertThat(counter.numWrappedProcessors(), is(3));

Reply via email to