Yep, there is a flaw in the POJO code: it incorrectly replaces the
generic types.

@Leonidas: To explain what is going on:

  - Flink determines the types of the functions via reflection and builds up
its own model for representing them at runtime. It handles basic types (int,
String, double, ...), arrays, and tuples in a special way; everything else is
treated generically. The FData class is such a "generic" type in
Flink.

  - We recently added experimental code that analyzes those types and
represents their contained fields transparently. These are the POJO
types; the code at the bottom illustrates what they can do.
  They replace the "generic types" at several levels, but their
implementation is currently incomplete.
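To make the split between basic, array, tuple, POJO, and generic types a bit more concrete, here is a minimal sketch using plain Java reflection. It is not Flink's type extractor: the `Category` enum, the `Tuple` marker interface, the `Pair` and `Opaque` classes, and the simplified POJO rule (public class, public no-argument constructor, all non-static fields public) are assumptions made for illustration. Under these rules FData qualifies as a POJO, while a class without a no-argument constructor falls back to the generic category.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;

public class TypeCategoryDemo {

    // Simplified categories mirroring the description above (not Flink's real classes).
    public enum Category { BASIC, ARRAY, TUPLE, POJO, GENERIC }

    // Hypothetical marker interface standing in for Flink's tuple classes.
    public interface Tuple {}

    private static final Set<Class<?>> BASIC_TYPES = Set.of(
            int.class, long.class, double.class, boolean.class,
            Integer.class, Long.class, Double.class, Boolean.class, String.class);

    // Checks are ordered: special-cased types first, POJO analysis next, generic last.
    public static Category classify(Class<?> c) {
        if (BASIC_TYPES.contains(c)) return Category.BASIC;
        if (c.isArray()) return Category.ARRAY;
        if (Tuple.class.isAssignableFrom(c)) return Category.TUPLE;
        if (isPojo(c)) return Category.POJO;
        return Category.GENERIC;
    }

    // Assumed, simplified POJO rule: public class, public no-arg constructor,
    // and at least one field, with every non-static field public.
    // Getter/setter access and superclass fields are ignored in this sketch.
    static boolean isPojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        try {
            c.getConstructor(); // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        boolean hasField = false;
        for (Field f : c.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) continue;
            if (!Modifier.isPublic(f.getModifiers())) return false;
            hasField = true;
        }
        return hasField;
    }

    // Hypothetical tuple-like class.
    public static class Pair implements Tuple {
        public int f0;
        public int f1;
        public Pair() {}
    }

    // Same shape as FData in the example below.
    public static class FData {
        public String theString = "";
        public FData() {}
    }

    // Private field and no no-arg constructor, so it stays generic.
    public static class Opaque {
        private final String s;
        public Opaque(String s) { this.s = s; }
    }

    public static void main(String[] args) {
        System.out.println(classify(String.class)); // BASIC
        System.out.println(classify(int[].class));  // ARRAY
        System.out.println(classify(Pair.class));   // TUPLE
        System.out.println(classify(FData.class));  // POJO
        System.out.println(classify(Opaque.class)); // GENERIC
    }
}
```

The order of the checks matters in this sketch: tuples are tested before the POJO rule, so a tuple class with public fields still gets its dedicated handling.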


A few remarks on how to best use the system:
  - Flink always keeps data in serialized form. That allows it to operate
very robustly under memory pressure, when spilling to disk, etc.

  - One way we gain efficiency is by accessing only the parts of that binary
data that are really necessary (for example, only the parts that make up the key).

  - If you define the entire object to be the key, you prevent the system
from doing these optimizations. If you can define which part actually is
the key, you allow it to operate more efficiently.
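The idea of touching only the key bytes can be illustrated with a small, self-contained sketch. The record layout (a length-prefixed key followed by a length-prefixed payload) and all names here are hypothetical and not Flink's actual serialization format; the point is only that the comparator orders serialized records by reading their key bytes, without deserializing any object or looking at the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyOnlyCompareDemo {

    // Hypothetical layout: [int keyLen][key bytes][int payloadLen][payload bytes].
    public static byte[] serialize(String key, String payload) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + k.length + 4 + p.length);
        buf.putInt(k.length).put(k).putInt(p.length).put(p);
        return buf.array();
    }

    // Compares two serialized records by their key bytes only (unsigned,
    // lexicographic). The payload region is never read.
    public static int compareByKey(byte[] a, byte[] b) {
        int lenA = ByteBuffer.wrap(a).getInt(0);
        int lenB = ByteBuffer.wrap(b).getInt(0);
        return Arrays.compareUnsigned(a, 4, 4 + lenA, b, 4, 4 + lenB);
    }

    // Decodes just the key, used here for printing.
    public static String key(byte[] record) {
        int len = ByteBuffer.wrap(record).getInt(0);
        return new String(record, 4, len, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> records = new ArrayList<>();
        records.add(serialize("test", "payload-1"));
        records.add(serialize("some", "payload-2"));
        records.add(serialize("POJOs", "payload-3"));
        records.add(serialize("some", "payload-4"));

        // Sort directly on the serialized form; only the key region is compared.
        records.sort(KeyOnlyCompareDemo::compareByKey);

        for (byte[] r : records) {
            System.out.println(key(r)); // POJOs, some, some, test
        }
    }
}
```

If the whole object were the key, the comparator would have to cover every serialized byte (or deserialize each object and call something like compareTo), which is exactly the extra work the remark above advises against.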



===================================

Example of POJOs and expression fields

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<FData> data = env.fromElements(
            new FData("some"), new FData("test"), new FData("POJOs"));

    // "theString" is an expression key: it names the POJO field to group on,
    // so only that field is used as the key instead of the whole object.
    data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();

    env.execute();
}

// A POJO: public class, public no-argument constructor, public field.
public static class FData implements Serializable, Comparable<FData> {
    public String theString;

    public FData() {
        theString = "";
    }

    public FData(String data) {
        this.theString = data;
    }

    @Override
    public int compareTo(FData x) {
        return theString.compareTo(x.theString);
    }

    @Override
    public String toString() {
        return theString;
    }
}

// Identity group reducer: emits every element of each group unchanged.
public static final class GroupbyReducer extends GroupReduceFunction<FData, FData> {
    @Override
    public void reduce(Iterator<FData> values, Collector<FData> out) {
        while (values.hasNext()) {
            out.collect(values.next());
        }
    }
}
