Addendum: I made a quick fix for the POJOs to support the expression keys for reducers. The example from the above mail works with the code in this branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix
On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <[email protected]> wrote:

> Yep, there is a flaw in the POJO code: it incorrectly replaces the
> GenericTypes.
>
> @Leonidas: To explain what is going on:
>
> - Flink determines the types of the functions via reflection and builds
>   up its own model for representing them at runtime. It handles basic
>   types (int, String, double, ...), arrays, and tuples in a special way;
>   the rest is treated in a generic fashion. The FData class is such a
>   "generic" type in Flink.
>
> - We recently added experimental code to analyze those types and
>   represent their contained fields in a transparent way. Those are the
>   POJO types; the code at the bottom illustrates what they can do.
>   They replace the "generic types" at several levels, but their
>   implementation is currently incomplete.
>
>
> A few remarks on how to best use the system:
>
> - Flink always keeps data in a serialized form. That allows it to
>   operate very robustly with respect to memory pressure, spilling, etc.
>
> - One way in which we gain efficiency is to access in that binary data
>   only what is really necessary (only the parts that make up the key,
>   for example).
>
> - If you define the entire object to be the key, you prevent the system
>   from doing these optimizations. If you can actually define which part
>   is the key, you allow it to operate more efficiently.
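
To make the remarks on keys a bit more concrete, here is a minimal sketch in plain Java. It is not Flink's serialization code. The record layout and the names BinaryKeySketch, serialize, readKey, readPayload and compareByKey are all assumptions made up for this illustration. It only shows the general idea: when the key sits at a known position in the serialized record, records can be compared without decoding the rest.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryKeySketch {

    // Assumed layout (illustration only, not Flink's format):
    // [4-byte int key][4-byte payload length][UTF-8 payload bytes]
    public static byte[] serialize(int key, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + body.length);
        buf.putInt(key).putInt(body.length).put(body);
        return buf.array();
    }

    // Reads only the first four bytes; the payload is never decoded.
    public static int readKey(byte[] record) {
        return ByteBuffer.wrap(record).getInt(0);
    }

    // Decodes the payload; only needed when the whole record is required.
    public static String readPayload(byte[] record) {
        int length = ByteBuffer.wrap(record).getInt(4);
        return new String(record, 8, length, StandardCharsets.UTF_8);
    }

    // Compares two serialized records by key alone.
    public static int compareByKey(byte[] a, byte[] b) {
        return Integer.compare(readKey(a), readKey(b));
    }

    public static void main(String[] args) {
        byte[][] records = {
            serialize(3, "some"), serialize(1, "test"), serialize(2, "POJOs")
        };
        // Sorting touches only the key bytes of each record.
        Arrays.sort(records, BinaryKeySketch::compareByKey);
        for (byte[] record : records) {
            System.out.println(readKey(record) + " -> " + readPayload(record));
        }
    }
}
```

If the whole object were the key, a comparison like compareByKey would have to decode every record completely, which is the cost the remarks above warn about.
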
>
>
> ===================================
>
> Example of PoJos and Expression Fields
>
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<FData> data = env.fromElements(new FData("some"), new FData("test"), new FData("POJOs"));
>     data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>
>     env.execute();
> }
>
> public static class FData implements Serializable, Comparable<FData> {
>     public String theString;
>
>     public FData() {
>         theString = "";
>     }
>
>     public FData(String data) {
>         this.theString = data;
>     }
>
>     @Override
>     public int compareTo(FData x) {
>         return theString.compareTo(x.theString);
>     }
>
>     @Override
>     public String toString() {
>         return theString;
>     }
> }
>
> public static final class GroupbyReducer extends GroupReduceFunction<FData, FData> {
>     @Override
>     public void reduce(Iterator<FData> values, Collector<FData> out) {
>         while (values.hasNext()) {
>             out.collect(values.next());
>         }
>     }
> }
>
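
For readers who want a feel for what an expression key such as "theString" amounts to, here is a rough sketch in plain Java. It does not use Flink and does not show how Flink implements POJO keys. The class ExpressionKeySketch and the helper groupByField are hypothetical names invented for this example. The sketch resolves a public field by name via reflection and groups the elements by that field's value.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpressionKeySketch {

    // Same shape as the FData class in the quoted mail: a public field
    // plus a public no-argument constructor.
    public static class FData {
        public String theString;

        public FData() { theString = ""; }
        public FData(String data) { this.theString = data; }

        @Override
        public String toString() { return theString; }
    }

    // Hypothetical helper, not a Flink API: groups the elements by the value
    // of the public field called fieldName, resolved once via reflection.
    public static <T> Map<Object, List<T>> groupByField(
            List<T> data, Class<T> type, String fieldName) {
        try {
            Field keyField = type.getField(fieldName); // public fields only
            Map<Object, List<T>> groups = new LinkedHashMap<>();
            for (T element : data) {
                Object key = keyField.get(element);
                groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
            }
            return groups;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No accessible field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        List<FData> data = Arrays.asList(
            new FData("some"), new FData("test"), new FData("some"));
        // Groups are keyed by the field value, in order of first appearance.
        groupByField(data, FData.class, "theString")
            .forEach((key, group) -> System.out.println(key + ": " + group.size()));
    }
}
```

The point of the sketch is only that the key is a named part of the object rather than the object as a whole, which is what lets a system restrict its work to that part.
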
