Tomasz Kaszuba created KAFKA-13963: -------------------------------------- Summary: Topology Description ignores context.forward Key: KAFKA-13963 URL: https://issues.apache.org/jira/browse/KAFKA-13963 Project: Kafka Issue Type: Bug Affects Versions: 2.7.2 Reporter: Tomasz Kaszuba
I have a simple topology: {code:java} val topology = new Topology topology .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic) .addProcessor( "process", new ProcessorSupplier[String, String] { override def get(): Processor[String, String] = new RecordCollectorProcessor() }, "source" ) {code} And a simple processor that uses context.forward to forward messages: {code:java} private class ContextForwardProcessor extends AbstractProcessor[String, String]() { override def process(key: String, value: String): Unit = context().forward("key", "value", To.child("output")) override def close(): Unit = () } {code} when I call topology.describe() I receive this: {noformat} Topologies: Sub-topology: 0 Source: source (topics: [input]) --> process Processor: process (stores: []) --> none <-- source {noformat} Ignoring the fact that this will not run since it will throw a runtime exception why is the To.child ignored? Taking it one point further if I add multiple sinks to the topology like so: {code:java} val topology = new Topology topology .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic) .addProcessor( "process", new ProcessorSupplier[String, String] { override def get(): Processor[String, String] = new ContextForwardProcessor() }, "source" ) .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process") .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process") {code} but have the processor only output to "output1" it is in no way reflected in the described topology graph. I assume this is by design since it's a lot more work to interpret what the context.forward is doing but when I tried to look for this information in the java doc I couldn't find it. -- This message was sent by Atlassian Jira (v8.20.7#820007)