Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
    public class FData implements Serializable, Comparable<FData> {
        public ... data;
        public FData () { ... }
        @Override
        public int compareTo ( FData x ) { return data.compareTo(x.data); }
        ...
    }
The map and flatMap methods work fine on DataSet<FData>, but the
following groupBy code fails:
    s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
where s is a DataSet<FData> and the classes are defined as follows:
    public static final class GroupbyKey extends KeySelector<FData,FData> {
        @Override
        public FData getKey ( FData value ) { return value; }
    }
    public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
        @Override
        public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
    }
This gives me the following error:
org.apache.flink.compiler.CompilerException: Error translating node
'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[
GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties
[ordering=null, grouped=null, unique=null] ]]': Could not serialize
comparator into the configuration.
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
    ...
(I tried to make the example as simple as possible).
What is the problem here? Does FData need to implement a different
interface?
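Because the message says the comparator could not be serialized, one check that can be run outside Flink is whether an FData value survives plain Java serialization. The following is only a sketch under assumptions: the String field stands in for the elided data type, and SerializationCheck and roundTrip are hypothetical names. It does not exercise Flink's own comparator, so passing this check does not rule out other causes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {

    // Stand-in for FData. The String field is an assumption;
    // the real field type is elided in the example above.
    public static class FData implements Serializable, Comparable<FData> {
        private static final long serialVersionUID = 1L;
        public String data;

        public FData() {}

        public FData(String data) { this.data = data; }

        @Override
        public int compareTo(FData x) { return data.compareTo(x.data); }
    }

    // Hypothetical helper: writes the value with Java serialization
    // into a byte array and reads a copy back from those bytes.
    public static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        FData original = new FData("a");
        FData copy = roundTrip(original);
        // The copy should compare equal to the original if serialization works.
        System.out.println(copy.compareTo(original) == 0
                ? "round trip ok"
                : "round trip changed the value");
    }
}
```

If writeObject throws a NotSerializableException in this check, some object reachable from FData does not implement Serializable.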
Thanks
Leonidas Fegaras