Motivated both by Martin and our recent use-case, I updated the groupBys and
aggregations for the Streaming API to also work on arrays by default:
<https://github.com/mbalassi/incubator-flink/commit/a6cd742958dace6ead896af189983933470d8eb1>

I think it would probably make sense to do something similar for the batch
API too:

    DataStream<double[]> ds = ...; // e.g. 500-element arrays
    ds.groupBy(100, 200).sum(10);

(Here we group by the 100th and 200th fields and sum the 10th.)

What do you think?

Gyula
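Until the batch API has direct support, here is a minimal sketch of the
workaround with today's batch primitives: grouping a DataSet of arrays
through a KeySelector and emulating sum() with a ReduceFunction. The class
name, toy data, and field indices are invented for illustration:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;

    public class ArrayGroupingSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Toy data: field 0 is the grouping key, field 2 the value to sum.
            DataSet<double[]> ds = env.fromElements(
                    new double[] {1.0, 10.0, 0.5},
                    new double[] {1.0, 20.0, 1.5},
                    new double[] {2.0, 30.0, 2.0});

            DataSet<double[]> summed = ds
                    // groupBy(int) does not reach into arrays in the batch
                    // API, so extract the key with a KeySelector.
                    .groupBy(new KeySelector<double[], Double>() {
                        @Override
                        public Double getKey(double[] record) {
                            return record[0];
                        }
                    })
                    // Emulate sum(2): add field 2 pairwise, keep the other
                    // fields from the left record.
                    .reduce(new ReduceFunction<double[]>() {
                        @Override
                        public double[] reduce(double[] a, double[] b) {
                            double[] result = a.clone();
                            result[2] += b[2];
                            return result;
                        }
                    });

            summed.print(); // on older Flink versions, follow with env.execute()
        }
    }

The reduce clones the left record before mutating it, since the runtime may
reuse input objects.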
On Tue, Oct 21, 2014 at 7:04 PM, Martin Neumann <mneum...@spotify.com> wrote:

> There was not enough time to clean it up and gold-plate it. He got
> semi-horrible Java code now, with some explanation of how it would look
> in Scala. My colleague was asking for a quick (and dirty) job, so taking
> more time on it would have defeated the purpose of the whole thing a bit.
>
> In any case, thanks for the advice; hopefully I found us another Flink
> supporter.
>
> On Tue, Oct 21, 2014 at 3:52 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hej,
> >
> > Do you want to use Scala? You can use simple case classes there and use
> > fields directly as keys; it will look very elegant...
> >
> > If you want to stick with Java, you can actually use POJOs (Robert just
> > corrected me: expression keys should be available there).
> >
> > Can you define a class
> >
> >     public class MyClass {
> >         public String id;
> >         public int someValue;
> >         public double anotherValue;
> >         ...
> >     }
> >
> > and then run a program like this:
> >
> >     DataSet<MyClass> data = env.readTextFile(...).map(new Parser());
> >
> >     data.groupBy("id").sortGroup("someValue", Order.ASCENDING)
> >         .reduceGroup(new GroupReduceFunction(...));
> >
> > Feel free to post your program here so we can give you comments!
> >
> > Greetings,
> > Stephan
> >
> > On Tue, Oct 21, 2014 at 3:32 PM, Martin Neumann <mneum...@spotify.com>
> > wrote:
> >
> > > Nope,
> > >
> > > but I can't filter out the useless data, since the program I'm
> > > comparing to does not either. The point is to prove to one of my
> > > colleagues that Flink > Spark. The Spark program runs out of memory
> > > and crashes when just doing a simple group and counting the number
> > > of items.
> > >
> > > This is also one of the reasons I ask what the best style of doing
> > > this is, so I can get it as clean as possible to compare it to Spark.
> > >
> > > cheers Martin
> > >
> > > On Tue, Oct 21, 2014 at 3:07 PM, Aljoscha Krettek
> > > <aljos...@apache.org> wrote:
> > >
> > > > By the way, do you actually need all those 54 columns in your job?
> > > >
> > > > On Tue, Oct 21, 2014 at 3:02 PM, Martin Neumann
> > > > <mneum...@spotify.com> wrote:
> > > >
> > > > > I will go with that workaround; however, I would have preferred
> > > > > to do that directly with the API instead of doing MapReduce-like
> > > > > key/value tuples again :-)
> > > > >
> > > > > By the way, is there a simple function to count the number of
> > > > > items in a reduce group? It feels stupid to write a GroupReduce
> > > > > that just iterates and increments a counter.
> > > > >
> > > > > cheers Martin
> > > > >
> > > > > On Tue, Oct 21, 2014 at 2:54 PM, Robert Metzger
> > > > > <rmetz...@apache.org> wrote:
> > > > >
> > > > > > Yes, for sorted groups you need to use POJOs or Tuples.
> > > > > > I think you have to split the input lines manually, with a
> > > > > > mapper. How about using a TupleN<...> with only the fields you
> > > > > > need (returned by the mapper)?
> > > > > >
> > > > > > If you need all fields, you could also use a
> > > > > > Tuple2<String, String[]> where the first position is the sort
> > > > > > key?
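To make the Tuple2<String, String[]> suggestion above concrete, here is a
rough sketch that also shows the counting GroupReduce asked about earlier in
the thread. The input path, the comma separator, and the class name are
invented for illustration:

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class Tuple2KeySketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Split each line in a mapper and pull the key (column 15) into
            // the first tuple position, keeping all 54 columns in the second.
            DataSet<Tuple2<String, String[]>> records = env
                    .readTextFile("/path/to/input.csv") // illustrative path
                    .map(new MapFunction<String, Tuple2<String, String[]>>() {
                        @Override
                        public Tuple2<String, String[]> map(String line) {
                            String[] columns = line.split(",");
                            return new Tuple2<String, String[]>(columns[15], columns);
                        }
                    });

            // Group on the extracted key and count the items per group:
            // a GroupReduce that just iterates and counts, as discussed above.
            DataSet<Tuple2<String, Long>> counts = records
                    .groupBy(0)
                    .reduceGroup(new GroupReduceFunction<Tuple2<String, String[]>, Tuple2<String, Long>>() {
                        @Override
                        public void reduce(Iterable<Tuple2<String, String[]>> group,
                                           Collector<Tuple2<String, Long>> out) {
                            String key = null;
                            long count = 0;
                            for (Tuple2<String, String[]> record : group) {
                                key = record.f0;
                                count++;
                            }
                            out.collect(new Tuple2<String, Long>(key, count));
                        }
                    });

            counts.print();
        }
    }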
> > > > > > On Tue, Oct 21, 2014 at 2:20 PM, Gyula Fora <gyf...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > I am not sure how you should go about that; let's wait for
> > > > > > > some feedback from the others.
> > > > > > >
> > > > > > > Until then, you can always map the array to (array, keyfield)
> > > > > > > and use groupBy(1).
> > > > > > >
> > > > > > > On 21 Oct 2014, at 14:17, Martin Neumann
> > > > > > > <mneum...@spotify.com> wrote:
> > > > > > >
> > > > > > > > Hej,
> > > > > > > >
> > > > > > > > Unfortunately .sort() cannot take a key extractor; would I
> > > > > > > > have to do the sort myself then?
> > > > > > > >
> > > > > > > > cheers Martin
> > > > > > > >
> > > > > > > > On Tue, Oct 21, 2014 at 2:08 PM, Gyula Fora
> > > > > > > > <gyf...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hey,
> > > > > > > > >
> > > > > > > > > Using arrays is probably a convenient way to do so.
> > > > > > > > >
> > > > > > > > > I think the groupBy, the way you described it, only works
> > > > > > > > > for tuples now. To do the grouping on the array field,
> > > > > > > > > you would need to create a key extractor for this and
> > > > > > > > > pass that to groupBy.
> > > > > > > > >
> > > > > > > > > Actually, we have some use-cases like this for streaming,
> > > > > > > > > so we are thinking of writing a wrapper for the array
> > > > > > > > > types that would behave as you described.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On 21 Oct 2014, at 14:03, Martin Neumann
> > > > > > > > > <mneum...@spotify.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hej,
> > > > > > > > > >
> > > > > > > > > > I have a CSV file with 54 columns; each of them is a
> > > > > > > > > > string (for now). I need to group and sort them on
> > > > > > > > > > field 15.
> > > > > > > > > >
> > > > > > > > > > What's the best way to load the data into Flink?
> > > > > > > > > > There is no Tuple54 (and the <> would look awful anyway
> > > > > > > > > > with 54 times String in it).
> > > > > > > > > > My current idea is to write a mapper and split the
> > > > > > > > > > string into arrays of Strings. Would grouping and
> > > > > > > > > > sorting work on this?
> > > > > > > > > >
> > > > > > > > > > So can I do something like this, or does that only work
> > > > > > > > > > on tuples:
> > > > > > > > > >
> > > > > > > > > >     DataSet<String[]> ds;
> > > > > > > > > >     ds.groupBy(15).sort(20, Order.ANY)
> > > > > > > > > >
> > > > > > > > > > cheers Martin
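Here is a rough sketch of the key-extractor route described above, with the
per-group sort done by hand inside the reducer, since .sort() cannot take a
key extractor. The toy records, class name, and column indices stand in for
the real 54-column data:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.util.Collector;

    public class GroupAndSortArraysSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the 54-column CSV: tiny String[] records.
            DataSet<String[]> ds = env.fromElements(
                    new String[] {"a", "2", "x"},
                    new String[] {"a", "1", "y"},
                    new String[] {"b", "3", "z"});

            final int groupField = 0; // would be 15 for the real data
            final int sortField = 1;  // would be 20 for the real data

            DataSet<String[]> sorted = ds
                    // groupBy(int) only works on tuples, so use a key extractor.
                    .groupBy(new KeySelector<String[], String>() {
                        @Override
                        public String getKey(String[] row) {
                            return row[groupField];
                        }
                    })
                    // Sort each group by hand inside the reducer.
                    .reduceGroup(new GroupReduceFunction<String[], String[]>() {
                        @Override
                        public void reduce(Iterable<String[]> rows, Collector<String[]> out) {
                            List<String[]> buffer = new ArrayList<String[]>();
                            for (String[] row : rows) {
                                buffer.add(row.clone()); // copy: the runtime may reuse objects
                            }
                            Collections.sort(buffer, new Comparator<String[]>() {
                                @Override
                                public int compare(String[] left, String[] right) {
                                    return left[sortField].compareTo(right[sortField]);
                                }
                            });
                            for (String[] row : buffer) {
                                out.collect(row);
                            }
                        }
                    });

            sorted.print();
        }
    }

Note that buffering a whole group in memory only works while groups stay
small; that is the trade-off of doing the sort inside the reducer.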