Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):

public class FData implements Serializable, Comparable<FData> {
    public ... data;
    public FData () { ... }
    @Override
    public int compareTo ( FData x ) { return data.compareTo(x.data); }
...
}

The map and flatMap methods work fine on DataSet<FData>, but I have a problem with the following groupBy code:

s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());

where s is a DataSet<FData> and the classes are defined as follows:

public static final class GroupbyKey extends KeySelector<FData,FData> {
   @Override
   public FData getKey ( FData value ) { return value; }
}
public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
   @Override
   public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
}

This gives me the following error:

org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
...

(I tried to make the example as simple as possible).
What is the problem here? Do I need to implement FData with a different interface?
Thanks
Leonidas Fegaras
