I forgot to mention that in this code, my out.collect call emits a tuple of
the Map[String, Any] record and the count as an Int.
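
Incidentally, a more compact way to get per-key counts (just a sketch,
assuming every record contains a "ga_date" entry whose value can serve as a
String key) would be:

    // Emit (key, 1) pairs, then group on the key (field 0) and sum field 1
    j.map(m => (m("ga_date").toString, 1))
      .groupBy(0)
      .sum(1)

This yields (key, count) tuples directly instead of carrying the whole map.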

On Fri, Apr 29, 2016 at 4:53 PM, Punit Naik <naik.puni...@gmail.com> wrote:

> Anyway, I fixed it. To your groupBy you should attach this:
>
> .reduceGroup {
>   (in: Iterator[Map[String, Any]],
>    out: org.apache.flink.util.Collector[(Map[String, Any], Int)]) =>
>     var count: Int = 0
>     var last: Map[String, Any] = Map()
>     // Count the group's elements, keeping the last record seen as the key
>     for (t <- in) {
>       count += 1
>       last = t
>     }
>     out.collect((last, count))
> }
>
> On Fri, Apr 29, 2016 at 2:58 PM, Punit Naik <naik.puni...@gmail.com>
> wrote:
>
>> What if after grouping I wanted to count the occurrences of the key
>> "ga_date"?
>>
>> On Fri, Apr 29, 2016 at 2:15 PM, Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> The `get` method on the Scala map returns an Option, which is not
>>> (currently) a valid key type for Flink (but there's ongoing work on this
>>> [1]). Flink must be aware of how to use a particular type as a key if you
>>> want to group by a value of that type. See the advanced DataSet concepts
>>> in the official Flink training for more info on this [2].
>>>
>>> If you're just playing around, the easiest way to make it work is to
>>> apply the key to the map directly (or use the apply method). Beware that
>>> this will throw an exception whenever the key is missing. A cleaner
>>> solution would be to write your own KeySelector for the Option type.
>>>
>>> val k = j.groupBy(_("ga_date")) or
>>> val k = j.groupBy(ga => ga("ga_date")) or
>>> val k = j.groupBy(_.apply("ga_date"))
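>>>
>>> As a sketch of the cleaner route, the Option can be unwrapped into a
>>> plain String before grouping (the "unknown" fallback below is just a
>>> placeholder I'm assuming for records missing the key):
>>>
>>>     // String is a valid Flink key type, so this key selector is accepted
>>>     val k = j.groupBy(_.get("ga_date").map(_.toString).getOrElse("unknown"))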
>>>
>>> As a side note, I believe the user mailing list may be more appropriate
>>> for this kind of issue.
>>>
>>> [1]: https://issues.apache.org/jira/browse/FLINK-2673
>>> [2]:
>>> http://dataartisans.github.io/flink-training/dataSetAdvanced/slides.html
>>>
>>> On Fri, Apr 29, 2016 at 10:13 AM, Punit Naik <naik.puni...@gmail.com>
>>> wrote:
>>>
>>> > Below is my code:
>>> >
>>> > import org.apache.flink.api.scala._
>>> > import scala.util.parsing.json.JSON
>>> >
>>> > val env = ExecutionEnvironment.getExecutionEnvironment
>>> > val data = env.readTextFile("file:///home/punit/test")
>>> >   .flatMap(line => JSON.parseFull(line))
>>> > val j = data.flatMap {
>>> >   case map: Map[String, Any] =>
>>> >     // Strip the trailing two characters (the hour) from "ga_dateHour"
>>> >     val dateHour = map.get("ga_dateHour").get.toString
>>> >     List(Map("ga_date" -> dateHour.substring(0, dateHour.length - 2)))
>>> > }
>>> >
>>> > val k = j.groupBy(_.get("ga_date"))
>>> >
>>> > But when I execute this, it throws an exception saying:
>>> >
>>> > org.apache.flink.api.common.InvalidProgramException: Return type
>>> > Option[String] of KeySelector class
>>> > org.apache.flink.api.scala.DataSet$$anon$12 is not a valid key type
>>> >
>>> > Where am I going wrong?
>>> > --
>>> > Thank You
>>> >
>>> > Regards
>>> >
>>> > Punit Naik
>>> >
>>>
>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
Thank You

Regards

Punit Naik
