Nikki Thean created KAFKA-7055:
----------------------------------

             Summary: Kafka Streams Processor API allows you to add sinks and processors without parent
                 Key: KAFKA-7055
                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Nikki Thean
            Assignee: Nikki Thean


The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. 
([example|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117],
 where you forward using the name of the downstream node rather than the child index)

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e., with an empty vararg (using [this 
method|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.
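
The suggested check could look roughly like the following. This is a minimal, self-contained sketch: `TopologySketch` and its methods are hypothetical stand-ins, not the Kafka Streams `Topology` API, and `IllegalArgumentException` is an assumed exception type chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topology builder; NOT the Kafka Streams API.
class TopologySketch {
    // Maps each node name to the names of its children.
    private final Map<String, List<String>> children = new HashMap<>();

    // Sources are the only nodes allowed to have no parent.
    TopologySketch addSource(String name) {
        register(name);
        return this;
    }

    // Processors (and, by the same logic, sinks) must name at least one parent.
    TopologySketch addProcessor(String name, String... parentNames) {
        if (parentNames == null || parentNames.length == 0) {
            throw new IllegalArgumentException(
                "Processor " + name + " must have at least one parent");
        }
        // Validate every parent before mutating state, so a failure leaves
        // the builder unchanged.
        for (String parent : parentNames) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException(
                    "Parent " + parent + " of processor " + name + " has not been added yet");
            }
        }
        register(name);
        for (String parent : parentNames) {
            children.get(parent).add(name);
        }
        return this;
    }

    List<String> childrenOf(String name) {
        List<String> result = children.get(name);
        return result == null ? new ArrayList<>() : new ArrayList<>(result);
    }

    private void register(String name) {
        if (children.putIfAbsent(name, new ArrayList<>()) != null) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
    }
}
```

With this sketch, `new TopologySketch().addSource("source").addProcessor("proc", "source")` succeeds, while `addProcessor("orphan")` with an empty vararg fails when the node is added rather than later, when a message is forwarded to it.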

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like [this one for when a user attempts to access a state 
store that is not connected to the 
processor|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].
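
To illustrate what a more descriptive message might contain, here is a small sketch. `ForwardSketch` is a hypothetical class, not `ProcessorContextImpl`; the message wording and the use of `IllegalStateException` in place of `StreamsException` are assumptions made so that the example has no Kafka dependency.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of a node that forwards to children by name;
// NOT the real ProcessorContextImpl.
class ForwardSketch {
    private final String nodeName;
    private final Set<String> childNames;

    ForwardSketch(String nodeName, String... childNames) {
        this.nodeName = nodeName;
        this.childNames = new LinkedHashSet<>(Arrays.asList(childNames));
    }

    // Returns the name of the child the message would be forwarded to,
    // or throws with a message that names both nodes and hints at the fix.
    String forward(String childName) {
        if (!childNames.contains(childName)) {
            throw new IllegalStateException(
                "Node " + nodeName + " cannot forward to " + childName
                    + " because it is not one of its children " + childNames
                    + ". Pass " + nodeName + " as a parent name when adding "
                    + childName + " to the topology.");
        }
        return childName;
    }
}
```

The point of the sketch is only that the message names the upstream node, the intended target, and the likely fix, in the same spirit as the state store message linked above.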



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
