Addendum: I made a quick fix so that POJOs support expression keys for
reducers. The example from my previous mail (quoted below) works with the
code in this branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix


On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <[email protected]> wrote:

> Yep, there is a flaw in the POJO code: it incorrectly replaces the
> GenericTypes.
>
> @Leonidas: To explain what is going on:
>
>   - Flink determines the types of the functions via reflection and builds
> up its own model for representing them at runtime. It handles basic types
> (int, String, double, ...), arrays, and tuples in a special way; everything
> else is treated generically. The FData class is such a "generic" type
> in Flink.
>
>   - We recently added experimental code that analyzes those types and
> represents their contained fields transparently. These are the POJO
> types; the code at the bottom illustrates what they can do.
>   They replace the "generic types" at several levels, but their
> implementation is currently incomplete.
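To make the basic / array / tuple / POJO / generic distinction above concrete, here is a toy classifier in plain Java. It is a minimal sketch, not Flink's type extractor: the `Tuple` marker interface, the `MyPair` and `Opaque` classes, and the simplified POJO rule (public class, public no-argument constructor, all instance fields public; getters and setters are ignored) are assumptions made for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ToyTypeClassifier {

    // Hypothetical marker standing in for Flink's tuple classes (not a Flink type).
    interface Tuple {}

    // Boxed primitives and String are treated as "basic" types in this sketch.
    private static final Set<Class<?>> BASIC = new HashSet<>(Arrays.asList(
            Boolean.class, Byte.class, Short.class, Character.class, Integer.class,
            Long.class, Float.class, Double.class, String.class));

    static String classify(Class<?> type) {
        if (type.isPrimitive() || BASIC.contains(type)) return "basic";
        if (type.isArray()) return "array";
        if (Tuple.class.isAssignableFrom(type)) return "tuple";
        if (isPojo(type)) return "pojo" + fieldNames(type);
        return "generic"; // everything else is handled as an opaque object
    }

    // Simplified POJO rule: public class, public no-arg constructor, and every
    // non-static, non-transient field public (getters/setters are ignored here).
    static boolean isPojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) return false;
        try {
            type.getConstructor(); // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) continue;
            if (!Modifier.isPublic(mods)) return false;
        }
        return true;
    }

    // Names of the instance fields a POJO type would expose as possible keys.
    static List<String> fieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (!Modifier.isStatic(mods) && !Modifier.isTransient(mods)) names.add(f.getName());
        }
        return names;
    }

    public static class FData {                   // mirrors the FData class from the mail
        public String theString = "";
    }

    public static class MyPair implements Tuple { // hypothetical tuple-like type
        public Object f0;
        public Object f1;
    }

    public static class Opaque {                  // private state, no public fields
        private int hidden;
    }

    public static void main(String[] args) {
        Class<?>[] types = {int.class, String.class, int[].class,
                MyPair.class, FData.class, Opaque.class};
        for (Class<?> c : types) {
            System.out.println(c.getSimpleName() + " -> " + classify(c));
        }
    }
}
```

The field names found for a POJO are what make expression keys such as `"theString"` possible; a "generic" type stays opaque to the system.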
>
>
> A few remarks on how to best use the system:
>   - Flink always keeps data in serialized form. That allows it to
> operate very robustly under memory pressure, spilling, etc.
>
>   - One way we gain efficiency is to access only what is really
> necessary in that binary data (for example, only the parts that make up
> the key).
>
>   - If you define the entire object to be the key, you prevent the system
> from doing these optimizations. If you define which part is actually
> the key, you allow it to operate more efficiently.
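The key-only access described above can be illustrated with a toy record format. This is a sketch under assumptions: the length-prefixed layout and the `serialize` / `compareKeys` helpers are hypothetical and do not reflect Flink's actual memory layout or comparators. Two serialized records are ordered by reading only their key bytes; the payload is never deserialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyOnlyCompare {

    // Hypothetical record layout: [int keyLen][key bytes][int payloadLen][payload bytes]
    static byte[] serialize(String key, String payload) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + k.length + 4 + p.length);
        buf.putInt(k.length).put(k).putInt(p.length).put(p);
        return buf.array();
    }

    // Orders two serialized records by key alone: only the length prefix and
    // the key bytes are read, and no objects are created.
    static int compareKeys(byte[] a, byte[] b) {
        int lenA = ByteBuffer.wrap(a).getInt(0);
        int lenB = ByteBuffer.wrap(b).getInt(0);
        // Unsigned byte-wise comparison of UTF-8 orders strings by code point.
        return Arrays.compareUnsigned(a, 4, 4 + lenA, b, 4, 4 + lenB);
    }

    public static void main(String[] args) {
        byte[] r1 = serialize("apple", "a large payload that is never touched");
        byte[] r2 = serialize("banana", "another payload");
        byte[] r3 = serialize("apple", "a different payload");
        System.out.println(compareKeys(r1, r2) < 0);  // "apple" sorts before "banana"
        System.out.println(compareKeys(r1, r3) == 0); // same key, payloads ignored
    }
}
```

If the whole object were the key (for example, relying on `FData.compareTo`), each comparison would instead require deserializing both records into objects first.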
>
>
>
> ===================================
>
> Example of POJOs and Expression Fields
>
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<FData> data = env.fromElements(new FData("some"), new FData("test"), new FData("POJOs"));
>
>     // Group by the POJO field "theString" (an expression key) and apply the reducer.
>     data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>
>     env.execute();
> }
>
> public static class FData implements Serializable, Comparable<FData> {
>     public String theString;
>
>     // Public no-argument constructor, required for POJO types.
>     public FData() {
>         theString = "";
>     }
>
>     public FData(String data) {
>         this.theString = data;
>     }
>
>     @Override
>     public int compareTo(FData x) {
>         return theString.compareTo(x.theString);
>     }
>
>     @Override
>     public String toString() {
>         return theString;
>     }
> }
>
> // Identity reducer: emits every element of each group unchanged.
> public static final class GroupbyReducer extends GroupReduceFunction<FData, FData> {
>     @Override
>     public void reduce(Iterator<FData> values, Collector<FData> out) {
>         while (values.hasNext()) {
>             out.collect(values.next());
>         }
>     }
> }
>
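For intuition about what `groupBy("theString")` followed by the identity `GroupbyReducer` computes, here is a plain-Java sketch with no Flink dependency. The `groupByField` helper is hypothetical: it resolves the expression key as a public field name via reflection and groups objects in memory, which is not how Flink executes the program.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpressionKeyGrouping {

    public static class FData {
        public String theString;
        public FData() { theString = ""; }
        public FData(String data) { this.theString = data; }
        @Override public String toString() { return theString; }
    }

    // Hypothetical helper: resolves a public field by name (the "expression key")
    // and groups elements by that field's value, keeping first-seen key order.
    static <T> Map<Object, List<T>> groupByField(List<T> items, Class<T> type, String fieldName) {
        try {
            Field keyField = type.getField(fieldName); // public fields only
            Map<Object, List<T>> groups = new LinkedHashMap<>();
            for (T item : items) {
                Object key = keyField.get(item);
                groups.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
            }
            return groups;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot resolve key field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        List<FData> data = Arrays.asList(new FData("some"), new FData("test"), new FData("some"));
        Map<Object, List<FData>> groups = groupByField(data, FData.class, "theString");
        // Identity "reducer": emit every element of every group, like GroupbyReducer.
        groups.forEach((key, group) -> group.forEach(System.out::println));
    }
}
```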
