Apache Wiki
Mon, 16 Nov 2009 10:55:32 -0800
Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "PigAccumulatorSpec" page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=2&rev2=3

--------------------------------------------------

  }}}
  == When to Call Accumulator ==
- . The MR plan is evaluated by an AccumulatorOptimizer to check whether it is eligible to run in accumulative mode. Another optimizer, SecondaryKeyOptimizer, should be called before AccumulatorOptimizer. It checks whether a POSort or PODistinct in the inner plan of a foreach can be removed or replaced by using the secondary sort key supported by Hadoop: a POSort is removed, and a PODistinct is replaced by a POSortedDistinct. Because of this optimizer, the last two use cases, with ORDER BY and DISTINCT inside the foreach inner plan, can still run in accumulative mode.
+ . The MR plan is evaluated by an AccumulatorOptimizer to check whether it is eligible to run in accumulative mode. Another optimizer, SecondaryKeyOptimizer, should be called before AccumulatorOptimizer. It checks whether a POSort or PODistinct in the inner plan of a foreach can be removed or replaced by using the secondary sort key supported by Hadoop: a POSort is removed, and a PODistinct is replaced by a POSortedDistinct. Because of this optimizer, the last two use cases, with ORDER BY and DISTINCT inside the foreach inner plan, can still run in accumulative mode. The AccumulatorOptimizer checks the reducer plan and enables the accumulator if the following criteria are met:
- The AccumulatorOptimizer checks the reducer plan and enables the accumulator if the following criteria are met:
  * The reducer plan uses POPackage as its root, not any of its subclasses. The POPackage is not used for distinct, and none of its inputs is set as inner.
  * The successor of POPackage is a POForEach.
  * The leaves of each POForEach input plan are ExpressionOperators, and each must be one of the following:
@@ -109, +108 @@
  {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
+ == Internal Changes ==
+ === Accumulator ===
+ . A new interface that a UDF can implement if it can run in accumulative mode.
+
+ === PhysicalOperator ===
+ . Add new methods setAccumulative(), setAccumStart() and setAccumEnd() to flag a physical operator to run in accumulative mode and to mark the start and end of accumulation. This change is part of the patch for PIG-1038.
+
+ === MapReduceLauncher ===
+ . Create an AccumulatorOptimizer and use it to visit the plan.
+
+ === AccumulatorOptimizer ===
+ . Another MROpPlanVisitor. It checks the reduce plan and, if the plan meets all the criteria, sets the "accumulative" flag on POPackage and POForEach. It is created and invoked by MapReduceLauncher.
+
+ === POStatus ===
+ . Add a new state, STATUS_BATCH_OK, to indicate that a batch was processed successfully in accumulative mode.
+
+ === POForEach ===
+ . If its "accumulative" flag is set, the bags passed to it through a tuple are AccumulativeBags rather than regular tuple bags. POForEach gets the AccumulativeTupleBuffer from the bag and then runs a while loop that calls nextBatch() on the AccumulativeTupleBuffer and passes the input to its inner plans. If an inner plan contains any UDF, the inner plan returns POStatus.STATUS_BATCH_OK when the current batch has been processed successfully. When there are no more batches to process, POForEach notifies each inner plan that accumulation is done, makes a final call to get the result and exits the loop. Finally, POForEach returns the result to its successor in the reducer plan. Operators that call POForEach don't need to know whether POForEach computed its result in regular or accumulative mode.
+
+ === AccumulativeBag ===
+ . An implementation of DataBag used by POPackage to process data in accumulative mode. This bag doesn't hold all the tuples from the iterator. Instead, it wraps an AccumulativeTupleBuffer, which holds the iterator and pulls tuples out of it in batches. Calling iterator() on this bag returns only the tuples of the current batch.
+
+ === AccumulativeTupleBuffer ===
+ . An underlying buffer shared by all the AccumulativeBags generated by POPackage (one bag for group by, multiple bags for cogroup). POPackage has an inner class that implements this interface. POPackage creates an instance of this buffer and sets it into the AccumulativeBags. The buffer has methods to retrieve the next batch of tuples, which in turn call methods of POPackage to read tuples out of the iterator and put them in an internal list. The AccumulativeBag has access to that list so it can return an iterator over the batch.
+
+ === POPackage ===
+ . If its "accumulative" flag is set, it creates an AccumulativeTupleBuffer and AccumulativeBags instead of the default tuple bags. It then sets the AccumulativeTupleBuffer into each AccumulativeBag and sets the AccumulativeBags into the result tuple.
+ POPackage also has an inner class that implements the AccumulativeTupleBuffer interface.
+
+ === ExpressionOperator ===
+ . Add new methods getChildExpression(), containUDF() and accumChild(). accumChild() is called by every expression operator that has more than one child operator, because such an operator must drive all of its children that contain a UDF to process the batched data. In accumulative mode, accumChild() returns POStatus.STATUS_BATCH_OK; otherwise it returns null.
+
+ === POUserFunc ===
+ . Which UDF method is called now depends on the operator's state. If the accumulative flag is on and accumulation has started, it calls accumulate(); once accumulation has ended, it calls getValue(), followed by cleanup(). If the accumulative flag is off, it calls exec().
+
+ === Other ExpressionOperators ===
+ . All of the following operators are changed to call accumChild() in getNext(); if it returns null, they run their regular logic.
+ * GreaterThanExpr
+ * EqualToExpr
+ * LTOrEqualToExpr
+ * LessThanExpr
+ * NotEqualToExpr
+ * GTOrEqualToExpr
+ * Divide
+ * Add
+ * Mod
+ * Multiply
+ * Subtract
+ * POOr
+ * POAnd
+ * POBinCond
+ * PORegexp
+
+ === Built-in UDFs ===
+ . The following UDFs are changed to implement the Accumulator interface.
+ * MAX
+   * IntMax
+   * LongMax
+   * DoubleMax
+   * FloatMax
+   * StringMax
+ * SUM
+   * IntSum
+   * LongSum
+   * DoubleSum
+   * FloatSum
+ * MIN
+   * IntMin
+   * DoubleMin
+   * LongMin
+   * FloatMin
+   * StringMin
+ * AVG
+   * IntAvg
+   * LongAvg
+   * FloatAvg
+   * DoubleAvg
+ * COUNT_STAR
+ * COUNT
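The spec describes a POUserFunc dispatch with three cases. While accumulation is running it calls accumulate() once per batch. Once accumulation has ended it calls getValue() followed by cleanup(). When the accumulative flag is off it calls exec(). The minimal, self-contained Java sketch below illustrates that dispatch. It does not use Pig's classes. The Accumulator interface here is a local stand-in that reuses only the three method names from the spec. Tuples and bags are modeled as plain Lists, and IntSumSketch and UserFuncDriver are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for the Accumulator interface named in the spec (not Pig's class).
// Assumption: a "tuple" is a List<Object> whose field 0 holds a bag,
// and a bag is a List<List<Object>>.
interface Accumulator<T> {
    void accumulate(List<Object> input); // called once per batch
    T getValue();                        // called once accumulation has ended
    void cleanup();                      // resets state before the next key
}

// Hypothetical IntSum-like UDF supporting both regular and accumulative mode.
class IntSumSketch implements Accumulator<Long> {
    private Long intermediate = null;

    // Regular mode: the whole bag arrives in a single call.
    Long exec(List<Object> input) {
        return sumBag(input);
    }

    @Override
    public void accumulate(List<Object> input) {
        Long partial = sumBag(input);
        if (partial != null) {
            intermediate = (intermediate == null) ? partial : intermediate + partial;
        }
    }

    @Override
    public Long getValue() {
        return intermediate;
    }

    @Override
    public void cleanup() {
        intermediate = null;
    }

    // Sums field 0 of every tuple in the bag, skipping nulls; returns null if nothing was summed.
    @SuppressWarnings("unchecked")
    private static Long sumBag(List<Object> input) {
        List<List<Object>> bag = (List<List<Object>>) input.get(0);
        Long sum = null;
        for (List<Object> tuple : bag) {
            Object v = tuple.get(0);
            if (v == null) {
                continue;
            }
            long x = ((Number) v).longValue();
            sum = (sum == null) ? x : sum + x;
        }
        return sum;
    }
}

// Hypothetical driver mirroring the POUserFunc dispatch described in the spec.
class UserFuncDriver {
    static Long run(IntSumSketch udf, List<List<List<Object>>> batches, boolean accumulative) {
        if (!accumulative) {
            // Accumulative flag off: build the whole bag and call exec().
            List<List<Object>> wholeBag = new ArrayList<>();
            for (List<List<Object>> batch : batches) {
                wholeBag.addAll(batch);
            }
            return udf.exec(Arrays.asList((Object) wholeBag));
        }
        // Accumulation started: one accumulate() call per batch.
        for (List<List<Object>> batch : batches) {
            udf.accumulate(Arrays.asList((Object) batch));
        }
        // Accumulation ended: getValue(), followed by cleanup().
        Long result = udf.getValue();
        udf.cleanup();
        return result;
    }
}
```

Both paths should return the same sum. The difference is that the accumulative path only ever hands the UDF one batch at a time, so the whole bag never has to be materialized.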
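The POForEach loop, together with AccumulativeBag and AccumulativeTupleBuffer, can be sketched in the same spirit. Everything below is an assumption for illustration. The buffer reads Integers from a plain Iterator rather than tuples through POPackage. The batch size is an arbitrary constructor argument. TupleBufferSketch, AccumBagSketch, BatchStatus and ForEachSketch are hypothetical names. The sketch shows only the control flow described in the spec:

* call nextBatch() until the buffer is exhausted;
* let the bag expose only the current batch;
* expect a BATCH_OK status from each inner-plan pass;
* make one final call for the result.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for AccumulativeTupleBuffer: owns the reduce-side iterator
// and copies the next batch of values into an internal list on each nextBatch() call.
class TupleBufferSketch {
    private final Iterator<Integer> source; // models the reducer's value iterator
    private final int batchSize;            // assumed knob, not taken from the spec
    private final List<Integer> current = new ArrayList<>();

    TupleBufferSketch(Iterator<Integer> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    boolean hasNextBatch() {
        return source.hasNext();
    }

    // Replaces the internal list with at most batchSize new values.
    void nextBatch() {
        current.clear();
        while (current.size() < batchSize && source.hasNext()) {
            current.add(source.next());
        }
    }

    List<Integer> currentBatch() {
        return current;
    }
}

// Hypothetical stand-in for AccumulativeBag: iterator() sees only the current batch.
class AccumBagSketch implements Iterable<Integer> {
    private final TupleBufferSketch buffer;

    AccumBagSketch(TupleBufferSketch buffer) {
        this.buffer = buffer;
    }

    @Override
    public Iterator<Integer> iterator() {
        return buffer.currentBatch().iterator();
    }
}

enum BatchStatus { OK, BATCH_OK } // loosely mirrors POStatus.STATUS_OK / STATUS_BATCH_OK

// Hypothetical POForEach-style driver whose "inner plan" is a running sum.
class ForEachSketch {
    private long running = 0;
    final List<Integer> batchSizes = new ArrayList<>(); // recorded so the loop can be inspected

    // One inner-plan pass over the current batch.
    BatchStatus processBatch(AccumBagSketch bag) {
        int seen = 0;
        for (int v : bag) {
            running += v;
            seen++;
        }
        batchSizes.add(seen);
        return BatchStatus.BATCH_OK;
    }

    long run(TupleBufferSketch buffer) {
        AccumBagSketch bag = new AccumBagSketch(buffer);
        while (buffer.hasNextBatch()) {
            buffer.nextBatch();
            if (processBatch(bag) != BatchStatus.BATCH_OK) {
                throw new IllegalStateException("batch was not processed");
            }
        }
        // Accumulation is done: make the final call for the result, then reset.
        long result = running;
        running = 0;
        return result;
    }
}
```

Sharing one buffer between the bag and the loop mirrors the spec's design, in which all AccumulativeBags for a key read from the same AccumulativeTupleBuffer. Advancing the buffer therefore advances every bag at once.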
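The accumChild() contract for multi-child expression operators has two cases. In accumulative mode, the operator drives its UDF-bearing children over the batch and returns STATUS_BATCH_OK. Otherwise it returns null, so getNext() falls back to its regular logic. The toy expression tree below computes SUM(x) + 5 to illustrate this. The classes (ExprStatus, ExprResult, Expr, SumUdfExpr, ConstExpr, AddExpr) are hypothetical simplifications, not Pig's ExpressionOperator hierarchy. The method name containUDF() is borrowed from the spec. Treating "accumulation started" as the condition for batch processing is an assumption based on the setAccumStart()/setAccumEnd() description.

```java
import java.util.Arrays;
import java.util.List;

enum ExprStatus { OK, BATCH_OK } // loosely mirrors POStatus.STATUS_OK / STATUS_BATCH_OK

// Hypothetical stand-in for an operator result: a status plus an optional value.
final class ExprResult {
    final ExprStatus status;
    final Long value;

    ExprResult(ExprStatus status, Long value) {
        this.status = status;
        this.value = value;
    }
}

// Hypothetical expression operator; the setters mimic setAccumulative(),
// setAccumStart() and setAccumEnd() from the spec.
abstract class Expr {
    boolean accumulative;
    boolean accumStarted;

    void setAccumulative() { accumulative = true; }
    void setAccumStart() { accumStarted = true; }
    void setAccumEnd() { accumStarted = false; }

    abstract boolean containUDF();

    abstract ExprResult getNext(List<Integer> input);
}

// UDF leaf acting like SUM: accumulate per batch, getValue() + cleanup() at the end, exec() otherwise.
class SumUdfExpr extends Expr {
    private long acc = 0;

    boolean containUDF() { return true; }

    ExprResult getNext(List<Integer> input) {
        if (accumulative && accumStarted) {     // accumulate()
            for (int v : input) acc += v;
            return new ExprResult(ExprStatus.BATCH_OK, null);
        }
        if (accumulative) {                     // getValue(), then cleanup()
            long value = acc;
            acc = 0;
            return new ExprResult(ExprStatus.OK, value);
        }
        long sum = 0;                           // exec() over the whole bag
        for (int v : input) sum += v;
        return new ExprResult(ExprStatus.OK, sum);
    }
}

class ConstExpr extends Expr {
    private final long c;

    ConstExpr(long c) { this.c = c; }

    boolean containUDF() { return false; }

    ExprResult getNext(List<Integer> input) { return new ExprResult(ExprStatus.OK, c); }
}

// Binary Add: getNext() calls accumChild() first and runs its regular logic only on null.
class AddExpr extends Expr {
    private final Expr lhs;
    private final Expr rhs;

    AddExpr(Expr lhs, Expr rhs) { this.lhs = lhs; this.rhs = rhs; }

    // Propagate the mode flags to both children.
    @Override void setAccumulative() { super.setAccumulative(); lhs.setAccumulative(); rhs.setAccumulative(); }
    @Override void setAccumStart() { super.setAccumStart(); lhs.setAccumStart(); rhs.setAccumStart(); }
    @Override void setAccumEnd() { super.setAccumEnd(); lhs.setAccumEnd(); rhs.setAccumEnd(); }

    boolean containUDF() { return lhs.containUDF() || rhs.containUDF(); }

    // Drives every UDF-bearing child over the current batch; null means "not accumulating".
    ExprResult accumChild(List<Integer> batch) {
        if (!(accumulative && accumStarted)) return null;
        for (Expr child : Arrays.asList(lhs, rhs)) {
            if (child.containUDF()) child.getNext(batch);
        }
        return new ExprResult(ExprStatus.BATCH_OK, null);
    }

    ExprResult getNext(List<Integer> input) {
        ExprResult r = accumChild(input);
        if (r != null) return r;
        return new ExprResult(ExprStatus.OK, lhs.getNext(input).value + rhs.getNext(input).value);
    }
}
```

In this sketch the final call after setAccumEnd() takes the regular path through AddExpr. The SUM leaf then returns its accumulated value via the getValue()/cleanup() branch, so callers see the same result in both modes.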