I forgot to mention that in this code, out.collect emits a tuple of the Map[key, value] and the count as an Int.
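The per-key counting described above can be sketched on plain Scala collections, without a Flink runtime. This is only an illustration of the logic: the sample records and the "ga_date" values below are made up, and plain `groupBy`/`size` on a `List` stands in for Flink's grouped reduce.

```scala
// Hypothetical sample records, shaped like the maps discussed in this thread.
val records = List(
  Map("ga_date" -> "2016042823"),
  Map("ga_date" -> "2016042823"),
  Map("ga_date" -> "2016042901")
)

// Group by the "ga_date" value, then count each group's size.
// This mirrors the (map, count) tuples the Collector emits.
val counts: Map[String, Int] = records
  .groupBy(_("ga_date"))
  .map { case (k, group) => (k, group.size) }

// counts: Map("2016042823" -> 2, "2016042901" -> 1)
```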
On Fri, Apr 29, 2016 at 4:53 PM, Punit Naik <naik.puni...@gmail.com> wrote:

> Anyway, I fixed it. To your groupBy you should attach this:
>
>     .reduceGroup {
>       (in, out: org.apache.flink.util.Collector[(Map[key, value], Int)]) =>
>         var v: Int = 0
>         var k: Map[key, value] = Map()
>         for (t <- in) {
>           v += 1
>           k = t
>         }
>         out.collect((k, v))
>     }
>
> On Fri, Apr 29, 2016 at 2:58 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>
>> What if, after grouping, I wanted to count the occurrences of the key
>> "ga_date"?
>>
>> On Fri, Apr 29, 2016 at 2:15 PM, Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> The `get` method on the Scala map returns an Option, which is not
>>> (currently) a valid key type for Flink (but there's ongoing work on this
>>> [1]). Flink must be aware of how to use a particular type as a key if
>>> you want to group by a value of said type. See the advanced DataSet
>>> concepts in the official Flink training for more info on this [2].
>>>
>>> If you're just playing around, the easy way to make it work is to apply
>>> the key to the map directly (or use the apply method). Beware that this
>>> approach is prone to exceptions. A cleaner solution would be to write
>>> your own KeySelector for the Option type.
>>>
>>>     val k = j.groupBy(_("ga_date"))
>>> or
>>>     val k = j.groupBy(ga => ga("ga_date"))
>>> or
>>>     val k = j.groupBy(_.apply("ga_date"))
>>>
>>> As a side note, I believe the user mailing list may be more appropriate
>>> for this kind of issue.
>>>
>>> [1]: https://issues.apache.org/jira/browse/FLINK-2673
>>> [2]: http://dataartisans.github.io/flink-training/dataSetAdvanced/slides.html
>>>
>>> On Fri, Apr 29, 2016 at 10:13 AM, Punit Naik <naik.puni...@gmail.com>
>>> wrote:
>>>
>>>> Below is my code:
>>>>
>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>     val data = env.readTextFile("file:///home/punit/test")
>>>>       .flatMap(line => JSON.parseFull(line))
>>>>     val j = data.flatMap { _ match {
>>>>       case map: Map[String, Any] =>
>>>>         List(Map("ga_date" ->
>>>>           map.get("ga_dateHour").get.toString().substring(0,
>>>>             map.get("ga_dateHour").get.toString().length() - 2)))
>>>>     } }
>>>>
>>>>     val k = j.groupBy(_.get("ga_date"))
>>>>
>>>> But when I execute this, it throws an exception saying:
>>>>
>>>>     org.apache.flink.api.common.InvalidProgramException: Return type
>>>>     Option[String] of KeySelector class
>>>>     org.apache.flink.api.scala.DataSet$$anon$12 is not a valid key type
>>>>
>>>> Where am I going wrong?
>>>> --
>>>> Thank You
>>>>
>>>> Regards
>>>>
>>>> Punit Naik
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit

--
Thank You

Regards

Punit Naik
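The Option-vs-apply distinction behind the exception in this thread can be checked on a plain Scala Map, outside Flink. The key and values below are illustrative only; the point is the return types, which is why `groupBy(_.get("ga_date"))` hands Flink an `Option[String]` key while `groupBy(_("ga_date"))` hands it a `String`.

```scala
val m = Map("ga_date" -> "20160429")

// get returns an Option — this is the type Flink rejected as a key.
val asOption: Option[String] = m.get("ga_date")

// apply returns the bare value, which is a valid key type,
// but throws NoSuchElementException when the key is absent.
val asValue: String = m("ga_date")

// getOrElse is a safe alternative if a sentinel default is acceptable
// (a hypothetical choice, not something suggested in the thread).
val safe: String = m.getOrElse("missing_key", "unknown")
```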