[ 
https://issues.apache.org/jira/browse/FLINK-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076163#comment-14076163
 ] 

Chesnay Schepler commented on FLINK-970:
----------------------------------------

Let's see if I got it right this time:



new FirstReducer class:
{code:java}
// There is no combinable annotation here!
public class FirstReducer<T> extends GroupReduceFunction<T, T> {
        private final int count;

        public FirstReducer(int n) {
                this.count = n;
        }

        // Emits at most count elements of the group.
        @Override
        public void reduce(Iterator<T> values, Collector<T> out) throws Exception {
                for (int x = 0; x < count && values.hasNext(); x++) {
                        out.collect(values.next());
                }
        }

        // Applies the same truncation when invoked as a combiner.
        @Override
        public void combine(Iterator<T> values, Collector<T> out) throws Exception {
                for (int x = 0; x < count && values.hasNext(); x++) {
                        out.collect(values.next());
                }
        }
}
{code}

first() method inside DataSet:
{code:java}
public ReduceGroupOperator<T, T> first(int n) {
        return reduceGroup(new FirstReducer<T>(n));
}
{code}
Per the Java API documentation, this should work, since the reducer is 
not marked combinable here: ??"A GroupReduce transformation on a full 
DataSet cannot be done in parallel if the GroupReduceFunction is not 
combinable."??

first() method inside (Un-)SortedGrouping:
{code:java}
public ReduceGroupOperator<T, T> first(int n) {
        ReduceGroupOperator<T, T> rgo = reduceGroup(new FirstReducer<T>(n));
        rgo.setCombinable(true);
        return rgo;
}
{code}
Here is what I *think* it does:
{code}
for group:
        for partition:
                emit up to n elements (using the reduce function)
        gather all elements, and emit once again up to n elements (using the combine function)
{code}
So the result is up to n elements per group, i.e. up to n * (number of 
groups) elements in total.
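
To sanity-check the two-phase behaviour sketched above, here is a minimal stand-alone simulation in plain Java. It does not use Flink at all: the class {{FirstNSketch}} and the methods {{firstN}} and {{firstNTwoPhase}} are hypothetical names made up for this illustration, and a partition is assumed to be just a list of elements belonging to one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-alone simulation; none of these names are Flink API.
final class FirstNSketch {

    // Same loop as in FirstReducer: forward at most n elements of the iterator.
    static <T> List<T> firstN(Iterator<T> values, int n) {
        List<T> out = new ArrayList<>();
        for (int x = 0; x < n && values.hasNext(); x++) {
            out.add(values.next());
        }
        return out;
    }

    // One group spread over several partitions: truncate each partition,
    // gather the partial results, then truncate once more.
    static <T> List<T> firstNTwoPhase(List<List<T>> partitions, int n) {
        List<T> gathered = new ArrayList<>();
        for (List<T> partition : partitions) {
            gathered.addAll(firstN(partition.iterator(), n));
        }
        return firstN(gathered.iterator(), n);
    }

    public static void main(String[] args) {
        // A single group whose elements live in three partitions.
        List<List<Integer>> group = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.asList(6));
        System.out.println(firstNTwoPhase(group, 2)); // prints [1, 2]
    }
}
```

Because the same truncation runs in both phases, at most n elements per group survive the second pass, no matter how many partitions the group was spread over.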

> Implement a first(n) operator
> -----------------------------
>
>                 Key: FLINK-970
>                 URL: https://issues.apache.org/jira/browse/FLINK-970
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Timo Walther
>            Assignee: Chesnay Schepler
>            Priority: Minor
>
> It is only syntactic sugar, but I had many cases where I just needed the 
> first element or the first 2 elements in a GroupReduce.
> E.g. instead of
> {code:java}
> .reduceGroup(new GroupReduceFunction<String, String>() {
>         @Override
>         public void reduce(Iterator<String> values, Collector<String> out) throws Exception {
>                 out.collect(values.next());
>         }
> })
> {code}
> one could simply write
> {code:java}
> .first()
> {code}



