[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551289#comment-17551289
 ] 

Tomasz Kaszuba commented on KAFKA-13963:
----------------------------------------

Ok, this is what I thought. Is it worth updating the java doc to mention this? 
The developers I work with were surprised that context.forward is not covered. 
We really heavily on the generated topology graphs for impact analysis.

Btw, I think you can get around the context forward exception and the need for 
registering sinks if you use the internal RecordCollector, which I feel should 
be better hidden from the streams api users since it's a class cast exception 
waiting to happen. I can open up a separate bug for that if it makes sense.
{code:java}
collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector 
{code}

> Topology Description ignores context.forward
> --------------------------------------------
>
>                 Key: KAFKA-13963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.2
>            Reporter: Tomasz Kaszuba
>            Priority: Minor
>
> I have a simple topology:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new RecordCollectorProcessor()
>           },
>           "source"
>         ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
>   private class ContextForwardProcessor extends AbstractProcessor[String, 
> String]() {    override def process(key: String, value: String): Unit =
>       context().forward("key", "value", To.child("output"))    override def 
> close(): Unit = ()
>   }  {code}
> when I call topology.describe() I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run since it will throw a runtime 
> exception why is the To.child ignored?
> Taking it one point further if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new ContextForwardProcessor()
>           },
>           "source"
>         )
>         .addSink("sink", "output1", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")
>         .addSink("sink2", "output2", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")  {code}
> but have the processor only output to "output1" it is in no way reflected in 
> the described topology graph.
> I assume this is by design since it's a lot more work to interpret what the 
> context.forward is doing but when I tried to look for this information in the 
> java doc I couldn't find it.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to