Hi All,

I'm starting to experiment with the lower-level Processor Client API found
on the KIP-28 wiki.

When starting KafkaStreaming, I get the following exception:

Exception in thread "main" java.util.NoSuchElementException: id: SINK
    at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
    at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
    at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
    at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
    at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)

The TopologyBuilder is being built like so:
topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
               .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
               .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");

It looks to me like the cause of the error is that, in the
TopologyBuilder.addSink method, the sink is never connected with its parent.

When I add the following two lines to the addSink method, the exception
goes away.

  nodeGrouper.add(name);
  nodeGrouper.unite(name, parentNames);
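
To illustrate what I think is happening, here is a minimal sketch of a
union-find node grouper. NodeGrouperSketch is my own simplified stand-in, not
the actual QuickUnion source; I'm assuming from the stack trace and the method
names that root() throws for any id that was never passed to add().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/** Simplified, hypothetical stand-in for the internal node grouper; not the Kafka source. */
public class NodeGrouperSketch {
    // Maps each registered id to its parent; a root maps to itself.
    private final Map<String, String> parents = new HashMap<>();

    /** Registers an id as its own single-node group (no-op if already registered). */
    public void add(String id) {
        parents.putIfAbsent(id, id);
    }

    /** Returns the root of the group containing id, or throws if id was never added. */
    public String root(String id) {
        String current = id;
        String parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("id: " + id);
        }
        // Follow parent links until reaching a node that is its own parent.
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    /** Merges the group containing id with the groups containing each of the others. */
    public void unite(String id, String... others) {
        for (String other : others) {
            String rootA = root(id);
            String rootB = root(other);
            if (!rootA.equals(rootB)) {
                parents.put(rootA, rootB);
            }
        }
    }

    public static void main(String[] args) {
        NodeGrouperSketch grouper = new NodeGrouperSketch();
        grouper.add("SOURCE");
        grouper.add("PROCESS");
        grouper.unite("PROCESS", "SOURCE");

        // "SINK" was never registered, so looking up its root fails.
        try {
            grouper.root("SINK");
        } catch (NoSuchElementException e) {
            System.out.println("before fix: " + e.getMessage());
        }

        // Equivalent of the two lines above: register the sink, then join it to its parent.
        grouper.add("SINK");
        grouper.unite("SINK", "PROCESS");
        System.out.println("after fix: " + grouper.root("SINK").equals(grouper.root("SOURCE")));
    }
}
```

Running main prints "before fix: id: SINK" and then "after fix: true", which
matches the behavior I'm seeing before and after the change.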

Is this a bug or am I doing something incorrect?

Thanks,
Bill
