Hi Ufuk,
Your getKey returns a String, so it's very simple. Mine must return a custom type (FData): my getKey takes an FData and returns a different FData. I only made it return the same object to show you the error. So my question is: can getKey return a custom Comparable type, or must it always be a simple type such as String?
Thanks
Leonidas
PS. Should your WC class be Serializable?


On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
Hey Leonidas,

I think the problem is with the KeySelector. The key selector should specify 
which field of your custom type should be used to do the grouping, but you are 
currently just returning the same object.

So you have to decide which fields define the separate groups. For 
example, with a custom type for word counts, where you want to group on distinct 
words:

public class WC {
     public String word;
     public int count;
     // [...]
}

input.groupBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) {
         return wc.word;
     }
}).reduce(...);
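To make the role of the key selector concrete outside of Flink, here is a plain-Java sketch (no Flink dependency) of what groupBy(keySelector).reduce(...) computes for the WC example: records whose keys are equal land in the same group, and each group is folded into one record. KeyExtractor, GroupReduceSketch and the two-argument WC constructor are hypothetical helpers for illustration only, not Flink API, and this is not how Flink executes the operation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Flink's KeySelector: maps a record to its grouping key.
interface KeyExtractor<IN, K> {
    K getKey(IN value);
}

class WC {
    public String word;
    public int count;

    public WC() {}  // public no-arg constructor

    public WC(String word, int count) {  // hypothetical convenience constructor
        this.word = word;
        this.count = count;
    }
}

class GroupReduceSketch {
    // Groups records by the extracted key and folds each group with the reducer,
    // keeping groups in first-seen order. Mimics only the result of groupBy(...).reduce(...).
    static <T, K> List<T> groupAndReduce(List<T> input, KeyExtractor<T, K> keys,
                                         BinaryOperator<T> reducer) {
        Map<K, T> acc = new LinkedHashMap<>();
        for (T value : input) {
            acc.merge(keys.getKey(value), value, reducer);
        }
        return new ArrayList<>(acc.values());
    }

    static List<WC> wordCounts(List<WC> input) {
        return groupAndReduce(
            input,
            wc -> wc.word,                                // group on the word field only
            (a, b) -> new WC(a.word, a.count + b.count)); // sum the counts within a group
    }
}
```

In this sketch, using wc -> wc as the key would make every WC object its own group (WC does not override equals/hashCode), so nothing would be combined.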

Does this help? Feel free to get back if you have further questions! :-)

Ufuk

On 30 Jul 2014, at 23:14, Leonidas Fegaras <[email protected]> wrote:

Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):

public class FData implements Serializable, Comparable<FData> {
    public ... data;
    public FData () { ... }
    @Override
    public int compareTo ( FData x ) { return data.compareTo(x.data); }
...
}
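Because the type of data is elided above, the sketch below assumes, purely for illustration, that it is a String. It shows the plain-Java contract a type used as a sort/grouping key should satisfy: compareTo consistent with equals and hashCode, plus a public no-arg constructor. Whether this alone is enough for Flink to build a comparator for such a type is exactly the open question in this thread; the sketch does not settle it.

```java
import java.io.Serializable;
import java.util.Objects;

class FData implements Serializable, Comparable<FData> {
    private static final long serialVersionUID = 1L;

    // Assumption: the elided field is a String; the real type is not shown in the thread.
    public String data;

    public FData() {}  // public no-arg constructor

    public FData(String data) {
        this.data = data;
    }

    @Override
    public int compareTo(FData x) {
        return data.compareTo(x.data);
    }

    // Keep equals/hashCode consistent with compareTo, so "compares as 0"
    // and "is equal" mean the same thing when grouping.
    @Override
    public boolean equals(Object o) {
        return o instanceof FData && Objects.equals(data, ((FData) o).data);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(data);
    }
}
```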

Methods map and flatMap work fine on DataSet<FData>. But I have a problem with 
the following groupBy code:

s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());

where s is a DataSet<FData> and the classes are defined as follows:

public static final class GroupbyKey extends KeySelector<FData,FData> {
   @Override
   public FData getKey ( FData value ) { return value; }
}
public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
   @Override
   public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
}
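The error below mentions a SORTED_GROUP_REDUCE strategy. As a rough plain-Java picture of a sort-based group reduce (an illustration under assumptions, not Flink's implementation), the input is sorted on the extracted key and each run of equal keys is handed to the reducer as one Iterator, which is why such a strategy needs a comparator for the key type. SimpleCollector, SimpleGroupReducer and SortedGroupReduceSketch are hypothetical names that only mirror the shape of the reduce(Iterator, Collector) method above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for Flink's Collector and GroupReduceFunction (not the real API).
interface SimpleCollector<T> {
    void collect(T record);
}

interface SimpleGroupReducer<IN, OUT> {
    void reduce(Iterator<IN> values, SimpleCollector<OUT> out);
}

class SortedGroupReduceSketch {
    // Sorts a copy of the input on the extracted key, then passes each run of
    // equal keys to the reducer as one group. The key must be Comparable to sort on.
    static <T, K extends Comparable<? super K>, O> List<O> run(
            List<T> input, Function<T, K> keyOf, SimpleGroupReducer<T, O> reducer) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(keyOf));
        List<O> result = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            K key = keyOf.apply(sorted.get(start));
            int end = start + 1;
            while (end < sorted.size() && keyOf.apply(sorted.get(end)).compareTo(key) == 0) {
                end++;
            }
            // Hand the group [start, end) to the reducer; its output goes into result.
            reducer.reduce(sorted.subList(start, end).iterator(), result::add);
            start = end;
        }
        return result;
    }
}
```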

This gives me the following error:

org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
...
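The exception says the comparator could not be serialized into the job configuration. One way to narrow down such failures in plain Java is to attempt a serialization round trip on the objects involved (for example an instance of the key type or of the key selector) and see whether java.io.NotSerializableException names a class. SerializationCheck is a hypothetical debugging aid, not part of Flink, and it does not reproduce Flink's own comparator serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class SerializationCheck {
    // Writes obj with Java serialization and reads it back.
    // Throws NotSerializableException if obj (or anything it references) is not serializable.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // Returns null if obj survives the round trip, otherwise a description of the failure.
    static String firstProblem(Object obj) {
        try {
            roundTrip(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();  // the message is the name of the offending class
        } catch (IOException | ClassNotFoundException e) {
            return e.toString();
        }
    }
}
```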

(I tried to make the example as simple as possible.)
What is the problem here? Does FData need to implement a different interface?
Thanks
Leonidas Fegaras

