Liyun,
    I was out and could not reply sooner. Please send questions like this to
dev@ so that other committers can also look into them and answer.

  There is no bug here; the behavior is as expected.

 C = group B by id;
 D = foreach C {
     E = order B by f desc;
     F = E.f;
     generate group, myudfs.AccumulativeSumBag(F);
 };


There is no secondary key optimization applied in this MapReduce plan at
all. Usually the order by (POSort) above would be removed and replaced
with a secondary key sort. But in this case, the output of the group by is
just stored into HDFS because there is a split. Then, in the map phase of
the join, it is loaded twice (once for D and once for G), the inner foreach
of each is processed there, and the results are joined in the reduce. Since
the UDF is executing in the map, it is not run as an Accumulator but as a
normal EvalFunc, and exec() is called.
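
Since exec() can be reached whenever the foreach ends up on the map side, a
UDF that implements Accumulator is safer if exec() also works. Below is a
minimal sketch of that idea, not the actual Pig API: AccumulatorSketch and
ConcatSketch are hypothetical stand-ins (the real interfaces take a Tuple
holding a DataBag and throw IOException), and a List<String> plays the role
of the bag.

```java
import java.util.List;

// Stand-in for org.apache.pig.Accumulator (assumption: simplified signature,
// a List<String> replaces the Tuple/DataBag pair used by Pig).
interface AccumulatorSketch<T> {
    void accumulate(List<String> batch);
    T getValue();
    void cleanup();
}

// Hypothetical UDF shape: exec() reuses the accumulator methods, so the
// result is the same whether the engine calls exec() once with the whole
// bag or calls accumulate() repeatedly in accumulative mode.
class ConcatSketch implements AccumulatorSketch<String> {
    private StringBuilder sb;

    @Override
    public void accumulate(List<String> batch) {
        if (batch == null) {
            return;
        }
        if (sb == null) {
            sb = new StringBuilder();
        }
        for (String s : batch) {
            sb.append(s);
        }
    }

    @Override
    public String getValue() {
        return (sb != null && sb.length() > 0) ? sb.toString() : null;
    }

    @Override
    public void cleanup() {
        sb = null;
    }

    // Stand-in for EvalFunc.exec(): receives the complete bag in one call.
    public String exec(List<String> wholeBag) {
        cleanup(); // start from a clean state
        try {
            accumulate(wholeBag);
            return getValue();
        } finally {
            cleanup(); // do not leak state into the next group
        }
    }
}
```

With this shape, the UDF no longer depends on which side of the job the
optimizer places the foreach. I believe org.apache.pig.AccumulatorEvalFunc
follows the same approach, but please check the version you are on.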

 In this case it would have been better if the foreach statements were
executed in the reduce of the group by and two different outputs stored.
But the MultiQueryOptimizer is not applied if secondary key optimization is
possible.

MultiQueryOptimizer.java

            if (successor.getUseSecondaryKey()) {
                log.debug("Splittee " + successor.getOperatorKey().getId()
                        + " uses secondary key, do not merge it");
                continue;
            }

In this case, both splittees have the same secondary key, as they both do
order B by f desc; (which is not the case most of the time), so it would be
possible to apply the MultiQueryOptimizer. But currently the
MultiQueryOptimizer does not have the intelligence to check whether all of
them have the same secondary key and, if so, merge them into one plan.
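
To illustrate the kind of check that is missing, here is a rough sketch. It
is not code from MultiQueryOptimizer: SplitteeSketch and
SecondaryKeyMergeCheck are hypothetical names, and representing the
secondary key as a plain string such as "f desc" is an assumption made only
to keep the example small. Only getUseSecondaryKey() mirrors a name from the
snippet above.

```java
import java.util.List;

// Hypothetical stand-in for a splittee MapReduce operator.
class SplitteeSketch {
    final String name;
    // Assumed representation of the secondary sort, e.g. "f desc";
    // null means the splittee does not use a secondary key.
    final String secondaryKey;

    SplitteeSketch(String name, String secondaryKey) {
        this.name = name;
        this.secondaryKey = secondaryKey;
    }

    boolean getUseSecondaryKey() {
        return secondaryKey != null;
    }
}

class SecondaryKeyMergeCheck {
    // Returns true only when every splittee uses a secondary key and all of
    // them use the same one, i.e. a single secondary sort could serve them all.
    static boolean allShareSecondaryKey(List<SplitteeSketch> splittees) {
        String first = null;
        for (SplitteeSketch s : splittees) {
            if (!s.getUseSecondaryKey()) {
                return false;
            }
            if (first == null) {
                first = s.secondaryKey;
            } else if (!first.equals(s.secondaryKey)) {
                return false;
            }
        }
        return first != null; // an empty list has nothing to share
    }
}
```

For the script in question, D and G would both carry "f desc", so the check
would pass; two splittees ordering by different columns or directions would
fail it and keep today's behavior.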

Regards,
Rohini


On Tue, May 19, 2015 at 1:53 AM, Zhang, Liyun <[email protected]> wrote:

>  Hi Rohini:
>
>    I found a problem when executing the following script in mr mode:
>
>
>
> *testAccumulator.join.pig*
>
>
>
> REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
> A = load './testAccumulator.txt' as (id:int,f);
> B = foreach A generate id, f, id as t;
> C = group B by id;
> D = foreach C {
>     E = order B by f desc;
>     F = E.f;
>     generate group, myudfs.AccumulativeSumBag(F);
> };
> G = foreach C {
>     E = order B by f desc;
>     F = E.f;
>     generate group, myudfs.AccumulativeSumBag(F);
> };
> H = join D by group, G by group;
> store H into 'testAccumulator.join.out';
> explain H
>
>
>
> cat myudfs/AccumulativeSumBag.java:
>
> package myudfs;
>
> import java.io.IOException;
> import java.util.Iterator;
> import org.apache.pig.EvalFunc;
> import org.apache.pig.Accumulator;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.Tuple;
>
> /**
>  * This class is for testing of accumulator udfs
>  */
> public class AccumulativeSumBag extends EvalFunc<String>
>         implements Accumulator<String> {
>
>     StringBuffer sb;
>
>     public AccumulativeSumBag() {
>     }
>
>     public void accumulate(Tuple tuple) throws IOException {
>         DataBag databag = (DataBag)tuple.get(0);
>         if (databag == null)
>             return;
>
>         if (sb == null) {
>             sb = new StringBuffer();
>         }
>
>         Iterator<Tuple> iterator = databag.iterator();
>         while (iterator.hasNext()) {
>             Tuple t = iterator.next();
>             if (t.size()>1 && t.get(1) == null) {
>                 continue;
>             }
>
>             sb.append(t.toString());
>         }
>     }
>
>     public String getValue() {
>         if (sb != null && sb.length()>0) {
>             return sb.toString();
>         }
>         return null;
>     }
>
>     public void cleanup() {
>         sb = null;
>     }
>
>     public String exec(Tuple tuple) throws IOException {
>         throw new IOException("exec() should not be called");
>     }
> }
>
>
>
> The error message is:
>
> java.lang.Exception: org.apache.pig.backend.executionengine.ExecException:
> ERROR 0: Exception while executing (Name: H: Local
> Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117):
> org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught
> error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
>         at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>         at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
> Caused by: org.apache.pig.backend.executionengine.ExecException:
> ERROR 0: Exception while executing (Name: H: Local
> Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117):
> org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught
> error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion.getNextTuple(POUnion.java:167)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:279)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:274)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
>         at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
>         at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
>
>
> Following is the physical plan and mr plan.
>
> #-----------------------------------------------
>
> # Physical Plan:
>
> #-----------------------------------------------
>
> H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
>
> |
>
> |---H: New For Each(true,true)[tuple] - scope-56
>
>     |   |
>
>     |   Project[bag][1] - scope-54
>
>     |   |
>
>     |   Project[bag][2] - scope-55
>
>     |
>
>     |---H: Package(Packager)[tuple]{int} - scope-49
>
>         |
>
>         |---H: Global Rearrange[tuple] - scope-48
>
>             |
>
>             |---H: Local Rearrange[tuple]{int}(false) - scope-50
>
>             |   |   |
>
>             |   |   Project[int][0] - scope-51
>
>             |   |
>
>             |   |---D: New For Each(false,false)[bag] - scope-32
>
>             |       |   |
>
>             |       |   Project[int][0] - scope-22
>
>             |       |   |
>
>             |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] -
> scope-25
>
>             |       |   |
>
>             |       |   |---RelationToExpressionProject[bag][*] - scope-24
>
>             |       |       |
>
>             |       |       |---F: New For Each(false)[bag] - scope-31
>
>             |       |           |   |
>
>             |       |           |   Project[bytearray][1] - scope-29
>
>             |       |           |
>
>             |       |           |---E: POSort[bag]() - scope-28
>
>             |       |               |   |
>
>             |       |               |   Project[bytearray][1] - scope-27
>
>             |       |               |
>
>             |       |               |---Project[bag][1] - scope-26
>
>             |       |
>
>             |       |---C: Filter[bag] - scope-20
>
>             |           |   |
>
>             |           |   Constant(true) - scope-21
>
>             |           |
>
>             |           |---C: Split - scope-19    // here an implicit
> Split is generated
>
>             |               |
>
>             |               |---C: Package(Packager)[tuple]{int} - scope-16
>
>             |                   |
>
>             |                   |---C: Global Rearrange[tuple] - scope-15
>
>             |                       |
>
>             |                       |---C: Local
> Rearrange[tuple]{int}(false) - scope-17
>
>             |                           |   |
>
>             |                           |   Project[int][0] - scope-18
>
>             |                           |
>
>             |                           |---B: New For
> Each(false,false,false)[bag] - scope-14
>
>             |                               |   |
>
>             |                               |   Project[int][0] - scope-7
>
>            |                               |   |
>
>             |                               |   Project[bytearray][1] -
> scope-9
>
>             |                               |   |
>
>             |                               |
> POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
>
>             |                               |   |
>
>             |                               |   |---Project[int][0] -
> scope-11
>
>             |                               |
>
>             |                               |---A: New For
> Each(false,false)[bag] - scope-6
>
>             |                                   |   |
>
>             |                                   |   Cast[int] - scope-2
>
>             |                                   |   |
>
>             |                                   |
> |---Project[bytearray][0] - scope-1
>
>             |                                   |   |
>
>             |                                   |   Project[bytearray][1]
> - scope-4
>
>             |                                   |
>
>             |                                   |---A: Load(hdfs://
> zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
> - scope-0
>
>             |
>
>             |---H: Local Rearrange[tuple]{int}(false) - scope-52
>
>                 |   |
>
>                 |   Project[int][0] - scope-53
>
>                 |
>
>                 |---G: New For Each(false,false)[bag] - scope-45
>
>                     |   |
>
>                     |   Project[int][0] - scope-35
>
>                     |   |
>
>                     |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] -
> scope-38
>
>                     |   |
>
>                     |   |---RelationToExpressionProject[bag][*] - scope-37
>
>                     |       |
>
>                     |       |---F: New For Each(false)[bag] - scope-44
>
>                     |           |   |
>
>                     |           |   Project[bytearray][1] - scope-42
>
>                     |           |
>
>                     |           |---E: POSort[bag]() - scope-41
>
>                     |               |   |
>
>                     |               |   Project[bytearray][1] - scope-40
>
>                     |               |
>
>                     |               |---Project[bag][1] - scope-39
>
>                     |
>
>                     |---C: Filter[bag] - scope-33
>
>                         |   |
>
>                         |   Constant(true) - scope-34
>
>                         |
>
>                         |---C: Split - scope-19
>
>                             |
>
>                             |---C: Package(Packager)[tuple]{int} - scope-16
>
>                                 |
>
>                                 |---C: Global Rearrange[tuple] - scope-15
>
>                                     |
>
>                                     |---C: Local
> Rearrange[tuple]{int}(false) - scope-17
>
>                                         |   |
>
>                                         |   Project[int][0] - scope-18
>
>                                         |
>
>                                         |---B: New For
> Each(false,false,false)[bag] - scope-14
>
>                                             |   |
>
>                                             |   Project[int][0] - scope-7
>
>                                             |   |
>
>                                             |   Project[bytearray][1] -
> scope-9
>
>                                             |   |
>
>                                             |
> POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
>
>                                             |   |
>
>                                             |   |---Project[int][0] -
> scope-11
>
>                                             |
>
>                                             |---A: New For
> Each(false,false)[bag] - scope-6
>
>                                                 |   |
>
>                                                 |   Cast[int] - scope-2
>
>                                                 |   |
>
>                                                 |
> |---Project[bytearray][0] - scope-1
>
>                                                 |   |
>
>                                                 |   Project[bytearray][1]
> - scope-4
>
>                                                 |
>
>                                                 |---A: Load(hdfs://
> zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
> - scope-0
>
>
>
> #--------------------------------------------------
>
> # Map Reduce Plan
>
> #--------------------------------------------------
>
> MapReduce node scope-58
>
> Map Plan
>
> C: Local Rearrange[tuple]{int}(false) - scope-17
>
> |   |
>
> |   Project[int][0] - scope-18
>
> |
>
> |---B: New For Each(false,false,false)[bag] - scope-14
>
>     |   |
>
>     |   Project[int][0] - scope-7
>
>     |   |
>
>     |   Project[bytearray][1] - scope-9
>
>     |   |
>
>     |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] -
> scope-12
>
>     |   |
>
>     |   |---Project[int][0] - scope-11
>
>     |
>
>     |---A: New For Each(false,false)[bag] - scope-6
>
>         |   |
>
>         |   Cast[int] - scope-2
>
>         |   |
>
>         |   |---Project[bytearray][0] - scope-1
>
>         |   |
>
>         |   Project[bytearray][1] - scope-4
>
>         |
>
>         |---A: Load(hdfs://
> zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
> - scope-0--------
>
> Reduce Plan
>
> Store(hdfs://
> zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage)
> - scope-59
>
> |
>
> |---C: Package(Packager)[tuple]{int} - scope-16--------
>
> Global sort: false
>
> ----------------
>
>
>
> MapReduce node scope-64
>
> Map Plan
>
> Union[tuple] - scope-65
>
> |
>
> |---H: Local Rearrange[tuple]{int}(false) - scope-50
>
> |   |   |
>
> |   |   Project[int][0] - scope-51
>
> |   |
>
> |   |---D: New For Each(false,false)[bag] - scope-32
>
> |       |   |
>
> |       |   Project[int][0] - scope-22
>
> |       |   |
>
> |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
>
> |       |   |
>
> |       |   |---RelationToExpressionProject[bag][*] - scope-24
>
> |       |       |
>
> |       |       |---F: New For Each(false)[bag] - scope-31
>
> |       |           |   |
>
> |       |           |   Project[bytearray][1] - scope-29
>
> |       |           |
>
> |       |           |---E: POSort[bag]() - scope-28   // POSort should be
> deleted in  SecondaryKeyOptimizerUtil.java#applySecondaryKeySort . but it
> is not deleted because the POSplit(scope-19 in physical plan ) makes group
> and foreach in different operators
>
> |       |               |   |
>
> |       |               |   Project[bytearray][1] - scope-27
>
> |       |               |
>
> |       |               |---Project[bag][1] - scope-26
>
> |       |
>
> |       |---Load(hdfs://
> zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage)
> - scope-60
>
> |
>
> |---H: Local Rearrange[tuple]{int}(false) - scope-52
>
>     |   |
>
>     |   Project[int][0] - scope-53
>
>     |
>
>     |---G: New For Each(false,false)[bag] - scope-45
>
>         |   |
>
>         |   Project[int][0] - scope-35
>
>         |   |
>
>         |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
>
>         |   |
>
>         |   |---RelationToExpressionProject[bag][*] - scope-37
>
>         |       |
>
>         |       |---F: New For Each(false)[bag] - scope-44
>
>         |           |   |
>
>         |           |   Project[bytearray][1] - scope-42
>
>         |           |
>
>         |           |---E: POSort[bag]() - scope-41
>
>         |               |   |
>
>         |               |   Project[bytearray][1] - scope-40
>
>         |               |
>
>         |               |---Project[bag][1] - scope-39
>
>         |
>
>         |---Load(hdfs://
> zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage)
> - scope-62--------
>
> Reduce Plan
>
> H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
>
> |
>
> |---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
>
> Global sort: false
>
> ----------------
>
>
>
> The reason why it fails is that POSort is not deleted when secondary key
> sort is enabled (POSort should be deleted in
> SecondaryKeyOptimizerUtil.java#applySecondaryKeySort). If POSplit is not
> deleted, the value “foundUDF” in AccumulatorOptimizerUtil#addAccumulator
> is false and po_foreach.setAccumulative will not be called. This causes
> “Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be
> called]”. Because an implicit POSplit is generated, when the POSplit is
> encountered in the mr plan, a new mr operator is generated
> (POSplit(scope-19) splits the physical plan into MapReduceNode scope-58
> and MapReduceNode scope-64). So
> SecondaryKeyOptimizerUtil.java#applySecondaryKeySort does not work.
>
>
>
> AccumulatorOptimizerUtil#addAccumulator
>
>     public static void addAccumulator(PhysicalPlan plan) {
>         // See if this is a map-reduce job
>         List<PhysicalOperator> pos = plan.getRoots();
>         if (pos == null || pos.size() == 0) {
>             return;
>         }
>
>         // See if this is a POPackage
>         PhysicalOperator po_package = pos.get(0);
>         if (!po_package.getClass().equals(POPackage.class)) {
>             return;
>         }
>
>         Packager pkgr = ((POPackage) po_package).getPkgr();
>         // Check that this is a standard package, not a subclass
>         if (!pkgr.getClass().equals(Packager.class)) {
>             return;
>         }
>
>         // if POPackage is for distinct, just return
>         if (pkgr.isDistinct()) {
>             return;
>         }
>
>         // if any input to POPackage is inner, just return
>         boolean[] isInner = pkgr.getInner();
>         for (boolean b: isInner) {
>             if (b) {
>                 return;
>             }
>         }
>
>         List<PhysicalOperator> l = plan.getSuccessors(po_package);
>         // there should be only one POForEach
>         if (l == null || l.size() == 0 || l.size() > 1) {
>             return;
>         }
>
>         PhysicalOperator po_foreach = l.get(0);
>         if (!(po_foreach instanceof POForEach)) {
>             return;
>         }
>
>         boolean foundUDF = false;
>         List<PhysicalPlan> list = ((POForEach)po_foreach).getInputPlans();
>         for (PhysicalPlan p: list) {
>             PhysicalOperator po = p.getLeaves().get(0);
>
>             // only expression operators are allowed
>             if (!(po instanceof ExpressionOperator)) {
>                 return;
>             }
>
>             if (((ExpressionOperator)po).containUDF()) {
>                 foundUDF = true;
>             }
>
>             if (!check(po)) {
>                 return;
>             }
>         }
>
>         if (foundUDF) {
>             // if all tests are passed, reducer can run in accumulative mode
>             LOG.info("Reducer is to run in accumulative mode.");
>             po_package.setAccumulative();
>             po_foreach.setAccumulative();
>         }
>     }
>
>
>
>
>
> My question: is it a bug, or does Pig not handle this kind of script,
> where an implicit POSplit is generated while secondary key optimization
> is enabled?
>
>
>
>
>
>
>
> Kelly Zhang/Zhang,Liyun
>
> Best Regards
>
>
>
