[
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tomasz Kaszuba updated KAFKA-13963:
-----------------------------------
Component/s: streams
> Topology Description ignores context.forward
> --------------------------------------------
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.2
> Reporter: Tomasz Kaszuba
> Priority: Minor
>
> I have a simple topology:
> {code:java}
> val topology = new Topology
> topology
> .addSource("source", Serdes.stringSerde.deserializer,
> Serdes.stringSerde.deserializer, inputTopic)
> .addProcessor(
> "process",
> new ProcessorSupplier[String, String] {
> override def get(): Processor[String, String] =
> new RecordCollectorProcessor()
> },
> "source"
> ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
> private class ContextForwardProcessor extends AbstractProcessor[String,
> String]() { override def process(key: String, value: String): Unit =
> context().forward("key", "value", To.child("output")) override def
> close(): Unit = ()
> } {code}
> when I call topology.describe() I receive this:
> {noformat}
> Topologies:
> Sub-topology: 0
> Source: source (topics: [input])
> --> process
> Processor: process (stores: [])
> --> none
> <-- source {noformat}
> Ignoring the fact that this will not run since it will throw a runtime
> exception why is the To.child ignored?
> Taking it one point further if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
> topology
> .addSource("source", Serdes.stringSerde.deserializer,
> Serdes.stringSerde.deserializer, inputTopic)
> .addProcessor(
> "process",
> new ProcessorSupplier[String, String] {
> override def get(): Processor[String, String] =
> new ContextForwardProcessor()
> },
> "source"
> )
> .addSink("sink", "output1", Serdes.stringSerde.serializer(),
> Serdes.stringSerde.serializer(), "process")
> .addSink("sink2", "output2", Serdes.stringSerde.serializer(),
> Serdes.stringSerde.serializer(), "process") {code}
> but have the processor only output to "output1" it is in no way reflected in
> the described topology graph.
> I assume this is by design since it's a lot more work to interpret what the
> context.forward is doing but when I tried to look for this information in the
> java doc I couldn't find it.
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)