Apache Wiki
Mon, 16 Nov 2009 09:52:21 -0800
The "PigAccumulatorSpec" page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=1&rev2=2

--------------------------------------------------

  = Accumulator UDF =
-
  == Introduction ==
  For data processing with PIG, it is very common to call "group by" or "cogroup" to group input tuples by a key, then call one or more UDFs to process each group. For example:

@@ -11, +10 @@

  C = foreach B generate group, myUDF1(A), myUDF2(A, 'some_param'), myUDF3(A);
  store C into 'myresult';
  }}}
-
- The current implementation is during grouping process, all tuples that belongs to the same key are materialized into a DataBag, and the DataBag(s) are passed to the UDFs. This causes performance and memory problem. For a large key, if its tuples can not fit into memory, performance has to sacrifice to spill extra data into disk.
+ The current implementation is during grouping process, all tuples that belongs to the same key are materialized into a DataBag, and the DataBag(s) are passed to the UDFs. This causes performance and memory problem. For a large key, if its tuples can not fit into memory, performance has to sacrifice to spill extra data into disk.
  Since many UDFs do not really need to see all the tuples that belongs to a key at the same time, it is possible to pass those tuples as batches. A good example would be like COUNT(), SUM(). Tuples can be passed to UDFs in accumulative manner. When all the tuples are passed, the final method is called to retrieve the value. This way, we can minimize the memory usage and improve performance by avoiding data spill.

@@ -22, +20 @@

  {{{
  public interface Accumulator <T> {
      /**
-      * Pass tuples to the UDF. You can retrive DataBag by calling b.get(index).
+      * Pass tuples to the UDF. You can retrive DataBag by calling b.get(index).
       * Each DataBag may contain 0 to many tuples for current key
       */
      public void accumulate(Tuple b) throws IOException;

@@ -32, +30 @@

       * @return the value for the UDF for this key.
       */
      public T getValue();
-
+
-     /**
+     /**
-      * Called after getValue() to prepare processing for next key.
+      * Called after getValue() to prepare processing for next key.
       */
      public void cleanup();
  }
  }}}
-
  UDF should still extend EvalFunc as before. The PIG engine would detect based on context whether tuples can be processed accumulatively. If not, then regular EvalFunc would be called. Therefore, for a UDF, both interfaces should be implemented properly
  == Use Cases ==
  PIG engine would process tuples accumulatively only when all of the UDFs implements Accumulator interface. If one of the UDF is not Accumulator, then all UDFs are called by their EvalFunc interface as regular UDFs. Following are examples accumulator interface of UDFs would be called:
-  * group by
+  * group by
- {{{
+ . {{{
  A = load 'mydata';
  B = group A by $0;
  C = foreach B generate group, myUDF(A);
  store C into 'myresult';
- }}}
+ }}}
-  * cogroup
+  * cogroup
- {{{
+ . {{{
  A = load 'mydata1';
  B = load 'mydata2';
  C = cogroup A by $0, B by $0;
  D = foreach C generate group, myUDF(A), myUDF(B);
  store D into 'myresult';
- }}}
+ }}}
-  * group by with sort
+  * group by with sort
- {{{
+ . {{{
  A = load 'mydata';
  B = group A by $0;
  C = foreach B {

@@ -72, +69 @@

      generate group, myUDF(D);
  }
  store C into 'myresult';
- }}}
+ }}}
-  * group by with distinct
+  * group by with distinct
- {{{
+ . {{{
  A = load 'mydata';
  B = group A by $0;
  C = foreach B {

@@ -84, +81 @@

      generate group, myUDF(E);
  }
  store C into 'myresult';
- }}}
+ }}}
  == When to Call Accumulator ==
- MR plan is evaluated by an AccumulatorOptimizer to check if it is eligible to run in accumulative mode. Before AccumulatorOptimizer is called, another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer checks if POSort or PODistinct in the inner plan of foreach can be removed/replaced by using secondary sorting key supported by hadoop. If it is POSort, then it is removed. If it is PODistinct, it is replaced by POSortedDistinct. Because of this optimizer, the last two use cases with order by and distinct inside foreach inner plan can still run in accumulative mode.
+ . MR plan is evaluated by an AccumulatorOptimizer to check if it is eligible to run in accumulative mode. Before AccumulatorOptimizer is called, another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer checks if POSort or PODistinct in the inner plan of foreach can be removed/replaced by using secondary sorting key supported by hadoop. If it is POSort, then it is removed. If it is PODistinct, it is replaced by POSortedDistinct. Because of this optimizer, the last two use cases with order by and distinct inside foreach inner plan can still run in accumulative mode.
-
- The AccumulatorOptimizer checks the reducer plan and enables accumulator if following criteria are met:
+ The AccumulatorOptimizer checks the reducer plan and enables accumulator if following criteria are met:
-  * The reducer plan uses POPackage as root, not any of its sub-classes. POPackage is not for distinct, and any of its input is not set as inner.
+  * The reducer plan uses POPackage as root, not any of its sub-classes. POPackage is not for distinct, and any of its input is not set as inner.
-  * The successor of POPackage is a POForeach.
+  * The successor of POPackage is a POForeach.
-  * The leaves of each POForEach input plan is an ExpressionOperator and it must be one of the following:
+  * The leaves of each POForEach input plan is an ExpressionOperator and it must be one of the following:
-   * ConstantExpression
+   * ConstantExpression
-   * POProject, whose result type is not BAG, or TUPLE and overloaded
+   * POProject, whose result type is not BAG, or TUPLE and overloaded
-   * POMapLookup
+   * POMapLookup
-   * POCase
+   * POCase
-   * UnaryExpressionOperator
+   * UnaryExpressionOperator
-   * BinaryExpressionOperator
+   * BinaryExpressionOperator
-   * POBinCond
+   * POBinCond
-   * POUserFunc that implements Accumulator interface and its inputs contains only ExpressionOperation, POForEach, or POSortedDistinct, but not another POUserFunc.
+   * POUserFunc that implements Accumulator interface and its inputs contains only ExpressionOperation, POForEach, or POSortedDistinct, but not another POUserFunc.
  Therefore, if under POForEach, there are multiple UDFs, some are Accumulators,while some are not, the Accumulator would be off.
  == Design ==
-
  Once the optimizer detects that the reduce plan can run accumulatively, it set a flag to POPackage and POForEach to indicate the data is going to be processed in accumulative mode. POForEach in turn sets this flat to all the operations of its input plans.
- During runtime, POPackages creates a tuple with AccumultiveBag as its fields. This bag wraps up an AccumulativeTupleFeeder, which has a handler to the reducer Iterator to pull next batch of tuples. It also has a buffer to hold tuples of current batch. All AccumulativeBag shares the same feeder. The tuple generated by POPackage is passed to POForeach, POForeach is able to get AccumulativeTupleFeeder from AccumulativeBag. It then calls feeder.nextBatch() to fill the AccumulativeBag with first batch of tuples, pass them to the POUserFunc. Because POUserFunc is marked as accumulative, it would call the accumulate() of the UDF. The POUserFunc returns with a code of STATUS_BATCH_OK. Then POForeach pulls next batch, and so on until the last batch of tuples are retrieved and processed. At the end, POForeach notifies POUserFunc that accumulation is done. It makes a final call to POUserFunc, which in turn calls getValue() to return the final result.
+ During runtime, POPackages creates a tuple with AccumultiveBag as its fields. This bag wraps up an AccumulativeTupleFeeder, which has a handler to the reducer Iterator to pull next batch of tuples. It also has a buffer to hold tuples of current batch. All AccumulativeBag shares the same feeder. The tuple generated by POPackage is passed to POForeach, POForeach is able to get AccumulativeTupleFeeder from AccumulativeBag. It then calls feeder.nextBatch() to fill the AccumulativeBag with first batch of tuples, pass them to the POUserFunc. Because POUserFunc is marked as accumulative, it would call the accumulate() of the UDF. The POUserFunc returns with a code of STATUS_BATCH_OK. Then POForeach pulls next batch, and so on until the last batch of tuples are retrieved and processed. At the end, POForeach notifies POUserFunc that accumulation is done. It makes a final call to POUserFunc, which in turn calls getValue() to return the final result.
- Following is the sequence diagram of the data flow:
+ Following is the sequence diagram of the data flow:
- <img src="http://twiki.corp.yahoo.com/pub/Tiger/AccumulatorUDF/SequenceDiagram3.jpg" width="500" height="500>
+ {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
+
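
The accumulate() / getValue() / cleanup() contract and the batch-pulling loop described in the Design section can be illustrated with a small standalone sketch. It is not Pig code: the `Accumulator` interface below is a simplified stand-in that takes a `List<Object>` batch instead of a Pig `Tuple` of DataBags, and `CountAccumulator`, `runKey`, and the `batchSize` parameter are hypothetical names introduced only for illustration. The driver plays the role the page assigns to POForeach for a single key, under the assumption that batches are pulled until the reducer iterator is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AccumulatorSketch {

    /** Simplified stand-in for the Accumulator interface on the wiki page.
     *  Assumption: a batch is a List<Object>, not a Pig Tuple holding DataBags. */
    public interface Accumulator<T> {
        void accumulate(List<Object> batch);
        T getValue();
        void cleanup();
    }

    /** COUNT-like UDF: keeps a running total and never stores the tuples. */
    public static class CountAccumulator implements Accumulator<Long> {
        private long count = 0;

        @Override
        public void accumulate(List<Object> batch) {
            count += batch.size();
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0; // reset state so the instance can serve the next key
        }
    }

    /** Hypothetical driver imitating the per-key loop: feed batches, then
     *  read the final value, then reset. batchSize is assumed to be positive. */
    public static <T> T runKey(Accumulator<T> udf, Iterator<Object> tuples, int batchSize) {
        List<Object> batch = new ArrayList<>();
        while (tuples.hasNext()) {
            batch.add(tuples.next());
            if (batch.size() == batchSize) {
                udf.accumulate(batch);      // one full batch handed to the UDF
                batch = new ArrayList<>();  // only one batch is held at a time
            }
        }
        if (!batch.isEmpty()) {
            udf.accumulate(batch);          // last, possibly partial, batch
        }
        T result = udf.getValue();          // final call once all batches are seen
        udf.cleanup();                      // prepare for the next key
        return result;
    }

    public static void main(String[] args) {
        List<Object> tuplesForKey = Arrays.<Object>asList("t1", "t2", "t3", "t4", "t5");
        long n = runKey(new CountAccumulator(), tuplesForKey.iterator(), 2);
        System.out.println(n); // prints 5
    }
}
```

With a batch size of 2, the five tuples reach the UDF as batches of 2, 2 and 1, so at most two tuples are buffered at any time, which is the memory saving the Introduction describes. Because cleanup() resets the counter, the same instance can be reused for the next key.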