Repository: kafka Updated Branches: refs/heads/0.10.0 55af7ec6b -> 801a70612
MINOR: Add application id prefix for copartitionGroups in TopologyBuilder This is a bugfix that is already in trunk but not backported to 0.10.0. Author: Guozhang Wang <[email protected]> Reviewers: Damian Guy <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1735 from guozhangwang/Kminor-topology-applicationID-0.10.0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/801a7061 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/801a7061 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/801a7061 Branch: refs/heads/0.10.0 Commit: 801a706124af16f605abc6141f38f9eed916ffc2 Parents: 55af7ec Author: Guozhang Wang <[email protected]> Authored: Mon Aug 15 23:04:40 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Aug 15 23:04:40 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 2 + .../streams/processor/TopologyBuilder.java | 79 ++++++++++++++------ .../internals/StreamPartitionAssignor.java | 2 +- .../processor/internals/StreamThread.java | 6 +- .../kstream/internals/KStreamImplTest.java | 2 +- .../streams/processor/TopologyBuilderTest.java | 25 ++++--- .../internals/ProcessorTopologyTest.java | 2 +- .../internals/StreamPartitionAssignorTest.java | 12 +-- .../processor/internals/StreamThreadTest.java | 6 +- .../apache/kafka/test/KStreamTestDriver.java | 2 +- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 11 files changed, 91 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
index 17c760e..3a311a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -141,6 +141,8 @@ public class KafkaStreams { // The application ID is a required config and hence should always have value final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + builder.setApplicationId(applicationId); + String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement(); http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 7161a80..6b57b17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -64,6 +65,7 @@ public class TopologyBuilder { private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>(); private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); private Map<Integer, Set<String>> nodeGroups = null; + private String applicationId = null; private static class StateStoreFactory { public final Set<String> users; @@ -85,7 +87,7 @@ public class TopologyBuilder { this.name = name; } - public abstract ProcessorNode build(String applicationId); + public abstract ProcessorNode build(); } private static class ProcessorNodeFactory extends NodeFactory { @@ -105,7 +107,7 @@ public class 
TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { return new ProcessorNode(name, supplier.get(), stateStoreNames); } } @@ -124,7 +126,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { return new SourceNode(name, keyDeserializer, valDeserializer); } } @@ -147,10 +149,10 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner); + return new SinkNode(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); } else { return new SinkNode(name, topic, keySerializer, valSerializer, partitioner); } @@ -193,6 +195,22 @@ public class TopologyBuilder { public TopologyBuilder() {} /** + * Set the applicationId to be used for auto-generated internal topics. + * + * This is required before calling {@link #sourceTopics}, {@link #topicGroups}, + * {@link #copartitionSources} and {@link #build(Integer)}. + * + * @param applicationId the streams applicationId. Should be the same as set by + * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} + */ + public synchronized final TopologyBuilder setApplicationId(String applicationId) { + Objects.requireNonNull(applicationId, "applicationId can't be null"); + this.applicationId = applicationId; + + return this; + } + + /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. 
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the @@ -501,7 +519,7 @@ public class TopologyBuilder { * * @return groups of topic names */ - public synchronized Map<Integer, TopicsInfo> topicGroups(String applicationId) { + public synchronized Map<Integer, TopicsInfo> topicGroups() { Map<Integer, TopicsInfo> topicGroups = new HashMap<>(); if (nodeGroups == null) @@ -520,7 +538,7 @@ public class TopologyBuilder { for (String topic : topics) { if (this.internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - String internalTopic = applicationId + "-" + topic; + String internalTopic = decorateTopic(topic); internalSourceTopics.add(internalTopic); sourceTopics.add(internalTopic); } else { @@ -534,7 +552,7 @@ public class TopologyBuilder { if (topic != null) { if (internalTopicNames.contains(topic)) { // prefix the change log topic name with the application id - sinkTopics.add(applicationId + "-" + topic); + sinkTopics.add(decorateTopic(topic)); } else { sinkTopics.add(topic); } @@ -544,7 +562,7 @@ public class TopologyBuilder { for (StateStoreFactory stateFactory : stateFactories.values()) { if (stateFactory.isInternal && stateFactory.users.contains(node)) { // prefix the change log topic name with the application id - stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); + stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.supplier.name())); } } } @@ -629,7 +647,7 @@ public class TopologyBuilder { for (String node : nodeNames) { String[] topics = nodeToSourceTopics.get(node); if (topics != null) - copartitionGroup.addAll(Arrays.asList(topics)); + copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics)); } 
list.add(Collections.unmodifiableSet(copartitionGroup)); } @@ -642,7 +660,7 @@ public class TopologyBuilder { * * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) */ - public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) { + public synchronized ProcessorTopology build(Integer topicGroupId) { Set<String> nodeGroup; if (topicGroupId != null) { nodeGroup = nodeGroups().get(topicGroupId); @@ -650,11 +668,11 @@ public class TopologyBuilder { // when nodeGroup is null, we build the full topology. this is used in some tests. nodeGroup = null; } - return build(applicationId, nodeGroup); + return build(nodeGroup); } @SuppressWarnings("unchecked") - private ProcessorTopology build(String applicationId, Set<String> nodeGroup) { + private ProcessorTopology build(Set<String> nodeGroup) { List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); Map<String, ProcessorNode> processorMap = new HashMap<>(); Map<String, SourceNode> topicSourceMap = new HashMap<>(); @@ -663,7 +681,7 @@ public class TopologyBuilder { // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - ProcessorNode node = factory.build(applicationId); + ProcessorNode node = factory.build(); processorNodes.add(node); processorMap.put(node.name(), node); @@ -680,7 +698,7 @@ public class TopologyBuilder { for (String topic : ((SourceNodeFactory) factory).topics) { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node); + topicSourceMap.put(decorateTopic(topic), (SourceNode) node); } else { topicSourceMap.put(topic, (SourceNode) node); } @@ -702,15 +720,34 @@ public class TopologyBuilder { * Get the names of topics that are to be 
consumed by the source nodes created by this builder. * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null */ - public synchronized Set<String> sourceTopics(String applicationId) { - Set<String> topics = new HashSet<>(); - for (String topic : sourceTopicNames) { + public synchronized Set<String> sourceTopics() { + Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames); + return Collections.unmodifiableSet(topics); + } + + private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopics) { + return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new String[sourceTopics.size()])); + } + + private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics) { + final Set<String> decoratedTopics = new HashSet<>(); + for (String topic : sourceTopics) { if (internalTopicNames.contains(topic)) { - topics.add(applicationId + "-" + topic); + decoratedTopics.add(decorateTopic(topic)); } else { - topics.add(topic); + decoratedTopics.add(topic); } } - return Collections.unmodifiableSet(topics); + return decoratedTopics; + } + + private String decorateTopic(String topic) { + if (applicationId == null) { + throw new TopologyBuilderException("there are internal topics and " + + "applicationId hasn't been set. 
Call " + + "setApplicationId first"); + } + + return applicationId + "-" + topic; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 085ff94..2ddfe43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -118,7 +118,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable streamThread = (StreamThread) o; streamThread.partitionAssignor(this); - this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId); + this.topicGroups = streamThread.builder.topicGroups(); if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) { internalTopicManager = new InternalTopicManager( http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 72eeef5..bf88d1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -159,7 +159,7 @@ public class StreamThread extends Thread { this.applicationId = applicationId; this.config = config; this.builder = builder; - this.sourceTopics = builder.sourceTopics(applicationId); + 
this.sourceTopics = builder.sourceTopics(); this.clientId = clientId; this.processId = processId; this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); @@ -545,7 +545,7 @@ public class StreamThread extends Thread { protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { sensors.taskCreationSensor.record(); - ProcessorTopology topology = builder.build(applicationId, id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors); } @@ -615,7 +615,7 @@ public class StreamThread extends Thread { protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { sensors.taskCreationSensor.record(); - ProcessorTopology topology = builder.build(applicationId, id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStoreSuppliers().isEmpty()) { return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors); http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index a40c8fb..ff16a79 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -132,7 +132,7 @@ public class KStreamImplTest { 1 + // to 2 + // through 1, // process - builder.build("X", null).processors().size()); + builder.build(null).processors().size()); } @Test 
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 9af313a..a67b4a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -144,12 +144,12 @@ public class TopologyBuilderTest { builder.addSource("source-3", "topic-3"); builder.addInternalTopic("topic-3"); - Set<String> expected = new HashSet<String>(); + Set<String> expected = new HashSet<>(); expected.add("topic-1"); expected.add("topic-2"); expected.add("X-topic-3"); - assertEquals(expected, builder.sourceTopics("X")); + assertEquals(expected, builder.setApplicationId("X").sourceTopics()); } @Test(expected = TopologyBuilderException.class) @@ -190,21 +190,22 @@ public class TopologyBuilderTest { StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false); builder.addStateStore(supplier); - suppliers = builder.build("X", null).stateStoreSuppliers(); + suppliers = builder.build(null).stateStoreSuppliers(); assertEquals(0, suppliers.size()); builder.addSource("source-1", "topic-1"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); builder.connectProcessorAndStateStores("processor-1", "store-1"); - suppliers = builder.build("X", null).stateStoreSuppliers(); + suppliers = builder.build(null).stateStoreSuppliers(); assertEquals(1, suppliers.size()); assertEquals(supplier.name(), suppliers.get(0).name()); } @Test public void testTopicGroups() { - final TopologyBuilder builder = new TopologyBuilder(); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + 
builder.addInternalTopic("topic-1x"); builder.addSource("source-1", "topic-1", "topic-1x"); builder.addSource("source-2", "topic-2"); builder.addSource("source-3", "topic-3"); @@ -218,10 +219,10 @@ public class TopologyBuilderTest { builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X"); + Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); @@ -230,7 +231,7 @@ public class TopologyBuilderTest { Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); + assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } @Test @@ -256,7 +257,7 @@ public class TopologyBuilderTest { builder.addStateStore(supplier); builder.connectProcessorAndStateStores("processor-5", "store-3"); - Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X"); + Map<Integer, TopicsInfo> topicGroups = builder.setApplicationId("X").topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), 
mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1")))); @@ -281,9 +282,9 @@ public class TopologyBuilderTest { builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - ProcessorTopology topology0 = builder.build("X", 0); - ProcessorTopology topology1 = builder.build("X", 1); - ProcessorTopology topology2 = builder.build("X", 2); + ProcessorTopology topology0 = builder.build(0); + ProcessorTopology topology1 = builder.build(1); + ProcessorTopology topology2 = builder.build(2); assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors())); assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors())); http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 62b283a..382e853 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -96,7 +96,7 @@ public class ProcessorTopologyTest { builder.addSink("sink-1", "topic-3", "processor-1"); builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); - final ProcessorTopology topology = builder.build("X", null); + final ProcessorTopology topology = builder.build(null); assertEquals(6, topology.processors().size()); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 17bda54..f743631 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -269,8 +269,9 @@ public class StreamPartitionAssignorTest { @Test public void testAssignWithStates() throws Exception { StreamsConfig config = new StreamsConfig(configProps()); - + String applicationId = "test"; TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); @@ -295,10 +296,10 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime()); + StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime()); StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", @@ -474,6 +475,7 @@ public class StreamPartitionAssignorTest { @Test public void testAssignWithInternalTopics() throws Exception { StreamsConfig 
config = new StreamsConfig(configProps()); + String applicationId = "test"; TopologyBuilder builder = new TopologyBuilder(); builder.addInternalTopic("topicX"); @@ -489,10 +491,10 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; MockClientSupplier clientSupplier = new MockClientSupplier(); - StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime()); + StreamThread thread10 = new StreamThread(builder.setApplicationId(applicationId), config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime()); StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 4ae31e4..b6a6bff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -160,7 +160,7 @@ public class StreamThreadTest { StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime()) { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - 
ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config); } }; @@ -284,7 +284,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config); } }; @@ -403,7 +403,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config); } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 7316804..dfa7f5d 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -57,7 +57,7 @@ public class KStreamTestDriver { File stateDir, Serde<?> keySerde, Serde<?> valSerde) { - this.topology = builder.build("X", null); + this.topology = builder.setApplicationId("KStreamTestDriver").build(null); this.stateDir = stateDir; this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector()); this.context.setTime(0L); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 4ddbc2a..5188f34 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -147,7 +147,7 @@ public class ProcessorTopologyTestDriver { */ public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { id = new TaskId(0, 0); - topology = builder.build("X", null); + topology = builder.build(null); // Set up the consumer and producer ... consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
