This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79a2f89  KAFKA-6966: Extend TopologyDescription to better represent 
Source and (#5284)
79a2f89 is described below

commit 79a2f892caef38f28c3ad837ad28b63dcbb58b87
Author: nprad <[email protected]>
AuthorDate: Fri Aug 10 18:12:03 2018 -0500

    KAFKA-6966: Extend TopologyDescription to better represent Source and 
(#5284)
    
    Implements KIP-321
    
    Reviewers: Matthias J. Sax <[email protected]>, John Roesler 
<[email protected]>, Bill Bejeck <[email protected]>
---
 docs/streams/upgrade-guide.html                    |  8 +++
 .../apache/kafka/streams/TopologyDescription.java  | 25 +++++++++-
 .../internals/InternalTopologyBuilder.java         | 58 +++++++++++++++-------
 .../org/apache/kafka/streams/TopologyTest.java     | 50 ++++++++++++++++++-
 4 files changed, 120 insertions(+), 21 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 34f66ce..35e1f77 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,14 @@
         We have also removed some public APIs that are deprecated prior to 
1.0.x in 2.0.0.
         See below for a detailed list of removed APIs.
     </p>
+    <h3><a id="streams_api_changes_210" 
href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
+    <p>
+        We updated the <code>TopologyDescription</code> API to allow for better 
runtime checking.
+        Users are encouraged to use <code>#topicSet()</code> and 
<code>#topicPattern()</code> accordingly on 
<code>TopologyDescription.Source</code> nodes,
+        instead of using <code>#topics()</code>, which has since been 
deprecated. Similarly, use <code>#topic()</code> and 
<code>#topicNameExtractor()</code>
+        to get descriptions of <code>TopologyDescription.Sink</code> nodes. 
For more details, see
+        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
+    </p>
 
     <h3><a id="streams_api_changes_200" 
href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 04a292f..870052d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * A meta representation of a {@link Topology topology}.
@@ -113,8 +115,22 @@ public interface TopologyDescription {
         /**
          * The topic names this source node is reading from.
          * @return comma separated list of topic names or pattern (as String)
+         * @deprecated use {@link #topicSet()} or {@link #topicPattern()} 
instead
          */
+        @Deprecated
         String topics();
+
+        /**
+         * The topic names this source node is reading from.
+         * @return a set of topic names
+         */
+        Set<String> topicSet();
+
+        /**
+         * The pattern used to match topic names that this source node is reading from.
+         * @return the pattern used to match topic names
+         */
+        Pattern topicPattern();
     }
 
     /**
@@ -134,10 +150,17 @@ public interface TopologyDescription {
     interface Sink extends Node {
         /**
          * The topic name this sink node is writing to.
-         * Could be null if the topic name can only be dynamically determined 
based on {@code TopicNameExtractor}
+         * Could be {@code null} if the topic name can only be dynamically 
determined based on {@link TopicNameExtractor}
          * @return a topic name
          */
         String topic();
+
+        /**
+         * The {@link TopicNameExtractor} class that this sink node uses to 
dynamically extract the topic name to write to.
+         * Could be {@code null} if the topic name is not dynamically 
determined.
+         * @return the {@link TopicNameExtractor} class used to get the topic name
+         */
+        TopicNameExtractor topicNameExtractor();
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b7e4303..99d616f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -282,15 +282,7 @@ public class InternalTopologyBuilder {
 
         @Override
         Source describe() {
-            final String sourceTopics;
-
-            if (pattern == null) {
-                sourceTopics = topics.toString();
-            } else {
-                sourceTopics = pattern.toString();
-            }
-
-            return new Source(name, sourceTopics);
+            return new Source(name, new HashSet<>(topics), pattern);
         }
     }
 
@@ -1337,7 +1329,7 @@ public class InternalTopologyBuilder {
                            final String storeName,
                            final String topicName,
                            final int id) {
-            source = new Source(sourceName, topicName);
+            source = new Source(sourceName, Collections.singleton(topicName), 
null);
             processor = new Processor(processorName, 
Collections.singleton(storeName));
             source.successors.add(processor);
             processor.predecessors.add(source);
@@ -1424,27 +1416,43 @@ public class InternalTopologyBuilder {
     }
 
     public final static class Source extends AbstractNode implements 
TopologyDescription.Source {
-        private final String topics;
+        private final Set<String> topics;
+        private final Pattern topicPattern;
 
         public Source(final String name,
-                      final String topics) {
+                      final Set<String> topics,
+                      final Pattern pattern) {
             super(name);
             this.topics = topics;
+            this.topicPattern = pattern;
         }
 
+        @Deprecated
         @Override
         public String topics() {
+            return topics.toString();
+        }
+
+        @Override
+        public Set<String> topicSet() {
             return topics;
         }
 
         @Override
+        public Pattern topicPattern() {
+            return topicPattern;
+        }
+
+        @Override
         public void addPredecessor(final TopologyDescription.Node predecessor) 
{
             throw new UnsupportedOperationException("Sources don't have 
predecessors.");
         }
 
         @Override
         public String toString() {
-            return "Source: " + name + " (topics: " + topics + ")\n      --> " 
+ nodeNames(successors);
+            final String topicsString = topics == null ? 
topicPattern.toString() : topics.toString();
+            
+            return "Source: " + name + " (topics: " + topicsString + ")\n      
--> " + nodeNames(successors);
         }
 
         @Override
@@ -1459,13 +1467,14 @@ public class InternalTopologyBuilder {
             final Source source = (Source) o;
             // omit successor to avoid infinite loops
             return name.equals(source.name)
-                && topics.equals(source.topics);
+                && topics.equals(source.topics)
+                && topicPattern.equals(source.topicPattern);
         }
 
         @Override
         public int hashCode() {
             // omit successor as it might change and alter the hash code
-            return Objects.hash(name, topics);
+            return Objects.hash(name, topics, topicPattern);
         }
     }
 
@@ -1528,10 +1537,20 @@ public class InternalTopologyBuilder {
 
         @Override
         public String topic() {
-            if (topicNameExtractor instanceof StaticTopicNameExtractor)
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                 return ((StaticTopicNameExtractor) 
topicNameExtractor).topicName;
-            else
+            } else {
                 return null;
+            }
+        }
+
+        @Override
+        public TopicNameExtractor topicNameExtractor() {
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+                return null;
+            } else {
+                return topicNameExtractor;
+            }
         }
 
         @Override
@@ -1541,7 +1560,10 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + 
nodeNames(predecessors);
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+                return "Sink: " + name + " (topic: " + topic() + ")\n      <-- 
" + nodeNames(predecessors);
+            }
+            return "Sink: " + name + " (extractor class: " + 
topicNameExtractor + ")\n      <-- " + nodeNames(predecessors);
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ece157c..eeb08ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -371,6 +372,23 @@ public class TopologyTest {
     }
 
     @Test
+    public void sinkShouldReturnNullTopicWithDynamicRouting() {
+        final TopologyDescription.Sink expectedSinkNode
+                = new InternalTopologyBuilder.Sink("sink", (key, value, 
record) -> record.topic() + "-" + key);
+
+        assertThat(expectedSinkNode.topic(), equalTo(null));
+    }
+
+    @Test
+    public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
+        final TopicNameExtractor topicNameExtractor = (key, value, record) -> 
record.topic() + "-" + key;
+        final TopologyDescription.Sink expectedSinkNode
+                = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+
+        assertThat(expectedSinkNode.topicNameExtractor(), 
equalTo(topicNameExtractor));
+    }
+
+    @Test
     public void singleSourceShouldHaveSingleSubtopology() {
         final TopologyDescription.Source expectedSourceNode = 
addSource("source", "topic");
 
@@ -630,6 +648,34 @@ public class TopologyTest {
     }
 
     @Test
+    public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+
+        final TopicNameExtractor topicNameExtractor = new TopicNameExtractor() 
{
+            @Override
+            public String extract(final Object key, final Object value, final 
RecordContext recordContext) {
+                return recordContext.topic() + "-" + key;
+            }
+
+            @Override
+            public String toString() {
+                return "anonymous topic name extractor. topic is 
[recordContext.topic()]-[key]";
+            }
+        };
+        builder.stream("input-topic").to(topicNameExtractor);
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+                "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-SINK-0000000001\n" +
+                "    Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous 
topic name extractor. topic is [recordContext.topic()]-[key])\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+                describe.toString());
+    }
+
+    @Test
     public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
@@ -1048,13 +1094,13 @@ public class TopologyTest {
         for (int i = 1; i < sourceTopic.length; ++i) {
             allSourceTopics.append(", ").append(sourceTopic[i]);
         }
-        return new InternalTopologyBuilder.Source(sourceName, 
allSourceTopics.toString());
+        return new InternalTopologyBuilder.Source(sourceName, new 
HashSet<>(Arrays.asList(sourceTopic)), null);
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final Pattern sourcePattern) {
         topology.addSource(null, sourceName, null, null, null, sourcePattern);
-        return new InternalTopologyBuilder.Source(sourceName, 
sourcePattern.toString());
+        return new InternalTopologyBuilder.Source(sourceName, null, 
sourcePattern);
     }
 
     private TopologyDescription.Processor addProcessor(final String 
processorName,

Reply via email to