Hey Leonidas,
I think the problem is with the KeySelector. A key selector should return the
field of your custom type that the grouping is based on, but yours currently
returns the whole object unchanged.
So you would have to decide which field (or fields) define the separate groups.
For example, with a custom type for word counts, where you want to group on
distinct words:
    public class WC {
        public String word;
        public int count;
        // [...]
    }

    input.groupBy(new KeySelector<WC, String>() {
        public String getKey(WC wc) {
            return wc.word;
        }
    }).reduce(...);
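
If it helps to see the idea in isolation, here is a minimal plain-Java sketch of "group on one field, then reduce each group". It does not use Flink at all: KeyFn and sumCounts are names I made up as stand-ins for KeySelector and groupBy(...).reduce(...), and the WC constructor is an assumption as well. It only illustrates that the key is a field of the record, not the record itself.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeySketch {

    // Same shape as the WC type above; the constructor is added for convenience.
    public static class WC {
        public String word;
        public int count;

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Hypothetical stand-in for a key selector; this is NOT the Flink class.
    public interface KeyFn<IN, KEY> {
        KEY getKey(IN value);
    }

    // Groups the records by the extracted key and sums the counts per group.
    public static <K> Map<K, Integer> sumCounts(List<WC> input, KeyFn<WC, K> keyFn) {
        Map<K, Integer> sums = new LinkedHashMap<K, Integer>();
        for (WC wc : input) {
            K key = keyFn.getKey(wc);
            Integer old = sums.get(key);
            sums.put(key, old == null ? wc.count : old + wc.count);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
                new WC("flink", 1), new WC("key", 2), new WC("flink", 3));

        // Key on the field that defines the group, not on the whole object.
        Map<String, Integer> byWord = sumCounts(input, new KeyFn<WC, String>() {
            public String getKey(WC wc) {
                return wc.word;
            }
        });

        System.out.println(byWord); // prints {flink=4, key=2}
    }
}
```

The String key works here because String already defines equality and hashing. I would expect the same reasoning to apply to whichever field of FData you pick as the key.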
Does this help? Feel free to get back if you have further questions! :-)
Ufuk
On 30 Jul 2014, at 23:14, Leonidas Fegaras wrote:
> Hi,
> I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
>
> public class FData implements Serializable, Comparable<FData> {
> public ... data;
> public FData () { ... }
> @Override
> public int compareTo ( FData x ) { return data.compareTo(x.data); }
> ...
> }
>
> Methods map and flatMap work fine on DataSet<FData>. But I have a problem
> with the following groupBy code:
>
> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>
> where s is a DataSet<FData> and the classes are defined as follows:
>
> public static final class GroupbyKey extends KeySelector<FData,FData> {
> @Override
> public FData getKey ( FData value ) { return value; }
> }
> public static final class GroupbyReducer extends
> GroupReduceFunction<FData,FData> {
> @Override
> public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
> }
>
> This gives me the following error:
>
> org.apache.flink.compiler.CompilerException: Error translating node
> 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[
> GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not serialize comparator into the
> configuration.
> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
> at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
> at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
> at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
> at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
> ...
>
> (I tried to make the example as simple as possible).
> What is the problem here? Do I need to implement FData with a different
> interface?
> Thanks
> Leonidas Fegaras