Hi everybody, First, I want to introduce myself to the community. I am a PhD student who wants to work with and improve Flink.
Second, as a starting point I thought I would work on improving aggregations. My first
goal is to simplify the computation of a field average. Basically, I want to turn
this plan:
val input = env.fromCollection( Array(1L, 2L, 3L, 4L) )
input
.map { in => (in, 1L) }
.sum(0).andSum(1)
.map { in => in._1.toDouble / in._2.toDouble }
.print
into this:
// val input = ...
input.average(0).print()
My basic idea is to internally still add the counter field and execute the map
and sum steps but to hide them from the user.
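To make the idea concrete, here is a minimal sketch of the desugaring, using plain Scala collections as a stand-in for a Flink DataSet (the `average` helper name and the hidden counter field are part of my proposal, not existing API):

```scala
// Sketch only: plain Scala collections stand in for a DataSet.
object AverageSketch {
  def average(input: Seq[Long]): Double = {
    // Internally append the counter field, as in the manual plan...
    val withCounter = input.map(in => (in, 1L))
    // ...sum both fields in a single reduce...
    val (sum, count) = withCounter.reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    // ...and divide, hiding the intermediate tuple from the user.
    sum.toDouble / count.toDouble
  }
}
```

So `input.average(0)` would expand to exactly the map/sum/map pipeline above, just invisibly.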
Next, I want to support multiple aggregations so one can write something like:
input.min(0).max(0).sum(0).count(0).average(0)
Internally, there should only be one pass over the input data and average
should reuse the work done by sum and count.
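The single-pass idea could look roughly like this (again plain Scala as a stand-in; the `Agg` accumulator and its field names are illustrative):

```scala
// Sketch: one map + one reduce computes min, max, sum and count together;
// average is derived from sum/count afterwards, reusing their results.
object MultiAggSketch {
  case class Agg(min: Long, max: Long, sum: Long, count: Long) {
    def average: Double = sum.toDouble / count.toDouble
  }

  def aggregate(input: Seq[Long]): Agg =
    input.map(v => Agg(v, v, v, 1L)).reduce { (a, b) =>
      Agg(math.min(a.min, b.min), math.max(a.max, b.max),
          a.sum + b.sum, a.count + b.count)
    }
}
```

Because the accumulator is combined associatively, this also maps naturally onto Flink's combinable reduce, so all five results come out of one pass.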
In September there was some discussion [1] on the semantics of the min/max
aggregations vs. minBy/maxBy. The consensus was that min/max should not simply
return the respective field value but return the entire tuple. However, for
count/sum/average there is no single corresponding tuple, and returning one would
also not work for combinations of min/max.
One possible route is to simply return a random element, similar to MySQL. I
think this can be very surprising to the user especially when min/max are
combined.
Another possibility is to return the tuple only for single invocations of min
or max, and to return the plain field value for the other aggregation functions
and for combinations. This is also inconsistent, but appears to be more in line
with people's expectations. Moreover, two or more tuples may share the same
min/max value, which raises the question of which one should be returned.
I haven't yet thought about aggregations in a streaming context and I would
appreciate any input on this.
Best,
Viktor
[1]
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Aggregations-td1706.html
