Hi All, I'm starting to experiment with the lower-level Processor Client API found on the KIP-28 wiki.
When I start KafkaStreaming, I get the following exception:

Exception in thread "main" java.util.NoSuchElementException: id: SINK
    at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
    at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
    at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
    at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
    at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)

The TopologyBuilder is being built like so:

    topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
                   .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
                   .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");

It looks to me like the cause of the error is that the TopologyBuilder.addSink method never connects the sink with its parent. When I add the following two lines to the addSink method, the exception goes away:

    nodeGrouper.add(name);
    nodeGrouper.unite(name, parentNames);

Is this a bug, or am I doing something incorrect?

Thanks,
Bill
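
P.S. To illustrate what I suspect is happening, here is a minimal sketch. It is not the actual Kafka code: MiniQuickUnion and SinkGroupingSketch are hypothetical names, and the class is a simplified stand-in for QuickUnion. The only assumption it relies on is the one visible in the stack trace, that root() throws NoSuchElementException for an id that was never added.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

class SinkGroupingSketch {

    // Hypothetical, simplified union-find; NOT the real
    // org.apache.kafka.streams.processor.internals.QuickUnion.
    static class MiniQuickUnion<T> {
        private final Map<T, T> parent = new HashMap<>();

        // Register a node; a new node starts out as its own root.
        void add(T id) {
            parent.put(id, id);
        }

        // Follow parent links up to the root; fail if the id was never added.
        T root(T id) {
            T current = id;
            T next = parent.get(current);
            if (next == null) {
                throw new NoSuchElementException("id: " + id);
            }
            while (!next.equals(current)) {
                current = next;
                next = parent.get(current);
            }
            return current;
        }

        // Merge the groups that contain a and b.
        void unite(T a, T b) {
            T rootA = root(a);
            T rootB = root(b);
            if (!rootA.equals(rootB)) {
                parent.put(rootA, rootB);
            }
        }
    }

    public static void main(String[] args) {
        MiniQuickUnion<String> nodeGrouper = new MiniQuickUnion<>();

        // Source and processor are registered and connected.
        nodeGrouper.add("SOURCE");
        nodeGrouper.add("PROCESS");
        nodeGrouper.unite("PROCESS", "SOURCE");

        // The sink was never registered, so looking up its root fails.
        try {
            nodeGrouper.root("SINK");
        } catch (NoSuchElementException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }

        // Registering the sink and uniting it with its parent avoids the failure.
        nodeGrouper.add("SINK");
        nodeGrouper.unite("SINK", "PROCESS");
        System.out.println("Same group: "
                + nodeGrouper.root("SINK").equals(nodeGrouper.root("SOURCE")));
    }
}
```

If the real grouping logic works along these lines, any node that is added to the topology but never registered with the grouper would trigger the same exception once the node groups are computed.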