Hey Leonidas,

I think the problem is with your KeySelector. A key selector should return 
the field of your custom type that the grouping is based on, but yours 
currently just returns the whole object.

So you would have to think about which fields define the separate groups. For 
example, take a custom type for word counts, where you want to group on 
distinct words:

public class WC {
    public String word;
    public int count;
    // [...]
}

input.groupBy(new KeySelector<WC, String>() {
    // the key is the field that defines a group, not the whole object
    @Override
    public String getKey(WC wc) {
        return wc.word;
    }
}).reduce(...);
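
To make the idea concrete without a Flink dependency, here is a rough 
plain-Java sketch of what grouping on a key does. Note that GroupByKeySketch, 
sumCountsByKey and the use of java.util.function.Function are my own stand-ins 
for illustration, not the Flink API, and this does not reproduce your 
exception. It only shows that the value the selector returns decides which 
records end up in the same group.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupByKeySketch {

    // Same shape as the WC type above.
    public static class WC {
        public final String word;
        public final int count;

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Hypothetical stand-in for groupBy(keySelector).reduce(...):
    // records whose selector returns an equal key are summed together.
    public static <K> Map<K, Integer> sumCountsByKey(
            List<WC> input, Function<WC, K> keySelector) {
        Map<K, Integer> sums = new LinkedHashMap<>();
        for (WC wc : input) {
            sums.merge(keySelector.apply(wc), wc.count, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
                new WC("a", 1), new WC("b", 2), new WC("a", 3));

        // Key on the word field, as in the KeySelector above.
        Map<String, Integer> byWord = sumCountsByKey(input, wc -> wc.word);
        System.out.println(byWord); // prints {a=4, b=2}
    }
}
```

The two "a" records land in one group because the selector returns the same 
String for both of them.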

Does this help? Feel free to get back if you have further questions! :-)

Ufuk

On 30 Jul 2014, at 23:14, Leonidas Fegaras <[email protected]> wrote:

> Hi,
> I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
> 
> public class FData implements Serializable, Comparable<FData> {
>    public ... data;
>    public FData () { ... }
>    @Override
>    public int compareTo ( FData x ) { return data.compareTo(x.data); }
> ...
> }
> 
> Methods map and flatMap work fine on DataSet<FData>. But I have a problem 
> with the following groupBy code:
> 
> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
> 
> where s is a DataSet<FData> and the classes are defined as follows:
> 
> public static final class GroupbyKey extends KeySelector<FData,FData> {
>   @Override
>   public FData getKey ( FData value ) { return value; }
> }
> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>   @Override
>   public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
> }
> 
> This gives me the following error:
> 
> org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
> ...
> 
> (I tried to make the example as simple as possible).
> What is the problem here? Do I need to implement FData with a different 
> interface?
> Thanks
> Leonidas Fegaras
