[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551333#comment-17551333 ]
Matthias J. Sax commented on KAFKA-13963:
-----------------------------------------

{quote}Is it worth updating the java doc to mention this?
{quote}
Updating docs can never hurt :) – are you interested in doing a PR?
{quote}if you use the internal RecordCollector, which I feel should be better hidden from the streams api users
{quote}
Yes, you should NEVER use internal stuff... Not sure how we could "better hide" it though? That does not seem to be possible as long as we are using Java 8...
{quote}I can open up a separate bug for that if it makes sense.
{quote}
I don't think it's a bug? It is (unfortunately) how Java works.

> Topology Description ignores context.forward
> --------------------------------------------
>
>                 Key: KAFKA-13963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.2
>            Reporter: Tomasz Kaszuba
>            Priority: Minor
>
> I have a simple topology:
> {code:java}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] =
>         new RecordCollectorProcessor()
>     },
>     "source"
>   ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
> private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>   override def process(key: String, value: String): Unit =
>     context().forward("key", "value", To.child("output"))
>   override def close(): Unit = ()
> } {code}
> When I call topology.describe(), I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run since it will throw a runtime exception, why is the To.child ignored?
> Taking it one point further if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] =
>         new ContextForwardProcessor()
>     },
>     "source"
>   )
>   .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
>   .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process") {code}
> but have the processor only output to "output1" it is in no way reflected in the described topology graph.
> I assume this is by design since it's a lot more work to interpret what the context.forward is doing but when I tried to look for this information in the java doc I couldn't find it.
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)