[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847802#comment-15847802 ]
Matthias J. Sax commented on KAFKA-3543:
----------------------------------------
[~gfodor] I'm closing this as a duplicate. Nevertheless, one question: I don't
understand your comment in the JIRA description about "just calling forward()
myself on the context and actually emitting dummy values which are filtered out
downstream". Actually, using {{context.forward()}} is absolutely fine and
efficient.
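To illustrate the point about {{context.forward()}}, here is a minimal sketch of the pattern: the transformer keeps the context it receives in init(), forwards any number of records from transform(), and returns null so that no dummy value has to be filtered out later. To keep it runnable without the Kafka dependency, SimpleContext, SimpleTransformer, WordSplitter, CollectingContext and ForwardDemo are hypothetical stand-ins, not the real Kafka Streams classes. The driver also assumes that the framework skips a null return value from transform(); check that against the Transformer Javadoc of the Kafka version in use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Kafka Streams' ProcessorContext.
interface SimpleContext<K, V> {
    void forward(K key, V value);
}

// Hypothetical, simplified stand-in for Kafka Streams' Transformer.
interface SimpleTransformer<K, V, R> {
    void init(SimpleContext<K, V> context);
    R transform(K key, V value);
}

// Splits a sentence into words and forwards one record per word.
class WordSplitter implements SimpleTransformer<String, String, String> {
    private SimpleContext<String, String> context;

    @Override
    public void init(SimpleContext<String, String> context) {
        this.context = context; // keep the context so transform() can forward
    }

    @Override
    public String transform(String key, String sentence) {
        for (String word : sentence.split("\\s+")) {
            context.forward(key, word); // emit as many records as needed
        }
        return null; // nothing else to emit, so no dummy value is needed
    }
}

// Records everything forwarded; stands in for the downstream processors.
class CollectingContext implements SimpleContext<String, String> {
    final List<String> forwarded = new ArrayList<>();

    @Override
    public void forward(String key, String value) {
        forwarded.add(key + "=" + value);
    }
}

class ForwardDemo {
    // Mimics the assumed framework behaviour: a non-null return value is
    // forwarded downstream, a null one is skipped.
    static List<String> run(String key, String sentence) {
        CollectingContext ctx = new CollectingContext();
        WordSplitter splitter = new WordSplitter();
        splitter.init(ctx);
        String result = splitter.transform(key, sentence);
        if (result != null) {
            ctx.forward(key, result);
        }
        return ctx.forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run("k", "hello kafka streams"));
        // prints [k=hello, k=kafka, k=streams]
    }
}
```

Because the forwarded records go straight to the downstream processors, no extra filter step is needed to drop placeholder values.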
> Allow a variant of transform() which can emit multiple values
> -------------------------------------------------------------
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Labels: api
> Fix For: 0.10.3.0
>
>
> Right now it seems that if you want to apply an arbitrary stateful
> transformation to a stream, you have to pass either a TransformerSupplier to
> transform() or a ProcessorSupplier to process(). A custom processor lets you
> emit multiple new values, but process() currently terminates that branch of
> the topology, so you can't chain further operations after it. transform()
> lets you continue the data flow, but forces you to emit a single value for
> every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the
> ProcessorContext and emit multiple values, but that's probably not the ideal
> way to do it :))
> It seems desirable to somehow allow a transformation that emits multiple
> values per input value. I'm not sure of the best way to fit this into the
> current TransformerSupplier/Transformer architecture in a way that is clean
> and efficient; currently I'm doing the workaround above of just calling
> forward() myself on the context and actually emitting dummy values which are
> filtered out downstream.
> -------------
> It is worth considering adding a new flatTransform function as
> {code}
> <K1, V1> KStream<K1, V1> flatTransform(
>     TransformerSupplier<K, V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
>     String... stateStoreNames);
> {code}
> which is essentially the same as {{transform().flatMap()}}.
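The claimed equivalence between a one-step flatTransform and transform() followed by flatMap() can be sketched on plain Java collections. This is a model of the semantics only, under stated assumptions: Map.Entry stands in for KeyValue, a BiFunction stands in for the Transformer, and FlatTransformModel, flatTransform, transformThenFlatMap and splitWords are hypothetical names, not Kafka Streams API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Models the proposed semantics on plain collections; these are NOT Kafka types.
class FlatTransformModel {

    // One step: each input record yields zero or more output records.
    static <K, V, K1, V1> List<Entry<K1, V1>> flatTransform(
            List<Entry<K, V>> input,
            BiFunction<K, V, Iterable<Entry<K1, V1>>> transformer) {
        List<Entry<K1, V1>> out = new ArrayList<>();
        for (Entry<K, V> record : input) {
            for (Entry<K1, V1> result : transformer.apply(record.getKey(), record.getValue())) {
                out.add(result);
            }
        }
        return out;
    }

    // Two steps: a transform emits one Iterable per record, flatMap flattens it.
    static <K, V, K1, V1> List<Entry<K1, V1>> transformThenFlatMap(
            List<Entry<K, V>> input,
            BiFunction<K, V, Iterable<Entry<K1, V1>>> transformer) {
        return input.stream()
                .map(r -> transformer.apply(r.getKey(), r.getValue()))
                .flatMap(it -> StreamSupport.stream(it.spliterator(), false))
                .collect(Collectors.toList());
    }

    // Example transformer: one (word, 1) record per word in the value.
    static Iterable<Entry<String, Integer>> splitWords(String key, String value) {
        List<Entry<String, Integer>> out = new ArrayList<>();
        for (String word : value.split("\\s+")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("a", "to be"));
        input.add(new SimpleEntry<>("b", "or not"));
        // Both lines print [to=1, be=1, or=1, not=1]
        System.out.println(flatTransform(input, FlatTransformModel::splitWords));
        System.out.println(transformThenFlatMap(input, FlatTransformModel::splitWords));
    }
}
```

The one-step version avoids materialising an intermediate record whose value is a whole collection, which is the main practical argument for a dedicated flatTransform over chaining two operators.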
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)