[ https://issues.apache.org/jira/browse/KAFKA-3891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Phil Derome updated KAFKA-3891:
-------------------------------
Please reject, my mistake.
> A KTable with Long values with a numeric filter apparently may retain null
> values
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-3891
> URL: https://issues.apache.org/jira/browse/KAFKA-3891
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Phil Derome
> Assignee: Guozhang Wang
> Priority: Minor
>
> See Confluent's UserRegionLambdaExample for full detail. I am not sure whether
> this qualifies as a bug, as I am new to the community, but to me it looks like
> one (the resolved KAFKA-739 and KAFKA-2026 also pertain to undesirable nulls,
> and they were deemed Major bugs).
> The first filter on the KTable for count below should filter out null
> correctly, since null does not satisfy the predicate count >= 2.
> The variable regionCounts apparently contains some null values despite the
> filter on count, given the second filter that takes place. This is quite
> confusing. Why would we want to publish these null values on any topic when
> the filter's intent is quite clear?
> // Aggregate the user counts by region
> KTable<String, Long> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key and
>     // value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions with only 1 user
>     .filter((regionName, count) -> count >= 2);
> // Note: The following operations would NOT be needed for the actual
> // users-per-region computation, which would normally stop at the filter()
> // above. We use the operations below only to "massage" the output data so it
> // is easier to inspect on the console via kafka-console-consumer.
> //
> KStream<String, Long> regionCountsForConsole = regionCounts
>     // get rid of windows (and the underlying KTable) by transforming the
>     // KTable to a KStream
>     .toStream()
>     // sanitize the output by removing null record values (again, we do this
>     // only so that the output is easier to read via kafka-console-consumer
>     // combined with LongDeserializer, because LongDeserializer fails on null
>     // values, and even though we could configure kafka-console-consumer to
>     // skip messages on error the output still wouldn't look pretty)
>     .filter((regionName, count) -> count != null);
> // write to the result topic, we need to override the value serializer
> // for type long
> regionCountsForConsole.to(stringSerde, longSerde, "LargeRegions");
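
For readers puzzled by the same nulls: a KTable is a changelog, so its filter() cannot silently drop a non-matching update. It forwards a null-valued record (a tombstone) for that key so that downstream consumers can delete any earlier value. The sketch below is a simplified model of that behaviour in plain Java with no Kafka dependency. The class TableFilterSketch and the methods tableFilter and dropNulls are hypothetical names made up for illustration; this is not how Kafka Streams implements it internally.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of table-style filtering (not Kafka Streams code).
public class TableFilterSketch {

    // Table-style filter: every update is forwarded. An update that fails the
    // predicate (or is already null) is forwarded with a null value, i.e. a tombstone.
    static <K, V> List<Map.Entry<K, V>> tableFilter(
            List<Map.Entry<K, V>> updates, BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            V value = update.getValue();
            // The predicate is only evaluated for non-null values.
            boolean keep = value != null && predicate.test(update.getKey(), value);
            out.add(new SimpleImmutableEntry<>(update.getKey(), keep ? value : null));
        }
        return out;
    }

    // Stream-style filter: records with null values are simply dropped.
    static <K, V> List<Map.Entry<K, V>> dropNulls(List<Map.Entry<K, V>> records) {
        return records.stream()
                .filter(record -> record.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Successive count updates per region, as count() might emit them.
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(new SimpleImmutableEntry<>("europe", 1L));
        counts.add(new SimpleImmutableEntry<>("europe", 2L));
        counts.add(new SimpleImmutableEntry<>("asia", 1L));

        List<Map.Entry<String, Long>> filtered =
                tableFilter(counts, (region, count) -> count >= 2);
        System.out.println(filtered);            // [europe=null, europe=2, asia=null]
        System.out.println(dropNulls(filtered)); // [europe=2]
    }
}
```

In this model the first filter corresponds to the KTable filter on count >= 2, and dropNulls corresponds to the later KStream filter on count != null, which is why the second filter still has nulls to remove.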