[
https://issues.apache.org/jira/browse/FLINK-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14222034#comment-14222034
]
Viktor Rosenfeld edited comment on FLINK-1269 at 11/22/14 4:35 PM:
-------------------------------------------------------------------
Just to clarify: given the input in [~uce]'s example, my code produces the
tuples (0, 2) and (1, 1) as output. The first element is the group key and the
second is the count. This is how I understand [~ssc]'s request.
It is already possible to drop group keys. For example:
{noformat}
DataSet<Tuple3<String, String, Integer>> input = env.fromElements(
    new Tuple3<String, String, Integer>("a", "A", 1),
    new Tuple3<String, String, Integer>("a", "A", 2),
    new Tuple3<String, String, Integer>("a", "B", 0));
// Group by both fields, but keep only the first group key in the output.
DataSet<Tuple2<String, Long>> counts = input.groupBy(0, 1)
    .aggregate(keys(0), count()); // result: ("a", 2), ("a", 1)
{noformat}
The second group key is dropped. Right now, at least one group key must be kept
in the output. (If no key is specified, all group keys are returned.) However,
it would be simple to drop all of the group keys if requested. The only open
question is which concise API to use for this. For example:
{noformat}
input.groupBy(0).aggregate(keys(), count());
input.groupBy(0).aggregate(dropKeys(), count());
input.groupBy(0).aggregate(count()).withoutKeys();
{noformat}
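To make the intended semantics concrete, here is a minimal plain-Java sketch that does not use Flink at all. The class GroupCountSketch and the method groupCount are hypothetical names made up for this illustration. They only mimic, on in-memory lists, what grouping on several fields while keeping a subset of the group keys next to the count would return, assuming the count is a Long.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: simulates the semantics on plain lists, not the Flink API.
public class GroupCountSketch {

    // Copies the values at the given field positions into a new list.
    private static List<Object> project(List<Object> row, int[] fields) {
        List<Object> out = new ArrayList<>();
        for (int f : fields) {
            out.add(row.get(f));
        }
        return out;
    }

    /**
     * Groups rows by groupFields, counts each group, and emits one row per
     * group: the values at keptFields followed by the count. Passing no
     * keptFields drops all group keys and emits only the count.
     */
    public static List<List<Object>> groupCount(
            List<List<Object>> rows, int[] groupFields, int... keptFields) {
        // LinkedHashMap keeps groups in first-seen order, so the output is deterministic.
        Map<List<Object>, List<Object>> firstRow = new LinkedHashMap<>();
        Map<List<Object>, Long> counts = new HashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = project(row, groupFields);
            firstRow.putIfAbsent(key, row);
            counts.merge(key, 1L, Long::sum);
        }
        List<List<Object>> result = new ArrayList<>();
        for (Map.Entry<List<Object>, List<Object>> e : firstRow.entrySet()) {
            List<Object> out = project(e.getValue(), keptFields);
            out.add(counts.get(e.getKey()));
            result.add(out);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> input = Arrays.asList(
            Arrays.<Object>asList("a", "A", 1),
            Arrays.<Object>asList("a", "A", 2),
            Arrays.<Object>asList("a", "B", 0));

        // Group by fields 0 and 1, keep only field 0 next to the count.
        System.out.println(groupCount(input, new int[] {0, 1}, 0)); // [[a, 2], [a, 1]]

        // Drop all group keys: only the counts remain.
        System.out.println(groupCount(input, new int[] {0, 1}));    // [[2], [1]]
    }
}
```

The second call corresponds to the "drop all group keys" case discussed above. Which of the proposed API spellings should trigger it is still the open question.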
> Easy way to "group count" dataset
> ---------------------------------
>
> Key: FLINK-1269
> URL: https://issues.apache.org/jira/browse/FLINK-1269
> Project: Flink
> Issue Type: New Feature
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Sebastian Schelter
> Assignee: Suneel Marthi
>
> Flink should offer an easy way to group datasets and compute the sizes of the
> resulting groups. This is one of the most essential operations in distributed
> processing, yet it is very hard to implement in Flink.
> I assume it could be a show-stopper for people trying Flink, because at the
> moment, users have to perform the grouping and then write a groupReduce that
> counts the tuples in the group and extracts the group key at the same time.
> Here is what I would semantically expect to happen:
> {noformat}
> def groupCount[T, K](data: DataSet[T], extractKey: (T) => K): DataSet[(K, Long)] = {
>   data.groupBy { extractKey }
>     .reduceGroup { group => countBy(extractKey, group) }
> }
>
> private[this] def countBy[T, K](extractKey: T => K,
>     group: Iterator[T]): (K, Long) = {
>   val key = extractKey(group.next())
>   var count = 1L
>   while (group.hasNext) {
>     group.next()
>     count += 1
>   }
>   key -> count
> }
> {noformat}
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)