[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551289#comment-17551289 ]
Tomasz Kaszuba commented on KAFKA-13963: ---------------------------------------- Ok, this is what I thought. Is it worth updating the java doc to mention this? The developers I work with were surprised that context.forward is not covered. We really heavily on the generated topology graphs for impact analysis. Btw, I think you can get around the context forward exception and the need for registering sinks if you use the internal RecordCollector, which I feel should be better hidden from the streams api users since it's a class cast exception waiting to happen. I can open up a separate bug for that if it makes sense. {code:java} collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector {code} > Topology Description ignores context.forward > -------------------------------------------- > > Key: KAFKA-13963 > URL: https://issues.apache.org/jira/browse/KAFKA-13963 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.7.2 > Reporter: Tomasz Kaszuba > Priority: Minor > > I have a simple topology: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new RecordCollectorProcessor() > }, > "source" > ) {code} > And a simple processor that uses context.forward to forward messages: > {code:java} > private class ContextForwardProcessor extends AbstractProcessor[String, > String]() { override def process(key: String, value: String): Unit = > context().forward("key", "value", To.child("output")) override def > close(): Unit = () > } {code} > when I call topology.describe() I receive this: > {noformat} > Topologies: > Sub-topology: 0 > Source: source (topics: [input]) > --> process > Processor: process (stores: []) > --> none > <-- source {noformat} > Ignoring the fact that this will not run since it will throw a runtime > exception why is the To.child ignored? > Taking it one point further if I add multiple sinks to the topology like so: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new ContextForwardProcessor() > }, > "source" > ) > .addSink("sink", "output1", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") > .addSink("sink2", "output2", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") {code} > but have the processor only output to "output1" it is in no way reflected in > the described topology graph. > I assume this is by design since it's a lot more work to interpret what the > context.forward is doing but when I tried to look for this information in the > java doc I couldn't find it. > -- This message was sent by Atlassian Jira (v8.20.7#820007)