This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce6f0781929 MINOR: fix NPE in KS `Topology` for new `AutoOffsetReset`
(#18780)
ce6f0781929 is described below
commit ce6f0781929c568b11893e934783ac39a1d474b1
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Feb 3 17:24:47 2025 -0800
MINOR: fix NPE in KS `Topology` for new `AutoOffsetReset` (#18780)
The NPE was introduced via KIP-1106 (which added the new `AutoOffsetReset`).
Reviewers: Lucas Brutschy <[email protected]>
---
.../java/org/apache/kafka/streams/Topology.java | 72 +++++++++++++++++++---
.../org/apache/kafka/streams/TopologyTest.java | 25 ++++----
2 files changed, 76 insertions(+), 21 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index fffca97400e..9a25afd4350 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -169,7 +169,14 @@ public class Topology {
public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ null,
+ null,
+ null,
+ topics
+ );
return this;
}
@@ -215,7 +222,14 @@ public class Topology {
public synchronized Topology addSource(final
org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ null,
+ null,
+ null,
+ topicPattern
+ );
return this;
}
@@ -304,7 +318,14 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null,
topics);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ timestampExtractor,
+ null,
+ null,
+ topics
+ );
return this;
}
@@ -351,7 +372,14 @@ public class Topology {
final TimestampExtractor
timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null,
topicPattern);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ timestampExtractor,
+ null,
+ null,
+ topicPattern
+ );
return this;
}
@@ -457,7 +485,14 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer,
valueDeserializer, topics);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ null,
+ keyDeserializer,
+ valueDeserializer,
+ topics
+ );
return this;
}
@@ -514,7 +549,14 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer,
valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ null,
+ keyDeserializer,
+ valueDeserializer,
+ topicPattern
+ );
return this;
}
@@ -571,7 +613,14 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor,
keyDeserializer, valueDeserializer, topics);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topics
+ );
return this;
}
@@ -634,7 +683,14 @@ public class Topology {
final Deserializer<?>
keyDeserializer,
final Deserializer<?>
valueDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(new
AutoOffsetResetInternal(offsetReset), name, timestampExtractor,
keyDeserializer, valueDeserializer, topicPattern);
+ internalTopologyBuilder.addSource(
+ offsetReset == null ? null : new
AutoOffsetResetInternal(offsetReset),
+ name,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topicPattern
+ );
return this;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0dc0179c6e5..0cb91b12a58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -146,8 +146,7 @@ public class TopologyTest {
@Test
public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
- assertThrows(NullPointerException.class, () ->
topology.addProcessor("name",
- (ProcessorSupplier<Object, Object, Object, Object>) null));
+ assertThrows(NullPointerException.class, () ->
topology.addProcessor("name", null));
}
@Test
@@ -376,6 +375,7 @@ public class TopologyTest {
}
}
+ @SuppressWarnings("resource")
@Test
public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
@@ -411,7 +411,7 @@ public class TopologyTest {
@Override
public Processor<Object, Object, Object, Object> get() {
- return new Processor<Object, Object, Object, Object>() {
+ return new Processor<>() {
@Override
public void init(final ProcessorContext<Object, Object>
context) {
context.getStateStore(STORE_NAME);
@@ -1157,7 +1157,7 @@ public class TopologyTest {
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
final StreamsBuilder builder = new StreamsBuilder();
- final TopicNameExtractor<Object, Object> topicNameExtractor = new
TopicNameExtractor<Object, Object>() {
+ final TopicNameExtractor<Object, Object> topicNameExtractor = new
TopicNameExtractor<>() {
@Override
public String extract(final Object key, final Object value, final
RecordContext recordContext) {
return recordContext.topic() + "-" + key;
@@ -2257,16 +2257,16 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
- topology.addSource((Topology.AutoOffsetReset) null, sourceName, null,
null, null, sourceTopic);
- final StringBuilder allSourceTopics = new
StringBuilder(sourceTopic[0]);
- for (int i = 1; i < sourceTopic.length; ++i) {
- allSourceTopics.append(", ").append(sourceTopic[i]);
- }
+ topology.addSource((AutoOffsetReset) null, sourceName, null, null,
null, sourceTopic);
return new InternalTopologyBuilder.Source(sourceName, new
HashSet<>(Arrays.asList(sourceTopic)), null);
}
+ @SuppressWarnings("deprecation")
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
+ // we still test the old `Topology.AutoOffsetReset` here, to increase
test coverage
+ // (cf. `addSource` above, which uses the new one)
+ // We can rewrite this to the new one, when the old one is removed
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null,
null, null, sourcePattern);
return new InternalTopologyBuilder.Source(sourceName, null,
sourcePattern);
}
@@ -2338,7 +2338,6 @@ public class TopologyTest {
return expectedSinkNode;
}
- @Deprecated // testing old PAPI
private void addGlobalStoreToTopologyAndExpectedDescription(final String
globalStoreName,
final String
sourceName,
final String
globalTopicName,
@@ -2441,17 +2440,17 @@ public class TopologyTest {
topology.addSource("source", "topic");
topology.addProcessor(
"p1",
- () -> (Processor<Object, Object, Object, Object>) record ->
System.out.println("Processing: " + random.nextInt()),
+ () -> record -> System.out.println("Processing: " +
random.nextInt()),
"source"
);
topology.addProcessor(
"p2",
- () -> (Processor<Object, Object, Object, Object>) record ->
System.out.println("Processing: " + random.nextInt()),
+ () -> record -> System.out.println("Processing: " +
random.nextInt()),
"p1"
);
topology.addProcessor(
"p3",
- () -> (Processor<Object, Object, Object, Object>) record ->
System.out.println("Processing: " + random.nextInt()),
+ () -> record -> System.out.println("Processing: " +
random.nextInt()),
"p2"
);
assertThat(counter.numWrappedProcessors(), is(3));