[
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234770#comment-15234770
]
Michael Noll commented on KAFKA-3511:
-------------------------------------
Hmm. I'd prefer an API that allows me to write:
* {{stream.max()}} rather than
* {{stream.aggregate(new Max())}}
That said, I think we are talking about two related but distinct questions:
1. Which aggregations do we provide out of the box?
2. How do we expose these built-in aggregations?
Regarding point 1, we could consider the following built-in aggregations (cf.
Scala's collection API, for example), each followed by an example use case. In
general, it might be a good idea to think in terms of common use cases when
deciding which aggregations to provide out of the box, taking into account which
ones make sense on windowed streams, which on non-windowed streams, which on
KStream, which on KTable, and which on both. Again, I am listing concrete
function names here, but the purpose is to list their *functionality* (point 2
addresses how this functionality is exposed).
* {{count}} / {{countBy}} -- "how many pageviews between 4 AM and 5 AM?"
(countBy and filter are related: countBy(p) is equivalent to filter(p).count)
* {{min}} / {{minBy}} -- "the page with the fewest views"
* {{max}} / {{maxBy}} -- "the page with the most views", "the IP address with
the most hits (e.g. to implement throttling, blocklists)"
* {{sum}} -- "total number of pageviews per user"
* {{distinct}} -- "distinct elements in the stream" (this relates to the
changelog stream, aka KTable, and to the stream-table duality; distinct may
make sense only for KStream, because "distinct" is essentially a native
feature of KTable)
* Somewhat related: we may also want to consider "sortBy" and "take"
operators to help implement "give me the top N things" use cases, since min
and max return only a single element. Idea:
"windowedStream.sortBy(...).take(5)".
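To pin down the intended semantics of the list above, here is a rough sketch of the same functionality on an in-memory Scala collection. It is only an illustration under stated assumptions: the {{PageView}} record and the sample data are hypothetical, and nothing here is Kafka Streams API. On a stream, each of these would additionally have to be defined per key and, where applicable, per window.

```scala
// Hypothetical pageview record and sample data; these are NOT Kafka Streams types.
final case class PageView(user: String, page: String, ip: String, hour: Int)

object CollectionAggregations {
  val views: List[PageView] = List(
    PageView("alice", "/home",  "10.0.0.1", 4),
    PageView("bob",   "/home",  "10.0.0.2", 4),
    PageView("alice", "/about", "10.0.0.1", 5),
    PageView("carol", "/home",  "10.0.0.1", 4),
    PageView("bob",   "/docs",  "10.0.0.2", 6),
    PageView("carol", "/docs",  "10.0.0.3", 6)
  )

  // count with a predicate ("countBy"): how many pageviews in the 4 AM hour?
  val viewsAt4: Int = views.count(_.hour == 4)
  // ...which gives the same result as filter followed by count
  val viewsAt4ViaFilter: Int = views.filter(_.hour == 4).size

  // Per-page counts, used as input for minBy / maxBy / sortBy below
  val viewsPerPage: Map[String, Int] =
    views.groupBy(_.page).map { case (page, vs) => page -> vs.size }

  // minBy / maxBy: the page with the fewest / most views
  val leastViewedPage: (String, Int) = viewsPerPage.minBy(_._2)
  val mostViewedPage: (String, Int) = viewsPerPage.maxBy(_._2)

  // maxBy: the IP address with the most hits (e.g. a throttling candidate)
  val busiestIp: (String, Int) =
    views.groupBy(_.ip).map { case (ip, vs) => ip -> vs.size }.maxBy(_._2)

  // sum: total pageviews per user (each record contributes 1)
  val viewsPerUser: Map[String, Int] =
    views.groupBy(_.user).map { case (user, vs) => user -> vs.map(_ => 1).sum }

  // distinct: the distinct users seen, in first-seen order
  val distinctUsers: List[String] = views.map(_.user).distinct

  // sortBy + take: top 2 pages by number of views, descending
  val top2Pages: List[(String, Int)] =
    viewsPerPage.toList.sortBy { case (_, n) => -n }.take(2)
}
```

Note that {{maxBy}} returns a single element even when several keys tie, which is why a separate top-N style operator was suggested.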
Note: Unfortunately, because we use Java, we can't benefit from implicits/type
classes as we could in, e.g., Scala. For example, here's the signature of
{{sum}} in Scala (for a collection of {{Int}} elements):
{code}
def sum[B >: Int](implicit num: Numeric[B]): B
{code}
The benefit is that {{sum}} works automagically for any type {{B}} that "looks
like a number", i.e. any {{B}} for which an implicit {{Numeric[B]}} is in scope.
This is a great help when you need an out-of-the-box version of {{sum}} that
always does the right thing.
Regarding point 2, my personal opinion is that *some* basic functionality
should be provided via dedicated operators -- call it sugar if you will -- to
achieve a more pleasant API for common use cases (e.g. "count()" rather than
"aggregate(new Count())"). Of course, we could opt to implement these
dedicated operators via "aggregate(new Func())" behind the scenes, which may
help with composability when one wants to write a new API layer on top.
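A rough sketch of that layering, assuming a toy in-memory stream whose only primitive is a generic {{aggregate(...)}}, with {{count()}}, {{sum()}}, {{max()}} and {{avg()}} as thin wrappers around it. All type and method names here ({{Agg}}, {{Count}}, {{Sum}}, {{Max}}, {{AvgState}}, {{ToyStream}}) are hypothetical and chosen for illustration; they are not the Kafka Streams API.

```scala
// Hypothetical toy types for illustration only; NOT the Kafka Streams API.
trait Agg[V, A] {
  def init: A                  // initial aggregate value
  def add(acc: A, value: V): A // fold one more value into the aggregate
}

final class Count[V] extends Agg[V, Long] {
  def init: Long = 0L
  def add(acc: Long, value: V): Long = acc + 1
}

final class Sum[V](implicit num: Numeric[V]) extends Agg[V, V] {
  def init: V = num.zero
  def add(acc: V, value: V): V = num.plus(acc, value)
}

final class Max[V](implicit ord: Ordering[V]) extends Agg[V, Option[V]] {
  def init: Option[V] = None // no maximum until the first value arrives
  def add(acc: Option[V], value: V): Option[V] =
    Some(acc.fold(value)(current => ord.max(current, value)))
}

// avg keeps (count, running sum) as its state; the division happens at the end.
final class AvgState[V](implicit num: Numeric[V]) extends Agg[V, (Long, Double)] {
  def init: (Long, Double) = (0L, 0.0)
  def add(acc: (Long, Double), value: V): (Long, Double) =
    (acc._1 + 1, acc._2 + num.toDouble(value))
}

// A toy in-memory "stream" whose only primitive is aggregate(...).
final class ToyStream[V](values: Seq[V]) {
  def aggregate[A](agg: Agg[V, A]): A =
    values.foldLeft(agg.init)((acc, v) => agg.add(acc, v))

  // Dedicated operators ("sugar"), each implemented via aggregate(new Func()).
  def count(): Long = aggregate(new Count[V])
  def sum()(implicit num: Numeric[V]): V = aggregate(new Sum[V])
  def max()(implicit ord: Ordering[V]): Option[V] = aggregate(new Max[V])
  def avg()(implicit num: Numeric[V]): Option[Double] = {
    val (n, total) = aggregate(new AvgState[V])
    if (n == 0L) None else Some(total / n)
  }
}
```

One design consequence: {{avg}} does not fit the single-accumulator shape as neatly as {{count}} or {{sum}}, because its intermediate state (count, running sum) differs from its result type, so the wrapper has to post-process the aggregate.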
> Provide built-in aggregators sum() and avg() in Kafka Streams DSL
> -----------------------------------------------------------------
>
> Key: KAFKA-3511
> URL: https://issues.apache.org/jira/browse/KAFKA-3511
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: api, newbie
> Fix For: 0.10.0.0
>
>
> Currently we only have one built-in aggregate function count() in the Kafka
> Streams DSL, but we want to add more aggregation functions like sum() and
> avg().
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)