[ https://issues.apache.org/jira/browse/KAFKA-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303877#comment-15303877 ]
Damian Guy commented on KAFKA-3561:
-----------------------------------

Guozhang - I'd like to do this if it is ok with you. I'm going to need a bit of guidance though. It looks to me that I'd need to change the signature of map to something like:
{code}
<K1, V1> KStream<K1, V1> map(final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper,
                             final Serde<K1> keySerde,
                             final Serde<V1> valueSerde,
                             final String topic);
{code}
and then it could be implemented like so:
{code}
@Override
public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper,
                                    final Serde<K1> keySerde,
                                    final Serde<V1> valueSerde,
                                    final String topic) {
    String name = topology.newName(MAP_NAME);
    topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
    return new KStreamImpl<K1, V1>(topology, name, null).through(keySerde, valueSerde, topic);
}
{code}
What other methods does this apply to? What am I missing? (I'm sure there is a lot of context I'm missing.)

Thanks,
Damian

> Auto create through topic for KStream aggregation and join
> ----------------------------------------------------------
>
>                 Key: KAFKA-3561
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3561
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Damian Guy
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> For KStream.join / aggregateByKey operations that require the streams to be
> partitioned on the record key, today users have to repartition the streams
> themselves through the "through" call:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> stream3 = stream1.map(/* set the right key for join*/).through("topic3");
> stream4 = stream2.map(/* set the right key for join*/).through("topic4");
> stream3.join(stream4, ..)
> {code}
> This pattern can actually be done by the Streams DSL itself instead of
> requiring users to specify it themselves, i.e. users can just set the right
> key (see KAFKA-3430) and then call join, which will be translated by adding
> the "internal topic for repartition".
> Another thing is that today, if users do not call "through" after setting a
> new key, the aggregation result will not be correct: the aggregation is based
> on key B while the source partitions are partitioned by key A, and hence each
> task will only get a partial aggregation for all keys. But this is not
> validated in the DSL today. We should do both the auto-translation and
> validation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
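
To make the partial-aggregation problem in the issue description concrete, here is a minimal sketch in plain Java that does not use the Kafka Streams API at all. Everything in it is hypothetical and only for illustration: the class `RepartitionSketch`, the `Rec` type, and the use of `String.hashCode()` as a stand-in for the real partitioner. Each "task" sums values per new key (key B) over the records in its own partition only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not Kafka Streams code.
class RepartitionSketch {

    // A record with its original key (A), its re-mapped key (B), and a value.
    static final class Rec {
        final String keyA;
        final String keyB;
        final int value;

        Rec(String keyA, String keyB, int value) {
            this.keyA = keyA;
            this.keyB = keyB;
            this.value = value;
        }
    }

    // Assumption: a simple hash partitioner standing in for the real one.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One map per task; each task sums values per key B for its partition only.
    // With repartitionByNewKey == false the data stays partitioned by key A,
    // which mimics calling map() without a following through().
    static List<Map<String, Integer>> aggregatePerTask(
            List<Rec> records, int numPartitions, boolean repartitionByNewKey) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            tasks.add(new TreeMap<>());
        }
        for (Rec r : records) {
            String partitionKey = repartitionByNewKey ? r.keyB : r.keyA;
            int p = partitionFor(partitionKey, numPartitions);
            tasks.get(p).merge(r.keyB, r.value, Integer::sum);
        }
        return tasks;
    }

    // Number of tasks that hold a (possibly partial) aggregate for the key.
    static int tasksHolding(List<Map<String, Integer>> tasks, String keyB) {
        int count = 0;
        for (Map<String, Integer> task : tasks) {
            if (task.containsKey(keyB)) {
                count++;
            }
        }
        return count;
    }

    static List<Rec> sample() {
        List<Rec> records = new ArrayList<>();
        records.add(new Rec("a1", "x", 1));
        records.add(new Rec("a2", "x", 2));
        records.add(new Rec("a3", "y", 3));
        return records;
    }

    public static void main(String[] args) {
        System.out.println("partitioned by key A: " + aggregatePerTask(sample(), 2, false));
        System.out.println("partitioned by key B: " + aggregatePerTask(sample(), 2, true));
    }
}
```

With this sample data and two partitions, key "x" ends up with partial sums in two different tasks when the data stays partitioned by key A, and with a single complete sum once the records are repartitioned by key B. That is the situation the proposed auto-created repartition topic is meant to handle.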