Hi Leonidas! What you are doing should actually be supported. Do you have more of the stack-trace?
It seems that there is some non-serializable part somewhere in the GenericTypeComparator.

Stephan

On Wed, Jul 30, 2014 at 11:39 PM, Leonidas Fegaras wrote:

> Hi Ufuk,
> Your getKey returns a String, so it's very simple. Mine must return a
> custom type (FData). So my getKey gets an FData and returns a different
> FData. I just made it identical to show you the error.
> So my question now is this: can getKey return a Comparable custom type or
> it must always be a simple type, such as String?
> Thanks
> Leonidas
> PS. Should your WC class be Serializable?
>
> On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
>
>> Hey Leonidas,
>>
>> I think the problem is with the KeySelector. The key selector should
>> specify which field of your custom type should be used to do the grouping,
>> but you are currently just returning the same object.
>>
>> So you would have to think about which fields define the separate groups.
>> For example with a custom type for word counts, where you want to group on
>> distinct words:
>>
>> public class WC {
>>     public String word;
>>     public int count;
>>     // [...]
>> }
>>
>> input.groupBy(new KeySelector<WC, String>() {
>>     public String getKey(WC wc) {
>>         return wc.word;
>>     }
>> }).reduce(...);
>>
>> Does this help? Feel free to get back if you have further questions! :-)
>>
>> Ufuk
>>
>> On 30 Jul 2014, at 23:14, Leonidas Fegaras wrote:
>>
>>> Hi,
>>> I am trying to do a groupBy over a DataSet with a custom type (not a
>>> Tuple):
>>>
>>> public class FData implements Serializable, Comparable<FData> {
>>>     public ... data;
>>>     public FData () { ... }
>>>     @Override
>>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>>>     ...
>>> }
>>>
>>> Methods map and flatMap work fine on DataSet<FData>. But I have a
>>> problem with the following groupBy code:
>>>
>>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>>
>>> where s is a DataSet<FData> and the classes are defined as follows:
>>>
>>> public static final class GroupbyKey extends KeySelector<FData,FData> {
>>>     @Override
>>>     public FData getKey ( FData value ) { return value; }
>>> }
>>> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>>>     @Override
>>>     public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
>>> }
>>>
>>> This gives me the following error:
>>>
>>> org.apache.flink.compiler.CompilerException: Error translating node
>>> 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[
>>> GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties
>>> [ordering=null, grouped=null, unique=null] ]]': Could not serialize
>>> comparator into the configuration.
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>>>     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
>>>     ...
>>>
>>> (I tried to make the example as simple as possible).
>>> What is the problem here? Do I need to implement FData with a different
>>> interface?
>>> Thanks
>>> Leonidas Fegaras
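
As a possible way to narrow down a non-serializable part like the one mentioned above, here is a minimal sketch that tries plain Java serialization on an object by hand. It uses only the JDK, not any Flink API, and it assumes the failure comes from a field that is reachable from the key type and is not itself Serializable. The names SerializationCheck, GoodKey, and BadKey are made up for illustration; they are not taken from the code in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    /** Hypothetical key type whose only field (a String) is serializable. */
    public static class GoodKey implements Serializable, Comparable<GoodKey> {
        private static final long serialVersionUID = 1L;
        public final String data;

        public GoodKey(String data) { this.data = data; }

        @Override
        public int compareTo(GoodKey other) { return data.compareTo(other.data); }
    }

    /** Hypothetical key type that declares Serializable but holds a field that is not. */
    public static class BadKey implements Serializable {
        private static final long serialVersionUID = 1L;
        // A plain java.lang.Object instance does not implement Serializable.
        public final Object handle = new Object();

        public BadKey() { }
    }

    /** Returns true if the object and everything it references can be written with Java serialization. */
    public static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException (an IOException) names the offending class.
            System.err.println("Not serializable: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodKey: " + isSerializable(new GoodKey("a")));
        System.out.println("BadKey:  " + isSerializable(new BadKey()));
    }
}
```

Running the same check on an FData instance, and on the type of its data field, might show whether the custom type itself is what cannot be serialized; the exception message printed in the failing case names the class involved.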
