[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-7595.
------------------------------------
    Resolution: Not A Bug

> Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-7595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7595
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Vik Gamov
>            Priority: Major
>
> When performing a KTable-to-KTable join after an aggregation, the resulting
> KTable contains duplicates.
>
> 1. caching disabled, not materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{    (sum, count) -> sum / count.doubleValue());}}
>
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{    (sum, count) -> sum / count.doubleValue(),}}
> {{    Materialized.as("average-ratings"));}}
>
> 3. caching enabled, materialized => no duplicates
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{    (sum, count) -> sum / count.doubleValue(),}}
> {{    Materialized.as("average-ratings"));}}
>
> Demo app:
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
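One plausible reading of the "Not A Bug" resolution, sketched as a simplified, dependency-free Java model rather than the Kafka Streams API: every rating in ratingsById updates both count() and reduce(), and a KTable-KTable join recomputes its result whenever either input table changes. With caching disabled, each of those two table updates is forwarded immediately, so one rating can produce two join results for the same key, the first computed against a stale value from the other table. With caching enabled and the join materialized, the store's cache keeps only the latest value per key until it is flushed (for example on commit), so downstream typically sees a single update. The class name JoinUpdateModel, its commit() method, and the count-before-sum processing order are hypothetical choices made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the topology in this issue:
 * ratings -> count() and reduce() -> inner join computing sum / count.
 * This is NOT the Kafka Streams API; it only mimics how updates are forwarded.
 */
public class JoinUpdateModel {
    private final Map<Long, Long> counts = new HashMap<>();
    private final Map<Long, Double> sums = new HashMap<>();
    private final boolean cachingEnabled;
    // Stand-in for a caching store: holds only the latest join result per key.
    private final Map<Long, Double> cache = new LinkedHashMap<>();
    private final List<String> downstream = new ArrayList<>();

    public JoinUpdateModel(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // One input rating updates BOTH derived tables, one after the other
    // (assumed order: count first, then sum).
    public void process(long movieId, double rating) {
        counts.merge(movieId, 1L, Long::sum);
        join(movieId); // fires with the new count but the old sum
        sums.merge(movieId, rating, Double::sum);
        join(movieId); // fires again with both sides up to date
    }

    // Inner join: emit only when both tables have a value for the key.
    private void join(long movieId) {
        Long count = counts.get(movieId);
        Double sum = sums.get(movieId);
        if (count == null || sum == null) {
            return;
        }
        double average = sum / count.doubleValue();
        if (cachingEnabled) {
            cache.put(movieId, average); // overwrites, so the intermediate result is dropped
        } else {
            downstream.add(movieId + "=" + average); // forwarded immediately
        }
    }

    // Models a commit: forward the latest cached value per key, then clear.
    public void commit() {
        cache.forEach((k, v) -> downstream.add(k + "=" + v));
        cache.clear();
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        JoinUpdateModel uncached = new JoinUpdateModel(false);
        uncached.process(1L, 8.0);
        uncached.process(1L, 6.0);
        System.out.println("caching disabled: " + uncached.downstream());

        JoinUpdateModel cached = new JoinUpdateModel(true);
        cached.process(1L, 8.0);
        cached.process(1L, 6.0);
        cached.commit();
        System.out.println("caching enabled:  " + cached.downstream());
    }
}
```

With the two ratings used in main, the uncached model forwards three results for key 1 ([1=8.0, 1=4.0, 1=7.0], where 4.0 is the intermediate value computed from the new count and the old sum), while the cached model forwards only [1=7.0] after commit(). In other words, the extra records are intermediate updates of the same key rather than corrupted data.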