Hi Ufuk,
Your getKey returns a String, which keeps things simple. Mine has to return a
custom type (FData): my real getKey takes an FData and returns a different
FData. In the example I just returned the same object, to show you the error.
So my question now is this: can getKey return a custom Comparable type,
or must it always be a simple type, such as String?
Thanks
Leonidas
PS. Should your WC class be Serializable?
On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
Hey Leonidas,
I think the problem is with the KeySelector. The key selector should specify
which field of your custom type should be used to do the grouping, but you are
currently just returning the same object.
So you would have to think about which fields define the separate groups. For
example, with a custom type for word counts where you want to group by distinct
words:
public class WC {
public String word;
public int count;
// [...]
}
input.groupBy(new KeySelector<WC, String>() {
// only the word field decides which group a WC record belongs to
public String getKey(WC wc) {
return wc.word;
}
}).reduce(...);
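To make the idea concrete outside Flink, here is a minimal plain-Java sketch of what grouping with a key selector means: only the extracted key decides which group a record falls into. The `KeyFn` interface, the `groupAndSum` helper and the `KeyedGroupingSketch` class are hypothetical illustrations, not Flink API, and the grouping is done with an in-memory map rather than Flink's distributed execution.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedGroupingSketch {

    // Hypothetical stand-in for a key selector: extracts the grouping key from a record.
    // This is NOT Flink's KeySelector, just a plain-Java illustration.
    interface KeyFn<IN, KEY> {
        KEY getKey(IN value);
    }

    // Same shape as the WC type in the example above.
    static class WC {
        public String word;
        public int count;

        WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Groups records by the extracted key and sums the counts within each group,
    // mimicking groupBy(keySelector) followed by a reduce that adds up counts.
    static Map<String, Integer> groupAndSum(List<WC> input, KeyFn<WC, String> keyFn) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (WC wc : input) {
            String key = keyFn.getKey(wc);           // only the key decides the group
            sums.merge(key, wc.count, Integer::sum); // records with equal keys are combined
        }
        return sums;
    }

    public static void main(String[] args) {
        List<WC> input = new ArrayList<>();
        input.add(new WC("hello", 1));
        input.add(new WC("world", 1));
        input.add(new WC("hello", 1));

        // Group on the word field, as in the KeySelector above.
        Map<String, Integer> result = groupAndSum(input, wc -> wc.word);
        System.out.println(result); // {hello=2, world=1}
    }
}
```

Grouping on `wc -> wc.word` puts both "hello" records into one group. In this sketch, using the whole `WC` object as the key would instead fall back on `WC`'s `equals`/`hashCode`, which it does not override, so every record would land in its own group.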
Does this help? Feel free to get back if you have further questions! :-)
Ufuk
On 30 Jul 2014, at 23:14, Leonidas Fegaras wrote:
Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
public class FData implements Serializable, Comparable<FData> {
public ... data;
public FData () { ... }
@Override
public int compareTo ( FData x ) { return data.compareTo(x.data); }
...
}
The map and flatMap operations work fine on a DataSet<FData>, but I have a problem
with the following groupBy code:
s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
where s is a DataSet<FData> and the classes are defined as follows:
public static final class GroupbyKey extends KeySelector<FData,FData> {
@Override
public FData getKey ( FData value ) { return value; }
}
public static final class GroupbyReducer extends
GroupReduceFunction<FData,FData> {
@Override
public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
}
This gives me the following error:
org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce
"x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties
[partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null]
]]': Could not serialize comparator into the configuration.
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
    at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
    at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
    at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
    at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
    ...
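The error message names the SORTED_GROUP_REDUCE strategy: groups are formed by sorting on the key and then reducing each run of equal keys, which is why a comparator for the key type has to be set up (and, per the message, serialized into the configuration). The plain-Java sketch below shows only that sort-then-reduce idea; `SortedGroupReduceSketch` and `sortedGroupReduce` are hypothetical names, and the sketch does not model how Flink builds or serializes a comparator for a custom key type like FData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class SortedGroupReduceSketch {

    // Sort-based grouping (hypothetical helper): sort by key, then reduce each run of equal keys.
    // The key comparison is what decides where one group ends and the next begins.
    static <T, K extends Comparable<K>, R> List<R> sortedGroupReduce(
            List<T> input,
            Function<T, K> keySelector,
            BiFunction<K, List<T>, R> reducer) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(keySelector));

        List<R> out = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            K key = keySelector.apply(sorted.get(start));
            int end = start + 1;
            // Extend the run while the next record's key compares equal to this one.
            while (end < sorted.size()
                    && keySelector.apply(sorted.get(end)).compareTo(key) == 0) {
                end++;
            }
            // Hand the whole run (one group) to the reducer.
            out.add(reducer.apply(key, sorted.subList(start, end)));
            start = end;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("b", "a", "b", "c", "a", "b");
        // Identity key: each word is its own key; the reducer emits "word:count".
        List<String> counts = sortedGroupReduce(
                words, w -> w, (key, group) -> key + ":" + group.size());
        System.out.println(counts); // [a:2, b:3, c:1]
    }
}
```

In plain Java an identity key works here because `String` is `Comparable`; whether a given key type works in Flink depends on the comparator Flink can create and ship for it, which is exactly what the exception complains about.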
(I tried to make the example as simple as possible).
What is the problem here? Does FData need to implement a different
interface?
Thanks
Leonidas Fegaras