This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7d2b1b  KAFKA-7885: TopologyDescription violates equals-hashCode contract. (#6210)
f7d2b1b is described below

commit f7d2b1baf7745b0f505edd186e1731cc5c9f382c
Author: Piotr Fras <[email protected]>
AuthorDate: Wed Apr 15 23:50:39 2020 +0100

    KAFKA-7885: TopologyDescription violates equals-hashCode contract. (#6210)
    
    Reviewers: Matthias J. Sax <[email protected]>, John Roesler <[email protected]>, Bill Bejeck <[email protected]>
---
 .../internals/StaticTopicNameExtractor.java        | 19 ++++++++++++++
 .../org/apache/kafka/streams/TopologyTest.java     | 29 ++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
index c525112..f38c547 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 
+import java.util.Objects;
+
 /**
  * Static topic name extractor
  */
@@ -38,4 +40,21 @@ public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, V>
     public String toString() {
         return "StaticTopicNameExtractor(" + topicName + ")";
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StaticTopicNameExtractor<?, ?> that = 
(StaticTopicNameExtractor<?, ?>) o;
+        return Objects.equals(topicName, that.topicName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topicName);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index d0147e1..7e324f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -398,6 +398,7 @@ public class TopologyTest {
                 Collections.singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -409,6 +410,7 @@ public class TopologyTest {
                 Collections.singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -420,6 +422,7 @@ public class TopologyTest {
                 Collections.singleton(expectedSourceNode)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -440,6 +443,7 @@ public class TopologyTest {
                 Collections.singleton(expectedSourceNode3)));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -453,6 +457,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -468,6 +473,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
 
@@ -484,6 +490,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -499,6 +506,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -514,6 +522,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -543,6 +552,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(2, allNodes3));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -572,6 +582,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(2, allNodes3));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -603,6 +614,7 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -633,12 +645,14 @@ public class TopologyTest {
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
     public void shouldDescribeGlobalStoreTopology() {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore", 
"source", "globalTopic", "processor", 0);
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -646,6 +660,7 @@ public class TopologyTest {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore1", 
"source1", "globalTopic1", "processor1", 0);
         addGlobalStoreToTopologyAndExpectedDescription("globalStore2", 
"source2", "globalTopic2", "processor2", 1);
         assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
     @Test
@@ -1085,6 +1100,20 @@ public class TopologyTest {
             describe.toString());
     }
 
+    @Test
+    public void 
topologyWithStaticTopicNameExtractorShouldRespectEqualHashcodeContract() {
+        final Topology topologyA = topologyWithStaticTopicName();
+        final Topology topologyB = topologyWithStaticTopicName();
+        assertThat(topologyA.describe(), equalTo(topologyB.describe()));
+        assertThat(topologyA.describe().hashCode(), 
equalTo(topologyB.describe().hashCode()));
+    }
+
+    private Topology topologyWithStaticTopicName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("from-topic-name").to("to-topic-name");
+        return builder.build();
+    }
+
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
         topology.addSource(null, sourceName, null, null, null, sourceTopic);

Reply via email to