appchemist created KAFKA-20264:
----------------------------------

             Summary: `MERGE_REPARTITION_TOPICS` optimization fails depending 
on which branch of a merged stream contains the key-changing operation
                 Key: KAFKA-20264
                 URL: https://issues.apache.org/jira/browse/KAFKA-20264
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: appchemist


When the MERGE_REPARTITION_TOPICS optimization is enabled, merging two streams 
where only one branch contains a key-changing operation may fail to consolidate 
repartition topics.

The result depends on the order in which parent branches are traversed. If the 
branch with the key-changing operation is searched first and a subsequent 
branch without one returns null, the earlier result is overwritten.
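
The overwrite described above can be illustrated with a minimal, self-contained sketch. It is not Kafka's internal optimizer code: the `Node` class and both search methods are hypothetical stand-ins, written only to show how a result can depend on the order of the parents.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeParentSearch {

    /** Hypothetical stand-in for a topology graph node; not a Kafka class. */
    static final class Node {
        final String name;
        final boolean keyChanging;
        final List<Node> parents = new ArrayList<>();

        Node(String name, boolean keyChanging, Node... parents) {
            this.name = name;
            this.keyChanging = keyChanging;
            for (Node p : parents) {
                this.parents.add(p);
            }
        }
    }

    /** Assigns the result on every parent, so a later null hides an earlier hit. */
    static Node findOverwriting(Node node) {
        if (node.keyChanging) {
            return node;
        }
        Node found = null;
        for (Node parent : node.parents) {
            found = findOverwriting(parent); // overwrites any earlier non-null result
        }
        return found;
    }

    /** Returns as soon as any parent branch yields a key-changing node. */
    static Node findFirstMatch(Node node) {
        if (node.keyChanging) {
            return node;
        }
        for (Node parent : node.parents) {
            Node found = findFirstMatch(parent);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Node source1 = new Node("source-topic-1", false);
        Node source2 = new Node("source-topic-2", false);
        Node selectKey = new Node("selectKey", true, source2);
        Node filter = new Node("filter", false, selectKey);

        // Same merge, two parent orders.
        Node keyChangingFirst = new Node("merge", false, filter, source1);
        Node keyChangingLast = new Node("merge", false, source1, filter);

        System.out.println(findOverwriting(keyChangingFirst) == null);      // true: hit was lost
        System.out.println(findOverwriting(keyChangingLast) == selectKey);  // true
        System.out.println(findFirstMatch(keyChangingFirst) == selectKey);  // true
        System.out.println(findFirstMatch(keyChangingLast) == selectKey);   // true
    }
}
```

Returning on the first non-null result is only one way to make such a search order-independent. Whether it matches the structure of the actual optimizer code is an assumption.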

 

*Steps to Reproduce*
{code:java}
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final KStream<String, String> left =
        builder.stream(Collections.singleton("topic-1"), consumed);
final KStream<String, String> right =
        builder.stream(Collections.singleton("topic-2"), consumed)
                .selectKey((k, v) -> v)
                .filter((k, v) -> v != null);

final KStream<String, String> merged = left.merge(right);

final KGroupedStream<String, String> grouped = merged.groupByKey();
grouped.count(Materialized.as("count-store"));
grouped.aggregate(
        () -> null,
        (k, v, agg) -> k,
        Materialized.as("latest-store"));
{code}
*Expected Behavior*
With MERGE_REPARTITION_TOPICS enabled, count() and aggregate() should share a 
single repartition topic, so only one repartition topic should be created.

*Actual Behavior*
Duplicate repartition topics are created. The optimization fails to recognize 
the relationship between the merge node and the key-changing node.
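
One way to check the expected versus actual behavior is to count the distinct repartition topic names in the topology description. The helper below is a sketch under assumptions: it takes the text of `Topology#describe()` as a plain string and treats any name ending in `-repartition` as a repartition topic. The class name, the method name, and the sample description strings are hypothetical and hand-written for illustration; they are not captured output.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RepartitionTopicCounter {

    // Assumption: repartition topics are identifiable by the "-repartition" suffix.
    private static final Pattern REPARTITION_TOPIC =
            Pattern.compile("[A-Za-z0-9._-]+-repartition");

    /** Collects the distinct repartition topic names mentioned in a description string. */
    static Set<String> repartitionTopics(String topologyDescription) {
        Set<String> topics = new TreeSet<>();
        Matcher matcher = REPARTITION_TOPIC.matcher(topologyDescription);
        while (matcher.find()) {
            topics.add(matcher.group());
        }
        return topics;
    }

    public static void main(String[] args) {
        // Hand-written sample text, not real describe() output.
        String optimized =
                "Sink: sink-1 (topic: count-store-repartition)\n"
              + "Source: source-1 (topics: [count-store-repartition])\n";
        String duplicated = optimized
              + "Sink: sink-2 (topic: latest-store-repartition)\n"
              + "Source: source-2 (topics: [latest-store-repartition])\n";

        System.out.println(repartitionTopics(optimized).size());   // 1
        System.out.println(repartitionTopics(duplicated).size());  // 2
    }
}
```

In a real test, the string would come from `builder.build(props).describe().toString()`, and the assertion would be that the returned set has exactly one element.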



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
