[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Laurent T updated KAFKA-5924:
-----------------------------
    Description: 
Hi,

I'm referencing RxJava for its [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
which is very handy. It would be great if the Streams API gave us something
similar. It's pretty easy to implement and makes the code much clearer: it
avoids breaking the linearity of the call chain when you want to reuse parts
of the stream topology. For example:

Without compose:
{code:java}
TopologyUtils
    .myUtil(topology
        .map(...)
        .flatMap(...)
        .through(...))
    .map(...)
    .to(...);
{code}
With compose:
{code:java}
topology
    .map(...)
    .flatMap(...)
    .through(...)
    .compose(TopologyUtils::myUtil)
    .map(...)
    .to(...);
{code}
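where myUtil is a static helper such as:
{code:java}
// Must be static so that TopologyUtils::myUtil resolves to a KStream -> KStream function.
public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
    return stream.peek((k, v) -> System.out.println(k + ": " + v));
}
{code}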

  was:
Hi,

I'm referencing RxJava for its [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
which is very handy. It would be great if the Streams API gave us something
similar. It's pretty easy to implement and makes the code much clearer: it
avoids breaking the linearity of the call chain when you want to reuse parts
of the stream topology. For example:

Without compose:
{code:java}
TopologyUtils
    .myUtil(topology
        .map(...)
        .flatMap(...)
        .through(...))
    .map(...)
    .to(...);
{code}

With compose:
{code:java}
topology
    .map(...)
    .flatMap(...)
    .through(...)
    .compose(TopologyUtils::myUtil)
    .map(...)
    .to(...);
{code}


> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for its [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
> which is very handy. It would be great if the Streams API gave us something
> similar. It's pretty easy to implement and makes the code much clearer: it
> avoids breaking the linearity of the call chain when you want to reuse parts
> of the stream topology. For example:
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}
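> where myUtil is a static helper such as:
> {code:java}
> // Must be static so that TopologyUtils::myUtil resolves to a KStream -> KStream function.
> public static <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
>     return stream.peek((k, v) -> System.out.println(k + ": " + v));
> }
> {code}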
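
For readers wondering what "pretty easy to implement" could look like, here is a minimal sketch of the idea. It deliberately does not touch Kafka Streams: Flow is a hypothetical stand-in for a fluent stream type, and shout plays the role of TopologyUtils.myUtil. The names and signatures are assumptions made for illustration only, not the actual KStream API or a proposed patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ComposeSketch {

    // Hypothetical stand-in for a fluent stream type; NOT the Kafka Streams KStream API.
    static final class Flow<T> {
        private final List<T> items;

        Flow(List<T> items) {
            this.items = List.copyOf(items);
        }

        // Ordinary chained operator, analogous to map(...) in the issue.
        <R> Flow<R> map(Function<T, R> mapper) {
            List<R> out = new ArrayList<>();
            for (T item : items) {
                out.add(mapper.apply(item));
            }
            return new Flow<>(out);
        }

        // The requested operator: pass the whole flow to a reusable transformer
        // and return whatever it builds, so the call chain stays linear.
        <R> Flow<R> compose(Function<Flow<T>, Flow<R>> transformer) {
            return transformer.apply(this);
        }

        List<T> toList() {
            return items;
        }
    }

    // Reusable fragment, playing the role of TopologyUtils.myUtil (assumed name).
    static Flow<String> shout(Flow<String> flow) {
        return flow.map(s -> s.toUpperCase() + "!");
    }

    // Linear chain: map, then the reusable fragment via compose, then map again.
    static List<String> demo() {
        return new Flow<>(List.of(" a ", "b"))
            .map(String::trim)
            .compose(ComposeSketch::shout)
            .map(s -> "<" + s + ">")
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch compose is a one-liner that applies the supplied function to the receiver. A real KStream version would presumably need a KIP (the issue is labelled needs-kip) to settle the exact generic bounds.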



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
