[ 
https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504370#comment-16504370
 ] 

Laurent T edited comment on KAFKA-5924 at 6/7/18 7:59 AM:
----------------------------------------------------------

The {{transform()}} method requires a {{TransformerSupplier}}, which in turn 
must return a {{Transformer}}. {{Transformer}} is not a functional interface, 
so it cannot be written as a lambda and the resulting code is much less readable. 

What I'm wishing for here is a method that provides better readability and 
makes it easier to decouple the various parts of the stream processing without 
requiring the implementation of a somewhat complex interface.

Here is what {{TopologyUtils::myUtil}} might look like if I wanted it to just 
print the values (also added to the main description).

 
{code:java}
// static, so that it can be referenced as TopologyUtils::myUtil
public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
    return stream.peek((k, v) -> System.out.println(k + ": " + v));
}
{code}
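
For illustration, here is a minimal, self-contained sketch of how such a compose operator could work. It does not use the Kafka Streams API: Pipeline is a hypothetical stand-in for KStream, and shout is a hypothetical reusable fragment playing the role of TopologyUtils::myUtil. Under these assumptions, the operator only has to apply the given function to the stream itself, which is what keeps the call chain linear.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ComposeSketch {

    /** Hypothetical stand-in for KStream: an immutable list with fluent operators. */
    static final class Pipeline<T> {
        private final List<T> items;

        Pipeline(List<T> items) {
            this.items = List.copyOf(items);
        }

        /** Applies mapper to every element and returns a new pipeline. */
        <R> Pipeline<R> map(Function<? super T, ? extends R> mapper) {
            List<R> out = new ArrayList<>();
            for (T item : items) {
                out.add(mapper.apply(item));
            }
            return new Pipeline<>(out);
        }

        /** The wished-for operator: pass the whole pipeline to a function and return its result. */
        <R> Pipeline<R> compose(Function<Pipeline<T>, Pipeline<R>> transformer) {
            return transformer.apply(this);
        }

        List<T> toList() {
            return items;
        }
    }

    /** Hypothetical reusable fragment, in the role of TopologyUtils::myUtil. */
    static Pipeline<String> shout(Pipeline<String> in) {
        return in.map(String::toUpperCase).map(s -> s + "!");
    }

    public static void main(String[] args) {
        List<Integer> lengths = new Pipeline<>(List.of("a", "bb"))
            .map(String::trim)
            .compose(ComposeSketch::shout) // reused fragment, chain stays linear
            .map(String::length)
            .toList();
        System.out.println(lengths);
    }
}
```

Because compose takes a plain java.util.function.Function, any static helper or lambda with a matching signature can be plugged in without implementing a dedicated interface.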
 

 


was (Author: crystark):
The {{transform()}} method requires a {{TransformerSupplier}}, which in turn 
must return a {{Transformer}}. {{Transformer}} is not a functional interface, 
so it cannot be written as a lambda and the resulting code is much less readable. 

What I'm wishing for here is a method that provides better readability and 
makes it easier to decouple the various parts of the stream processing without 
requiring the implementation of a somewhat complex interface.

Here is what {{TopologyUtils::myUtil}} might look like if I wanted it to just 
print the values (also added to the main description).

 
{code:java}
// static, so that it can be referenced as TopologyUtils::myUtil
public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
    return stream.peek((k, v) -> System.out.println(k + ": " + v));
}
{code}
 

 

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for its [compose 
> method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
> which is very handy. It would be great if the Streams API offered 
> something similar. It is easy to implement and makes the code much 
> clearer, because it avoids breaking the linearity of the code when you 
> want to reuse parts of the stream topology. For example:
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}
> Here is what TopologyUtils::myUtil might look like if I wanted it to just 
> print the values.
> {code:java}
> // static, so that it can be referenced as TopologyUtils::myUtil
> public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
>     return stream.peek((k, v) -> System.out.println(k + ": " + v));
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
