This should directly go into the API, IMO. As I said, there are several open JIRAs for this issue.
2015-01-21 22:29 GMT+01:00 Felix Neutatz <neut...@googlemail.com>: > Thanks, @Fabian, your workaround works :) > > But I think this feature is really missing. Shall we add this functionality > natively or via the proposed lib package? > > 2015-01-21 20:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: > > > Chesnay is right. > > Right now, it is not possible to do want you want in a straightforward > way > > because Flink does not support to fully sort a data set (there are > several > > related issues in JIRA). > > > > A workaround would be to attach a constant value to each tuple, group on > > that (all tuples are sent to the same group), sort that group, and apply > > the first operator. > > > > 2015-01-21 20:22 GMT+01:00 Chesnay Schepler < > chesnay.schep...@fu-berlin.de > > >: > > > > > If i remember correctly first() returns the first n values for every > > > group. the javadocs actually don't make this behaviour very clear. > > > > > > > > > On 21.01.2015 19:18, Felix Neutatz wrote: > > > > > >> Hi, > > >> > > >> my use case is the following: > > >> > > >> I have a Tuple2<String,Long>. I want to group by the String and sum up > > the > > >> Long values accordingly. This works fine with these lines: > > >> > > >> DataSet<Lineitem> lineitems = getLineitemDataSet(env); > > >> lineitems.project(new int > > []{3,0}).groupBy(0).aggregate(Aggregations.SUM, > > >> 1); > > >> > > >> After the aggregation I want to print the 10 groups with the highest > > sum, > > >> like: > > >> > > >> string1, 100L > > >> string2, 50L > > >> string3, 1L > > >> > > >> I tried that: > > >> > > >> lineitems.project(new int > > []{3,0}).groupBy(0).aggregate(Aggregations.SUM, > > >> 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print(); > > >> > > >> But instead of 3 records, I get a lot more. > > >> > > >> Can see my error? > > >> > > >> Best regards, > > >> > > >> Felix > > >> > > >> > > > > > >