Nikki Thean created KAFKA-7055:
----------------------------------

             Summary: Kafka Streams Processor API allows you to add sinks and processors without parent
                 Key: KAFKA-7055
                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Nikki Thean
            Assignee: Nikki Thean

The Kafka Streams Processor API allows you to define a Topology and connect sources, processors, and sinks. From reading through the code, it seems that you cannot forward a message to a downstream node unless it is explicitly connected as a child of the upstream node from which you are forwarding the message. (See this [example|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117], where you forward using the name of the downstream node rather than the child index.)

However, I've been able to add processors and sinks to the topology without including parent names, i.e., with an empty vararg (using [this method|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).

As any attempt to forward a message to those nodes will throw a StreamsException, I suggest throwing an exception if a processor or sink is added without at least one upstream node. There is a method in `InternalTopologyBuilder` that allows you to connect processors by name after you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when users try to forward messages to a node that is not connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119] more descriptive, like [this one for when a user attempts to access a state store that is not connected to the processor|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
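
To make the two suggestions in this report concrete, here is a minimal sketch of how such checks might behave. It does not use Kafka's classes: `ParentCheckSketch`, its methods, the exception types, and the message wording are all hypothetical stand-ins chosen so the example runs on its own, and the real `Topology` and `ProcessorContextImpl` code may look quite different.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a topology builder; this is NOT Kafka's Topology class.
public class ParentCheckSketch {
    // Maps each node name to the names of its directly connected children.
    private final Map<String, Set<String>> children = new HashMap<>();

    public ParentCheckSketch addSource(String name) {
        children.put(name, new LinkedHashSet<>());
        return this;
    }

    // Suggestion 1: reject a processor or sink that is added with an empty parent vararg.
    public ParentCheckSketch addProcessor(String name, String... parentNames) {
        if (parentNames.length == 0) {
            throw new IllegalArgumentException(
                "Processor " + name + " must have at least one parent");
        }
        for (String parent : parentNames) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException(
                    "Parent " + parent + " of processor " + name + " has not been added yet");
            }
        }
        children.put(name, new LinkedHashSet<>());
        for (String parent : parentNames) {
            children.get(parent).add(name);
        }
        return this;
    }

    // Suggestion 2: name both nodes when the forward target is not a connected child.
    public String forward(String from, String childName) {
        Set<String> known = children.get(from);
        if (known == null || !known.contains(childName)) {
            throw new IllegalStateException(
                "Node " + from + " cannot forward to " + childName
                + " because it was not added as a child of " + from);
        }
        return childName;
    }

    public static void main(String[] args) {
        ParentCheckSketch topology = new ParentCheckSketch()
            .addSource("source")
            .addProcessor("upper", "source");
        System.out.println(topology.forward("source", "upper"));
        try {
            topology.addProcessor("orphan"); // empty vararg, as described in the report
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at the point where the node is added, rather than at the first forward, would surface the mistake while the topology is being built instead of at runtime inside a processor.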