This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79a2f89 KAFKA-6966: Extend TopologyDescription to better represent
Source and Sink nodes (#5284)
79a2f89 is described below
commit 79a2f892caef38f28c3ad837ad28b63dcbb58b87
Author: nprad <[email protected]>
AuthorDate: Fri Aug 10 18:12:03 2018 -0500
KAFKA-6966: Extend TopologyDescription to better represent Source and
Sink nodes (#5284)
Implements KIP-321
Reviewers: Matthias J. Sax <[email protected]>, John Roesler
<[email protected]>, Bill Bejeck <[email protected]>
---
docs/streams/upgrade-guide.html | 8 +++
.../apache/kafka/streams/TopologyDescription.java | 25 +++++++++-
.../internals/InternalTopologyBuilder.java | 58 +++++++++++++++-------
.../org/apache/kafka/streams/TopologyTest.java | 50 ++++++++++++++++++-
4 files changed, 120 insertions(+), 21 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 34f66ce..35e1f77 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,14 @@
We have also removed some public APIs that are deprecated prior to
1.0.x in 2.0.0.
See below for a detailed list of removed APIs.
</p>
+ <h3><a id="streams_api_changes_210"
href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
+ <p>
+ We updated <code>TopologyDescription</code> API to allow for better
runtime checking.
+ Users are encouraged to use <code>#topicSet()</code> and
<code>#topicPattern()</code> accordingly on
<code>TopologyDescription.Source</code> nodes,
+ instead of using <code>#topics()</code>, which has since been
deprecated. Similarly, use <code>#topic()</code> and
<code>#topicNameExtractor()</code>
+ to get descriptions of <code>TopologyDescription.Sink</code> nodes.
For more details, see
+ <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
+ </p>
<h3><a id="streams_api_changes_200"
href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 04a292f..870052d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import java.util.Set;
+import java.util.regex.Pattern;
/**
* A meta representation of a {@link Topology topology}.
@@ -113,8 +115,22 @@ public interface TopologyDescription {
/**
* The topic names this source node is reading from.
* @return comma separated list of topic names or pattern (as String)
+ * @deprecated use {@link #topicSet()} or {@link #topicPattern()}
instead
*/
+ @Deprecated
String topics();
+
+ /**
+ * The topic names this source node is reading from.
+ * @return a set of topic names
+ */
+ Set<String> topicSet();
+
+ /**
+ * The pattern used to match topic names that is reading from.
+ * @return the pattern used to match topic names
+ */
+ Pattern topicPattern();
}
/**
@@ -134,10 +150,17 @@ public interface TopologyDescription {
interface Sink extends Node {
/**
* The topic name this sink node is writing to.
- * Could be null if the topic name can only be dynamically determined
based on {@code TopicNameExtractor}
+ * Could be {@code null} if the topic name can only be dynamically
determined based on {@link TopicNameExtractor}
* @return a topic name
*/
String topic();
+
+ /**
+ * The {@link TopicNameExtractor} class that this sink node uses to
dynamically extract the topic name to write to.
+ * Could be {@code null} if the topic name is not dynamically
determined.
+ * @return the {@link TopicNameExtractor} class used get the topic name
+ */
+ TopicNameExtractor topicNameExtractor();
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b7e4303..99d616f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -282,15 +282,7 @@ public class InternalTopologyBuilder {
@Override
Source describe() {
- final String sourceTopics;
-
- if (pattern == null) {
- sourceTopics = topics.toString();
- } else {
- sourceTopics = pattern.toString();
- }
-
- return new Source(name, sourceTopics);
+ return new Source(name, new HashSet<>(topics), pattern);
}
}
@@ -1337,7 +1329,7 @@ public class InternalTopologyBuilder {
final String storeName,
final String topicName,
final int id) {
- source = new Source(sourceName, topicName);
+ source = new Source(sourceName, Collections.singleton(topicName),
null);
processor = new Processor(processorName,
Collections.singleton(storeName));
source.successors.add(processor);
processor.predecessors.add(source);
@@ -1424,27 +1416,43 @@ public class InternalTopologyBuilder {
}
public final static class Source extends AbstractNode implements
TopologyDescription.Source {
- private final String topics;
+ private final Set<String> topics;
+ private final Pattern topicPattern;
public Source(final String name,
- final String topics) {
+ final Set<String> topics,
+ final Pattern pattern) {
super(name);
this.topics = topics;
+ this.topicPattern = pattern;
}
+ @Deprecated
@Override
public String topics() {
+ return topics.toString();
+ }
+
+ @Override
+ public Set<String> topicSet() {
return topics;
}
@Override
+ public Pattern topicPattern() {
+ return topicPattern;
+ }
+
+ @Override
public void addPredecessor(final TopologyDescription.Node predecessor)
{
throw new UnsupportedOperationException("Sources don't have
predecessors.");
}
@Override
public String toString() {
- return "Source: " + name + " (topics: " + topics + ")\n --> "
+ nodeNames(successors);
+ final String topicsString = topics == null ?
topicPattern.toString() : topics.toString();
+
+ return "Source: " + name + " (topics: " + topicsString + ")\n
--> " + nodeNames(successors);
}
@Override
@@ -1459,13 +1467,14 @@ public class InternalTopologyBuilder {
final Source source = (Source) o;
// omit successor to avoid infinite loops
return name.equals(source.name)
- && topics.equals(source.topics);
+ && topics.equals(source.topics)
+ && topicPattern.equals(source.topicPattern);
}
@Override
public int hashCode() {
// omit successor as it might change and alter the hash code
- return Objects.hash(name, topics);
+ return Objects.hash(name, topics, topicPattern);
}
}
@@ -1528,10 +1537,20 @@ public class InternalTopologyBuilder {
@Override
public String topic() {
- if (topicNameExtractor instanceof StaticTopicNameExtractor)
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return ((StaticTopicNameExtractor)
topicNameExtractor).topicName;
- else
+ } else {
return null;
+ }
+ }
+
+ @Override
+ public TopicNameExtractor topicNameExtractor() {
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+ return null;
+ } else {
+ return topicNameExtractor;
+ }
}
@Override
@@ -1541,7 +1560,10 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
- return "Sink: " + name + " (topic: " + topic() + ")\n <-- " +
nodeNames(predecessors);
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+ return "Sink: " + name + " (topic: " + topic() + ")\n <--
" + nodeNames(predecessors);
+ }
+ return "Sink: " + name + " (extractor class: " +
topicNameExtractor + ")\n <-- " + nodeNames(predecessors);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ece157c..eeb08ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -371,6 +372,23 @@ public class TopologyTest {
}
@Test
+ public void sinkShouldReturnNullTopicWithDynamicRouting() {
+ final TopologyDescription.Sink expectedSinkNode
+ = new InternalTopologyBuilder.Sink("sink", (key, value,
record) -> record.topic() + "-" + key);
+
+ assertThat(expectedSinkNode.topic(), equalTo(null));
+ }
+
+ @Test
+ public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
+ final TopicNameExtractor topicNameExtractor = (key, value, record) ->
record.topic() + "-" + key;
+ final TopologyDescription.Sink expectedSinkNode
+ = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+
+ assertThat(expectedSinkNode.topicNameExtractor(),
equalTo(topicNameExtractor));
+ }
+
+ @Test
public void singleSourceShouldHaveSingleSubtopology() {
final TopologyDescription.Source expectedSourceNode =
addSource("source", "topic");
@@ -630,6 +648,34 @@ public class TopologyTest {
}
@Test
+ public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final TopicNameExtractor topicNameExtractor = new TopicNameExtractor()
{
+ @Override
+ public String extract(final Object key, final Object value, final
RecordContext recordContext) {
+ return recordContext.topic() + "-" + key;
+ }
+
+ @Override
+ public String toString() {
+ return "anonymous topic name extractor. topic is
[recordContext.topic()]-[key]";
+ }
+ };
+ builder.stream("input-topic").to(topicNameExtractor);
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics:
[input-topic])\n" +
+ " --> KSTREAM-SINK-0000000001\n" +
+ " Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous
topic name extractor. topic is [recordContext.topic()]-[key])\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n\n",
+ describe.toString());
+ }
+
+ @Test
public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
@@ -1048,13 +1094,13 @@ public class TopologyTest {
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
}
- return new InternalTopologyBuilder.Source(sourceName,
allSourceTopics.toString());
+ return new InternalTopologyBuilder.Source(sourceName, new
HashSet<>(Arrays.asList(sourceTopic)), null);
}
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
topology.addSource(null, sourceName, null, null, null, sourcePattern);
- return new InternalTopologyBuilder.Source(sourceName,
sourcePattern.toString());
+ return new InternalTopologyBuilder.Source(sourceName, null,
sourcePattern);
}
private TopologyDescription.Processor addProcessor(final String
processorName,