[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Laurent T updated KAFKA-5924:
-----------------------------
    Description:
Hi,
I'm referencing RxJava for its [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators], which is very handy. It would be great if the Streams API gave us something similar. It is easy to implement and makes the code much clearer, because reusing part of a stream topology no longer breaks the linearity of the call chain. For example:

Without compose:
{code:java}
TopologyUtils
    .myUtil(topology
        .map(...)
        .flatMap(...)
        .through(...))
    .map(...)
    .to(...);
{code}
With compose:
{code:java}
topology
    .map(...)
    .flatMap(...)
    .through(...)
    .compose(TopologyUtils::myUtil)
    .map(...)
    .to(...);
{code}
Here myUtil is a reusable helper such as:
{code:java}
// static, so that it can be called as TopologyUtils.myUtil(...) and referenced as TopologyUtils::myUtil
public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
    // Log every record and pass the stream through unchanged
    return stream.peek((k, v) -> System.out.println(k + ": " + v));
}
{code}

  was: the same description, without the myUtil definition.

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
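
To make the "easy to implement" claim concrete, here is a minimal sketch of what a compose method amounts to. It does not use Kafka: `Flow`, `FlowUtils`, `trimAndUpper` and `ComposeSketch` are hypothetical names invented for this illustration, and `Flow` is only an in-memory stand-in for a fluent stream type such as KStream. The assumption is that compose simply hands the current stream to a caller-supplied function and returns whatever that function returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a fluent stream type; this is NOT Kafka's KStream.
final class Flow<T> {
    private final List<T> items;

    Flow(List<T> items) {
        this.items = List.copyOf(items);
    }

    // Apply a function to every element, producing a new Flow.
    <R> Flow<R> map(Function<? super T, ? extends R> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(mapper.apply(item));
        }
        return new Flow<>(out);
    }

    // The whole idea of compose: pass this stream to a reusable transformer
    // and continue the chain from whatever it returns.
    <R> Flow<R> compose(Function<? super Flow<T>, ? extends Flow<R>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() {
        return items;
    }
}

// Hypothetical utility class holding a reusable piece of pipeline.
final class FlowUtils {
    private FlowUtils() {}

    static Flow<String> trimAndUpper(Flow<String> flow) {
        return flow.map(String::trim).map(String::toUpperCase);
    }
}

public class ComposeSketch {
    public static void main(String[] args) {
        List<Integer> lengths = new Flow<>(List.of(" kafka ", "streams"))
            .compose(FlowUtils::trimAndUpper) // reusable step stays inside the chain
            .map(String::length)
            .toList();
        System.out.println(lengths); // prints [5, 7]
    }
}
```

Because compose only applies a function to the stream itself, the reusable step reads in the same top-to-bottom order as the built-in operators, rather than wrapping the start of the chain as in the "Without compose" example.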