[ 
https://issues.apache.org/jira/browse/FLINK-65?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-65.
-------------------------------

       Resolution: Fixed
    Fix Version/s:     (was: pre-apache)
                   0.7-incubating
         Assignee: Stephan Ewen

Solved through mapPartition() function, added in 
d4de9774b3237bb1850024b1208640bc50f7adab
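
A minimal sketch of a mapPartition()-based pre-aggregation, written against the Java DataSet API (exact signatures in 0.7-incubating may differ slightly): partial sums are built in memory per partition, and only the pre-aggregated records are shipped to the final reduce.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ManualPreAggregation {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Long>> records = env.fromElements(
                new Tuple2<String, Long>("a", 1L),
                new Tuple2<String, Long>("b", 1L),
                new Tuple2<String, Long>("a", 1L));

        DataSet<Tuple2<String, Long>> result = records
                // pre-aggregate each partition in memory before anything is shipped
                .mapPartition(new MapPartitionFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Long>> values,
                                             Collector<Tuple2<String, Long>> out) {
                        Map<String, Long> sums = new HashMap<String, Long>();
                        for (Tuple2<String, Long> t : values) {
                            Long current = sums.get(t.f0);
                            sums.put(t.f0, current == null ? t.f1 : current + t.f1);
                        }
                        // emit one partial sum per key and partition
                        for (Map.Entry<String, Long> e : sums.entrySet()) {
                            out.collect(new Tuple2<String, Long>(e.getKey(), e.getValue()));
                        }
                    }
                })
                // final aggregation after repartitioning by key
                .groupBy(0).sum(1);

        // print the result (depending on the version, an explicit env.execute() may be needed)
        result.print();
    }
}
```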

> Support custom and efficient (in-memory) pre-aggregations (without Combiner)
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-65
>                 URL: https://issues.apache.org/jira/browse/FLINK-65
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>            Assignee: Stephan Ewen
>              Labels: github-import
>             Fix For: 0.7-incubating
>
>
> I use and evaluate Stratosphere in the course of my thesis and want to give 
> feedback on what I found to be missing or what could be improved. This is 
> written from a user perspective.
> ### Requirement
> We have partitioned input that we want to group and aggregate (by a 
> grouping-key) into a single record or multiple records. The input is either 
> streamed from HDFS or pipelined from other contracts. Our input is not 
> (necessarily) partitioned by the grouping-key. The input we want to 
> aggregate is the input of any tuple-at-a-time contract such as Map, Cross 
> or Match. We can express this requirement in terms of SQL: “SELECT key, 
> aggregate(value) FROM records GROUP BY key”. Common cases are that the key 
> has cardinality 1, 2, or some other small, medium or high cardinality.
> ### Problem
> Currently only the Combine+Reduce/CoGroup strategy is supported: forward all 
> data to a Combiner, repartition its output by the grouping-key and do the 
> final aggregation in Reduce. **For N records this always involves emitting N 
> intermediate records to the Combiner**. Even though the values are just 
> forwarded to the local Combiner, there is a lot of copying, (de)serialization 
> and probably some networking overhead involved (jobmanager status). The user 
> also has to manually write the code that serializes the input to a record 
> and to include the Combiner. If we want to join and aggregate, this can mean 
> that a udf does nothing but forward the joined tuples to a 
> Combiner/Reducer.
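> For illustration, a minimal sketch of this pattern, here transcribed to the 
> Java DataSet API rather than the contract/stub API (the word-count shape is 
> only an example): the udf emits one intermediate record per input value, and 
> only the combinable reduce pre-aggregates them.
>
> ```java
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class CombinerOnlyAggregation {
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<String> lines = env.fromElements("a b a", "b c");
>
>         DataSet<Tuple2<String, Long>> counts = lines
>                 // the udf emits one intermediate record per input value ...
>                 .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
>                     @Override
>                     public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
>                         for (String word : line.split(" ")) {
>                             out.collect(new Tuple2<String, Long>(word, 1L));
>                         }
>                     }
>                 })
>                 // ... and only the (combinable) reduce pre-aggregates them,
>                 // after every single record has been serialized and handed over
>                 .groupBy(0)
>                 .reduce(new ReduceFunction<Tuple2<String, Long>>() {
>                     @Override
>                     public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
>                                                        Tuple2<String, Long> b) {
>                         return new Tuple2<String, Long>(a.f0, a.f1 + b.f1);
>                     }
>                 });
>
>         // print the counts (depending on the version, an explicit env.execute() may be needed)
>         counts.print();
>     }
> }
> ```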
> ### Goal
> Enable efficient in-memory aggregation (for the cases where efficiency 
> matters - otherwise I can use the Combiner/Reducer approach), reduce the 
> (de)serialization overhead, and reduce the “stupid” code the user has to 
> write (e.g. forwarding all tuples to a combiner).
> ### Use cases
> Some use cases that would benefit from efficient pre-aggregation:
> * Group and aggregate with low grouping-key cardinality: Assume the 
> cardinality is 1 and we want to do a simple aggregation like a sum; it is 
> obviously much more efficient (and easier to code) to just do the 
> pre-aggregation (sum) in the udf and send a single record to a reducer.
> * Wordcount: “SELECT word, sum(occurrence) FROM word-occurrences”. In this 
> case occurrence is a column with constant value 1. Very simple to aggregate 
> in memory.
> * Machine learning, accuracy computation: “SELECT correctly-classified, 
> sum(correctly-classified) GROUP BY correctly-classified“. E.g. if we trained 
> a model (e.g. a numeric weight vector for logistic regression), the accuracy 
> is defined as #correctly-classified / #total. I do this in one of my jobs 
> and currently have to emit N records with constant value true or false to 
> the Combiner (see the sketch after this list).
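> A minimal sketch of the accuracy use case in plain Java, independent of any 
> contract API (class and method names are made up for this example): the udf 
> keeps two counters and emits a single pre-aggregated record instead of N 
> boolean records.
>
> ```java
> import java.util.Iterator;
>
> /** Sketch only: in-memory pre-aggregation for the accuracy use case. */
> public class AccuracyPreAggregation {
>
>     /** The single record a udf could emit instead of N boolean records. */
>     public static class PartialAccuracy {
>         public final long correct;
>         public final long total;
>
>         public PartialAccuracy(long correct, long total) {
>             this.correct = correct;
>             this.total = total;
>         }
>     }
>
>     /** Consumes the streamed classification results and aggregates them in memory. */
>     public static PartialAccuracy preAggregate(Iterator<Boolean> correctlyClassified) {
>         long correct = 0;
>         long total = 0;
>         while (correctlyClassified.hasNext()) {
>             if (correctlyClassified.next()) {
>                 correct++;
>             }
>             total++;
>         }
>         // one record per partition instead of N records sent to the Combiner
>         return new PartialAccuracy(correct, total);
>     }
> }
> ```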
> ### Possible solutions
> 1. Give close() the option to write output in all tuple-at-a-time contracts 
> (the Hadoop way).
> 2. Give the udf knowledge about whether the current element is the last one 
> (hasNext). Almost the same as 1.
> 3. Add an iterator option to the tuple-at-a-time contracts (map, cross, 
> match). The contracts can be configured to pass an iterator over all records 
> that are pipelined/streamed to this udf, e.g. via 
> CrossContract.builder(MyCross.class).asIterator(true)....build(). I assume 
> that this is easy to implement because the code that calls the udf probably 
> looks like “while (it.hasNext()) { udf(it.next()); }” - not sure if this is 
> true. The user would then implement a separate stub, e.g. MatchIteratorStub 
> instead of MatchStub (see the sketch after this list).
> 4. Keep it as it is (a Combiner remains the only way to pre-aggregate).
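> To make solution 3 concrete, a sketch of what such an iterator stub could 
> look like. MatchIteratorStub and asIterator(true) do not exist; every name 
> here is hypothetical and only illustrates the proposal.
>
> ```java
> import java.util.Iterator;
>
> /** Hypothetical shape of the iterator-based stub proposed in solution 3. */
> public interface MatchIteratorStub<LEFT, RIGHT, OUT> {
>
>     /** Called once with an iterator over all pipelined pairs, instead of once per pair. */
>     void match(Iterator<Pair<LEFT, RIGHT>> pairs, Collector<OUT> out);
>
>     /** Minimal pair holder, only for this sketch. */
>     class Pair<L, R> {
>         public final L left;
>         public final R right;
>
>         public Pair(L left, R right) {
>             this.left = left;
>             this.right = right;
>         }
>     }
>
>     /** Minimal collector abstraction, only for this sketch. */
>     interface Collector<T> {
>         void collect(T record);
>     }
> }
> ```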
> ### Discussion
> * The current way, using a Combiner, is very explicit and gives the system 
> more knowledge about what happens. In an ideal world, the optimizer would 
> choose how to do the pre-aggregation, and we would just define the 
> aggregation function in the combiner. Currently, however, we have to 
> hardcode the serialization code that forwards everything to the Combiner, 
> and the system would have to understand and modify the udf to get rid of 
> the serialization and do a direct pre-aggregation.
> * Solutions 1-3 do more or less the same thing: we can write our own 
> pre-aggregation/combiner code. If the cardinality is 1, this is just a 
> counter; if the cardinality is medium, we can use a hash table. After the 
> udf has processed all records, it can send one or multiple pre-aggregated 
> records to the Reducer. This is less explicit, but more powerful and enables 
> high efficiency. The system still knows that we are doing a grouping, 
> because we still have to use Reduce.
> * To make it easy for Hadoop users to switch over, solution 1 or 2 would be 
> fine. Solution 3 is basically the same, but looks a bit different.
> Looking forward to hearing your opinion. I hope I haven't simply missed 
> something and the feature already exists ;)
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/65
> Created by: [andrehacker|https://github.com/andrehacker]
> Labels: 
> Created at: Wed Aug 21 16:50:05 CEST 2013
> State: open


