This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 17708df KAFKA-8240: Fix NPE in Source.equals() (#6685) 17708df is described below commit 17708df8c6f23794183b0106a622d3657a9e7bd6 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Thu May 9 15:48:11 2019 +0200 KAFKA-8240: Fix NPE in Source.equals() (#6685) - backport of PR #6589 to 2.2 branch Reviewers: Bruno Cadonna <br...@confluent.io>, Bill Bejeck <b...@confluent.io> --- .../internals/InternalTopologyBuilder.java | 25 +++- .../internals/InternalTopologyBuilderTest.java | 144 ++++++++++++++++----- 2 files changed, 132 insertions(+), 37 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 0648fec..5c7203d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -280,7 +280,7 @@ public class InternalTopologyBuilder { @Override Source describe() { - return new Source(name, new HashSet<>(topics), pattern); + return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern); } } @@ -1310,6 +1310,9 @@ public class InternalTopologyBuilder { @Override public int compare(final TopologyDescription.Node node1, final TopologyDescription.Node node2) { + if (node1.equals(node2)) { + return 0; + } final int size1 = ((AbstractNode) node1).size; final int size2 = ((AbstractNode) node2).size; @@ -1428,6 +1431,7 @@ public class InternalTopologyBuilder { int size; AbstractNode(final String name) { + Objects.requireNonNull(name, "name cannot be null"); this.name = name; this.size = 1; } @@ -1464,6 +1468,13 @@ public class InternalTopologyBuilder { final Set<String> topics, final Pattern pattern) { super(name); + if (topics == null && pattern == null) { + throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null."); + } + if (topics != null && pattern != null) { + throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null."); + } + this.topics = topics; this.topicPattern = pattern; } @@ -1508,8 +1519,10 @@ public class InternalTopologyBuilder { final Source source = (Source) o; // omit successor to avoid infinite loops return name.equals(source.name) - && topics.equals(source.topics) - && topicPattern.equals(source.topicPattern); + && (topics == null && source.topics == null + || topics != null && topics.equals(source.topics)) + && (topicPattern == null && source.topicPattern == null + || topicPattern != null && topicPattern.pattern().equals(source.topicPattern.pattern())); } @Override @@ -1738,6 +1751,9 @@ public class InternalTopologyBuilder { @Override public int compare(final TopologyDescription.GlobalStore globalStore1, final TopologyDescription.GlobalStore globalStore2) { + if (globalStore1.equals(globalStore2)) { + return 0; + } return globalStore1.id() - globalStore2.id(); } } @@ -1748,6 +1764,9 @@ public class InternalTopologyBuilder { @Override public int compare(final TopologyDescription.Subtopology subtopology1, final TopologyDescription.Subtopology subtopology2) { + if (subtopology1.equals(subtopology2)) { + return 0; + } return subtopology1.id() - subtopology2.id(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 1be5231..552934f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -23,15 +23,13 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockKeyValueStoreBuilder; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; @@ -50,12 +48,13 @@ import java.util.regex.Pattern; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsInstanceOf.instanceOf; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -353,9 +352,9 @@ public class InternalTopologyBuilderTest { final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); - expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); - expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -393,17 +392,17 @@ public class InternalTopologyBuilderTest { final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo( - Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), - Collections.<String, InternalTopicConfig>emptyMap(), - Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap())))); + Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), + Collections.emptyMap(), + Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap())))); expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo( - Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), - Collections.<String, InternalTopicConfig>emptyMap(), - Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap())))); + Collections.emptySet(), mkSet("topic-3", "topic-4"), + Collections.emptyMap(), + Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap())))); expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo( - Collections.<String>emptySet(), mkSet("topic-5"), - Collections.<String, InternalTopicConfig>emptyMap(), - Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap())))); + Collections.emptySet(), mkSet("topic-5"), + Collections.emptyMap(), + Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap())))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -499,12 +498,7 @@ public class InternalTopologyBuilderTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullNameWhenAddingProcessor() { - builder.addProcessor(null, new ProcessorSupplier() { - @Override - public Processor get() { - return null; - } - }); + builder.addProcessor(null, () -> null); } @Test(expected = NullPointerException.class) @@ -604,14 +598,14 @@ public class InternalTopologyBuilderTest { final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); - final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String, String>emptyMap(), 10000); + final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); assertEquals(2, properties1.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("appId-store1-changelog", topicConfig1.name()); assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig); final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog"); - final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String, String>emptyMap(), 10000); + final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000); assertEquals(2, properties2.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG)); @@ -628,7 +622,7 @@ public class InternalTopologyBuilderTest { final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); - final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); + final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000); assertEquals(1, properties.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-store-changelog", topicConfig.name()); @@ -642,7 +636,7 @@ public class InternalTopologyBuilderTest { builder.addSource(null, "source", null, null, null, "foo"); final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); - final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); + final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000); assertEquals(5, properties.size()); assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); @@ -708,32 +702,32 @@ public class InternalTopologyBuilderTest { assertTrue(iterator.hasNext()); InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("source1")); + assertEquals("source1", node.name); assertEquals(6, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("source2")); + assertEquals("source2", node.name); assertEquals(4, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor2")); + assertEquals("processor2", node.name); assertEquals(3, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor1")); + assertEquals("processor1", node.name); assertEquals(2, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor3")); + assertEquals("processor3", node.name); assertEquals(2, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("sink1")); + assertEquals("sink1", node.name); assertEquals(1, node.size); } @@ -760,7 +754,7 @@ public class InternalTopologyBuilderTest { final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); final List<String> topics = stateStoreAndTopics.get(storeBuilder.name()); - assertTrue("Expected to contain two topics", topics.size() == 2); + assertEquals("Expected to contain two topics", 2, topics.size()); assertTrue(topics.contains("topic-2")); assertTrue(topics.contains("topic-3")); @@ -781,4 +775,86 @@ public class InternalTopologyBuilderTest { sameNameForSourceAndProcessor, new MockProcessorSupplier()); } + + @Test + public void shouldThrowIfNameIsNull() { + try { + new InternalTopologyBuilder.Source(null, Collections.emptySet(), null); + fail("Should have thrown NullPointerException"); + } catch (final NullPointerException expected) { + assertEquals("name cannot be null", expected.getMessage()); + } + } + + @Test + public void shouldThrowIfTopicAndPatternAreNull() { + try { + new InternalTopologyBuilder.Source("name", null, null); + fail("Should have thrown IllegalArgumentException"); + } catch (final IllegalArgumentException expected) { + assertEquals("Either topics or pattern must be not-null, but both are null.", expected.getMessage()); + } + } + + @Test + public void shouldThrowIfBothTopicAndPatternAreNotNull() { + try { + new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile("")); + fail("Should have thrown IllegalArgumentException"); + } catch (final IllegalArgumentException expected) { + assertEquals("Either topics or pattern must be null, but both are not null.", expected.getMessage()); + } + } + + @Test + public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); + final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); + + assertThat(base, equalTo(sameAsBase)); + } + + @Test + public void sourceShouldBeEqualIfNameAndPatternAreTheSame() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + + assertThat(base, equalTo(sameAsBase)); + } + + @Test + public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); + final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), null); + + assertThat(base, not(equalTo(differentName))); + } + + @Test + public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic")); + + assertThat(base, not(equalTo(differentName))); + } + + @Test + public void sourceShouldNotBeEqualForDifferentTopicList() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); + final InternalTopologyBuilder.Source differentTopicList = new InternalTopologyBuilder.Source("name", Collections.emptySet(), null); + final InternalTopologyBuilder.Source differentTopic = new InternalTopologyBuilder.Source("name", Collections.singleton("topic2"), null); + + assertThat(base, not(equalTo(differentTopicList))); + assertThat(base, not(equalTo(differentTopic))); + } + + @Test + public void sourceShouldNotBeEqualForDifferentPattern() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2")); + final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("top*")); + + assertThat(base, not(equalTo(differentPattern))); + assertThat(base, not(equalTo(overlappingPattern))); + } }