[ https://issues.apache.org/jira/browse/FLINK-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14221553#comment-14221553 ]

Ufuk Celebi commented on FLINK-1269:
------------------------------------

I think Sebastian's suggestion and Viktor's aggregations are two different 
things:

{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));

DataSet<Long> counts = input.groupBy(0).count(); // result: 2, 1
{code}

From the tests in Viktor's branch, I think his changes are a better API than what 
we have right now (SQL-like aggregations):

{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));

DataSet<Tuple2<Integer, Integer>> counts = input.groupBy(0).count(); // result: (2, 2), (1, 0)
{code}

Is this right?

There will also be further overlap with an "action"-style count operator, which 
returns a result to the client (see 
https://github.com/apache/incubator-flink/pull/210).
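
For illustration, here is a minimal sketch of how such an action-style count could look from the client side (the exact method shape is my assumption for this sketch, not necessarily what the PR implements):

{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));

// Hypothetical action-style count: evaluated eagerly and handed back to the
// client program as a plain long instead of a DataSet.
long total = input.count(); // result: 3
{code}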

For the first example, I had a PR some time ago (FLINK-758 and 
https://github.com/apache/incubator-flink/pull/63), which I closed because it 
wasn't really picked up.

All in all, I think we should aim for an API that supports all three versions of 
count (and other aggregations where applicable).
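
Purely as a hypothetical illustration (none of these names or signatures are decided; this just reuses the input from the examples above), the three variants might be distinguished roughly like this:

{code}
// Hypothetical method names -- for illustration only:
DataSet<Long> groupSizes = input.groupBy(0).size();                 // per-group sizes as a DataSet
DataSet<Tuple2<Integer, Integer>> keyed = input.groupBy(0).count(); // SQL-like, keeps the group key
long total = input.count();                                         // eager "action", returned to the client
{code}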

> Easy way to "group count" dataset
> ---------------------------------
>
>                 Key: FLINK-1269
>                 URL: https://issues.apache.org/jira/browse/FLINK-1269
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>    Affects Versions: 0.7.0-incubating
>            Reporter: Sebastian Schelter
>            Assignee: Viktor Rosenfeld
>
> Flink should offer an easy way to group datasets and compute the sizes of the 
> resulting groups. This is one of the most essential operations in distributed 
> processing, yet it is very hard to implement in Flink.
> I assume it could be a show-stopper for people trying Flink, because at the 
> moment, users have to perform the grouping and then write a groupReduce that 
> counts the tuples in the group and extracts the group key at the same time.
> Here is what I would semantically expect to happen:
> {noformat}
> def groupCount[T, K](data: DataSet[T], extractKey: (T) => K): DataSet[(K, Long)] = {
>   data.groupBy { extractKey }
>     .reduceGroup { group => countBy(extractKey, group) }
> }
>
> private[this] def countBy[T, K](extractKey: T => K,
>                                 group: Iterator[T]): (K, Long) = {
>   val key = extractKey(group.next())
>   var count = 1L
>   while (group.hasNext) {
>     group.next()
>     count += 1
>   }
>   key -> count
> }
> {noformat}
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
