[ https://issues.apache.org/jira/browse/KAFKA-3891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Phil Derome resolved KAFKA-3891.
--------------------------------
    Resolution: Invalid

My mistake.

> A KTable with Long values with a numeric filter apparently may retain null values
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-3891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3891
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Phil Derome
>            Assignee: Guozhang Wang
>            Priority: Minor
>
> See Confluent's UserRegionLambdaExample for full detail. I am not sure whether this
> qualifies as a bug, as I am new to the community, but it looks like one to me
> (the resolved issues KAFKA-739 and KAFKA-2026 also concern undesirable nulls, and
> they were deemed Major bugs).
>
> The first filter below, on the KTable's count, should remove nulls, since null
> does not satisfy the predicate count >= 2.
> Yet, judging by the second filter further down, the variable regionCounts
> apparently still contains some null values despite the filter on count. This is
> quite confusing: why would we want to publish these null values to any topic,
> when the filter's intent is clear?
>
> // Aggregate the user counts by region
> KTable<String, Long> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key and value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions with only 1 user
>     .filter((regionName, count) -> count >= 2);
>
> // Note: The following operations would NOT be needed for the actual users-per-region
> // computation, which would normally stop at the filter() above. We use the operations
> // below only to "massage" the output data so it is easier to inspect on the console via
> // kafka-console-consumer.
> //
> KStream<String, Long> regionCountsForConsole = regionCounts
>     // get rid of windows (and the underlying KTable) by transforming the KTable to a KStream
>     .toStream()
>     // sanitize the output by removing null record values (again, we do this only so that the
>     // output is easier to read via kafka-console-consumer combined with LongDeserializer,
>     // because LongDeserializer fails on null values, and even though we could configure
>     // kafka-console-consumer to skip messages on error, the output still wouldn't look pretty)
>     .filter((regionName, count) -> count != null);
>
> // write to the result topic; we need to override the value serializer for type long
> regionCountsForConsole.to(stringSerde, longSerde, "LargeRegions");
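Why the nulls appear even after the first filter: the KTable#filter Javadoc in later Kafka Streams releases states that a record failing the predicate is not simply dropped; a tombstone (null value) is forwarded for that key, so that downstream tables delete any result previously emitted for it. That is presumably why the issue was closed as Invalid. In the topology above, a region whose count is 1 yields such a tombstone, and toStream() exposes it as a null-valued record, hence the second filter before writing with longSerde. The following plain-Java simulation models that changelog behavior without Kafka; the class TombstoneFilterDemo and its methods update, filterChangelog and materialize are hypothetical illustrations, not Kafka Streams APIs, and the model is a simplification of what 0.10.0.0 does internally.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class TombstoneFilterDemo {

    // Hypothetical helper: one changelog update; SimpleEntry allows a null value.
    static Map.Entry<String, Long> update(String key, Long value) {
        return new SimpleEntry<>(key, value);
    }

    // Simulates a KTable filter over a changelog: an update that fails the
    // predicate is replaced by a tombstone (null value), not dropped.
    static List<Map.Entry<String, Long>> filterChangelog(
            List<Map.Entry<String, Long>> upstream,
            BiPredicate<String, Long> predicate) {
        List<Map.Entry<String, Long>> downstream = new ArrayList<>();
        for (Map.Entry<String, Long> u : upstream) {
            Long value = u.getValue();
            // Incoming tombstones are passed through without evaluating the predicate.
            boolean keep = value != null && predicate.test(u.getKey(), value);
            downstream.add(update(u.getKey(), keep ? value : null));
        }
        return downstream;
    }

    // Applies a changelog to a table: a null value deletes the key.
    static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> u : changelog) {
            if (u.getValue() == null) {
                table.remove(u.getKey());
            } else {
                table.put(u.getKey(), u.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical per-region counts, in arrival order.
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(update("asia", 1L));
        counts.add(update("europe", 1L));
        counts.add(update("europe", 2L));
        counts.add(update("europe", 1L)); // a user left europe

        List<Map.Entry<String, Long>> filtered =
                filterChangelog(counts, (region, count) -> count >= 2);
        System.out.println(filtered);              // [asia=null, europe=null, europe=2, europe=null]
        System.out.println(materialize(filtered)); // {}
    }
}
```

The last tombstone for europe is what retracts the stale europe=2 from any downstream table; if failing updates were silently dropped instead, europe=2 would remain there even after the count fell back to 1.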