[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504370#comment-16504370 ]
Laurent T edited comment on KAFKA-5924 at 6/7/18 7:59 AM:
----------------------------------------------------------

The {{transform()}} method requires a {{TransformerSupplier}}, which in turn must return a {{Transformer}}. {{Transformer}} is not a functional interface, so it cannot be written as a lambda and the resulting code is much less readable.

What I'm wishing for here is a method that improves readability and makes it easier to decouple the various parts of the stream processing, without requiring the implementation of a somewhat complex interface.

Here is what {{TopologyUtils::myUtil}} might look like if I wanted it to just print the values (also added to the main description):

{code:java}
// static, so that it can be called as TopologyUtils.myUtil(...) or passed as TopologyUtils::myUtil
public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
    return stream.peek((k, v) -> System.out.println(k + ": " + v));
}
{code}

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for its [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
> which is very handy. It would be great if the Streams API gave us
> something similar. It's pretty easy to implement and makes the code much
> clearer: it avoids breaking the linearity of the code when you want to
> reuse parts of the stream topology. For example:
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}
> Here is what TopologyUtils::myUtil might look like if I wanted it to just
> print the values.
> {code:java}
> // static, so that it can be called as TopologyUtils.myUtil(...) or passed as TopologyUtils::myUtil
> public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
>     return stream.peek((k, v) -> System.out.println(k + ": " + v));
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
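
To illustrate why the requested {{compose}} is "pretty easy to implement", here is a minimal sketch of the idea. It is not Kafka Streams code: {{Composable}} is a hypothetical wrapper invented for this example, and {{java.util.stream.Stream}} stands in for {{KStream}} so that the snippet runs with the JDK alone. The {{TopologyUtils}} class here is likewise a stand-in, not the one from the issue. The only point is that {{compose}} amounts to applying a {{Function}} to the current stream and carrying on with whatever it returns.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical wrapper (not part of Kafka Streams) that adds a fluent compose step. */
final class Composable<S> {
    private final S value;

    private Composable(S value) {
        this.value = value;
    }

    static <S> Composable<S> of(S value) {
        return new Composable<>(value);
    }

    /** Applies the transformer to the wrapped value and wraps the result again. */
    <R> Composable<R> compose(Function<? super S, ? extends R> transformer) {
        return new Composable<>(transformer.apply(value));
    }

    S get() {
        return value;
    }
}

/** Stand-in for the TopologyUtils of the issue, written against java.util.stream.Stream. */
final class TopologyUtils {
    private TopologyUtils() {
    }

    // Reusable pipeline fragment: prints every element and passes it through unchanged.
    static <T> Stream<T> myUtil(Stream<T> stream) {
        return stream.peek(v -> System.out.println("value: " + v));
    }
}

final class ComposeDemo {
    public static void main(String[] args) {
        // The chain stays linear even though myUtil lives in another class.
        List<Integer> out = Composable.of(Stream.of(1, 2, 3))
                .compose(s -> s.map(x -> x * 2))
                .compose(TopologyUtils::myUtil)
                .compose(s -> s.filter(x -> x > 2))
                .get()
                .collect(Collectors.toList());
        System.out.println(out);
    }
}
```

In an actual implementation the method would presumably live on {{KStream}} itself, so no wrapper would be needed. The exact signature, including whether the function may return something other than a {{KStream}}, would be a matter for the KIP.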