This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7d2b1b KAFKA-7885: TopologyDescription violates equals-hashCode
contract. (#6210)
f7d2b1b is described below
commit f7d2b1baf7745b0f505edd186e1731cc5c9f382c
Author: Piotr Fras <[email protected]>
AuthorDate: Wed Apr 15 23:50:39 2020 +0100
KAFKA-7885: TopologyDescription violates equals-hashCode contract. (#6210)
Reviewers: Matthias J. Sax <[email protected]>, John Roesler
<[email protected]>, Bill Bejeck <[email protected]>
---
.../internals/StaticTopicNameExtractor.java | 19 ++++++++++++++
.../org/apache/kafka/streams/TopologyTest.java | 29 ++++++++++++++++++++++
2 files changed, 48 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
index c525112..f38c547 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import java.util.Objects;
+
/**
* Static topic name extractor
*/
@@ -38,4 +40,21 @@ public class StaticTopicNameExtractor<K, V> implements
TopicNameExtractor<K, V>
public String toString() {
return "StaticTopicNameExtractor(" + topicName + ")";
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StaticTopicNameExtractor<?, ?> that =
(StaticTopicNameExtractor<?, ?>) o;
+ return Objects.equals(topicName, that.topicName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index d0147e1..7e324f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -398,6 +398,7 @@ public class TopologyTest {
Collections.singleton(expectedSourceNode)));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -409,6 +410,7 @@ public class TopologyTest {
Collections.singleton(expectedSourceNode)));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -420,6 +422,7 @@ public class TopologyTest {
Collections.singleton(expectedSourceNode)));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -440,6 +443,7 @@ public class TopologyTest {
Collections.singleton(expectedSourceNode3)));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -453,6 +457,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -468,6 +473,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@@ -484,6 +490,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -499,6 +506,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -514,6 +522,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -543,6 +552,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(2, allNodes3));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -572,6 +582,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(2, allNodes3));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -603,6 +614,7 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -633,12 +645,14 @@ public class TopologyTest {
expectedDescription.addSubtopology(new
InternalTopologyBuilder.Subtopology(0, allNodes));
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
public void shouldDescribeGlobalStoreTopology() {
addGlobalStoreToTopologyAndExpectedDescription("globalStore",
"source", "globalTopic", "processor", 0);
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -646,6 +660,7 @@ public class TopologyTest {
addGlobalStoreToTopologyAndExpectedDescription("globalStore1",
"source1", "globalTopic1", "processor1", 0);
addGlobalStoreToTopologyAndExpectedDescription("globalStore2",
"source2", "globalTopic2", "processor2", 1);
assertThat(topology.describe(), equalTo(expectedDescription));
+ assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
@Test
@@ -1085,6 +1100,20 @@ public class TopologyTest {
describe.toString());
}
+ @Test
+ public void
topologyWithStaticTopicNameExtractorShouldRespectEqualHashcodeContract() {
+ final Topology topologyA = topologyWithStaticTopicName();
+ final Topology topologyB = topologyWithStaticTopicName();
+ assertThat(topologyA.describe(), equalTo(topologyB.describe()));
+ assertThat(topologyA.describe().hashCode(),
equalTo(topologyB.describe().hashCode()));
+ }
+
+ private Topology topologyWithStaticTopicName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream("from-topic-name").to("to-topic-name");
+ return builder.build();
+ }
+
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
topology.addSource(null, sourceName, null, null, null, sourceTopic);