[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16059527#comment-16059527 ]
Matthias J. Sax commented on KAFKA-5488: ---------------------------------------- You can always start a discussion on the mailing list. For this to be productive, it's usually best to propose an initial design and point out the strength/weakness. This helps people to give constructive feedback. As the wiki is a living document, I would just start a KIP page and dump my thoughts there, and start a KIP discussion thread on dev list (as describe at KIP wiki page). > KStream.branch should not return a Array of streams we have to access by > known index > ------------------------------------------------------------------------------------ > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.11.0.0, 0.10.2.1 > Reporter: Marcel "childNo͡.de" Trautwein > Labels: needs-kip > > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:lang=java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, > Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:lang=java} > new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > https://gitlab.com/childno.de/apache_kafka/snippets/1665655 -- This message was sent by Atlassian JIRA (v6.4.14#64029)