Hi all:
Can anyone help look at this issue? If it is a bug, I will file a JIRA for it. If Pig does not handle this kind of script, please tell me.
Kelly Zhang/Zhang,Liyun
Best Regards

From: Rohini Palaniswamy [[email protected]]
Sent: Thursday, May 21, 2015 5:59 AM
To: Zhang, Liyun; [email protected]
Cc: Mohit Sabharwal; [email protected]; Xuefu Zhang
Subject: Re: A problem about implicit POSPLIT found in mr mode

Liyun,
   I was out and could not reply sooner. Please send questions like this to dev@ so that other committers can also look into them and answer.

There is no bug here and the behavior is as expected.

C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};

There is no secondary key optimization applied in this mapreduce plan at all. Usually the order by (POSort) above would be removed and replaced with a secondary key sort. But in this case, the output of the group by is just stored into HDFS because there is a split. Then, in the map phase of the join, it is loaded twice (once for D and once for G), the inner foreach of each is processed, and the results are joined in the reduce. Since the UDF executes in the map, it is not run as an Accumulator but as a normal EvalFunc, so exec() is called.

In this case it would have been better if the foreach statements were executed in the reduce of the group by and two different outputs were stored. But the MultiQueryOptimizer is not applied if secondary key optimization is possible:

MultiQueryOptimizer.java:

    if (successor.getUseSecondaryKey()) {
        log.debug("Splittee " + successor.getOperatorKey().getId()
                + " uses secondary key, do not merge it");
        continue;
    }

In this case, both splittees have the same secondary key, as they both do "order B by f desc" (which is not the case most of the time), so it would be possible to apply the MultiQueryOptimizer. But currently the MultiQueryOptimizer does not have the intelligence to check whether all splittees have the same secondary key and merge them into one plan.

Regards,
Rohini
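[To make the missing check concrete, here is a schematic sketch of the merge decision described above. This is an editor's illustration, not Pig code: the class name, the string encoding of a sort spec, and canMerge() are all invented for the example; a real implementation would have to compare the splittees' secondary-sort plans inside MultiQueryOptimizer.]

import java.util.Arrays;
import java.util.List;

// Schematic only -- not Pig code. Models the missing intelligence:
// merge splittees only when every one of them sorts the grouped bag
// the same way. A "sort spec" here is just a string such as "f desc"
// standing in for a splittee's secondary-sort plan.
public class SecondaryKeyMergeCheck {

    static boolean canMerge(List<String> splitteeSortSpecs) {
        for (String spec : splitteeSortSpecs) {
            if (!spec.equals(splitteeSortSpecs.get(0))) {
                return false; // different secondary keys: keep plans separate
            }
        }
        return true; // identical secondary keys: merging would be safe
    }

    public static void main(String[] args) {
        // Both D and G do "order B by f desc", so they could be merged.
        System.out.println(canMerge(Arrays.asList("f desc", "f desc"))); // true
        // With different sort orders, skipping the merge (current behavior) is required.
        System.out.println(canMerge(Arrays.asList("f desc", "f asc")));  // false
    }
}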
On Tue, May 19, 2015 at 1:53 AM, Zhang, Liyun <[email protected]> wrote:

Hi Rohini:
I found a problem when executing the following script in mr mode:

testAccumulator.join.pig:

REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
A = load './testAccumulator.txt' as (id:int, f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group;
store H into 'testAccumulator.join.out';
explain H

cat myudfs/AccumulativeSumBag.java:

package myudfs;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * This class is for testing of accumulator udfs.
 */
public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String> {
    StringBuffer sb;

    public AccumulativeSumBag() {
    }

    // Called by Pig once per batch of tuples from the current key's bag.
    public void accumulate(Tuple tuple) throws IOException {
        DataBag databag = (DataBag) tuple.get(0);
        if (databag == null) {
            return;
        }
        if (sb == null) {
            sb = new StringBuffer();
        }
        Iterator<Tuple> iterator = databag.iterator();
        while (iterator.hasNext()) {
            Tuple t = iterator.next();
            // skip tuples whose second field is null
            if (t.size() > 1 && t.get(1) == null) {
                continue;
            }
            sb.append(t.toString());
        }
    }

    // Called by Pig after the last batch of a key.
    public String getValue() {
        if (sb != null && sb.length() > 0) {
            return sb.toString();
        }
        return null;
    }

    // Called by Pig between keys to reset state.
    public void cleanup() {
        sb = null;
    }

    // Deliberately throws: this UDF is only meant to run in accumulative mode.
    public String exec(Tuple tuple) throws IOException {
        throw new IOException("exec() should not be called");
    }
}
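[As an aside, not from the original mail: a minimal sketch of how such a UDF could implement exec() as a fallback instead of throwing, so it still produces correct output when the plan forces Pig to call it as a plain EvalFunc, as happens in the map phase here. SafeAccumulativeSumBag is a hypothetical name; it just delegates to the AccumulativeSumBag above.]

package myudfs;

import java.io.IOException;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Hypothetical variant of the UDF above: when Pig does not run the
// foreach in accumulative mode, it hands the whole bag to exec() in a
// single call, so exec() can simply accumulate once and return the value.
public class SafeAccumulativeSumBag extends EvalFunc<String> implements Accumulator<String> {
    private final AccumulativeSumBag delegate = new AccumulativeSumBag();

    public void accumulate(Tuple tuple) throws IOException {
        delegate.accumulate(tuple);
    }

    public String getValue() {
        return delegate.getValue();
    }

    public void cleanup() {
        delegate.cleanup();
    }

    // Fallback path for plain EvalFunc execution.
    public String exec(Tuple tuple) throws IOException {
        try {
            accumulate(tuple);
            return getValue();
        } finally {
            cleanup();
        }
    }
}

[With a fallback like this, the join script would run even when the accumulator optimization is not applied, at the cost of holding each key's whole bag in memory for the single exec() call.]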
The error message is:

java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion.getNextTuple(POUnion.java:167)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:279)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:274)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

Following is the physical plan and mr plan.

#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19    // here an implicit Split is generated
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
            |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - scope-4
            |                                   |
            |                                   |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - scope-1
                                                |   |
                                                |   Project[bytearray][1] - scope-4
                                                |
                                                |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-58
Map Plan
C: Local Rearrange[tuple]{int}(false) - scope-17
|   |
|   Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
    |   |
    |   Project[int][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-9
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
    |   |
    |   |---Project[int][0] - scope-11
    |
    |---A: New For Each(false,false)[bag] - scope-6
        |   |
        |   Cast[int] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Project[bytearray][1] - scope-4
        |
        |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
--------
Reduce Plan
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16
--------
Global sort: false
----------------

MapReduce node scope-64
Map Plan
Union[tuple] - scope-65
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
|   |   |
|   |   Project[int][0] - scope-51
|   |
|   |---D: New For Each(false,false)[bag] - scope-32
|       |   |
|       |   Project[int][0] - scope-22
|       |   |
|       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
|       |   |
|       |   |---RelationToExpressionProject[bag][*] - scope-24
|       |       |
|       |       |---F: New For Each(false)[bag] - scope-31
|       |           |   |
|       |           |   Project[bytearray][1] - scope-29
|       |           |
|       |           |---E: POSort[bag]() - scope-28    // POSort should be deleted in SecondaryKeyOptimizerUtil.java#applySecondaryKeySort, but it is not deleted because the POSplit (scope-19 in the physical plan) puts the group and the foreach in different operators
|       |               |   |
|       |               |   Project[bytearray][1] - scope-27
|       |               |
|       |               |---Project[bag][1] - scope-26
|       |
|       |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
    |   |
    |   Project[int][0] - scope-53
    |
    |---G: New For Each(false,false)[bag] - scope-45
        |   |
        |   Project[int][0] - scope-35
        |   |
        |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
        |   |
        |   |---RelationToExpressionProject[bag][*] - scope-37
        |       |
        |       |---F: New For Each(false)[bag] - scope-44
        |           |   |
        |           |   Project[bytearray][1] - scope-42
        |           |
        |           |---E: POSort[bag]() - scope-41
        |               |   |
        |               |   Project[bytearray][1] - scope-40
        |               |
        |               |---Project[bag][1] - scope-39
        |
        |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-62
--------
Reduce Plan
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49
--------
Global sort: false
----------------

The reason it fails is that POSort is not deleted when secondary key sort is enabled (POSort should be deleted in SecondaryKeyOptimizerUtil.java#applySecondaryKeySort). Because the POSplit is not removed, the value "foundUDF" in AccumulatorOptimizerUtil#addAccumulator stays false and po_foreach.setAccumulative() is never called. This causes "Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]". Because an implicit POSplit is generated, a new MapReduce operator is created when the POSplit is encountered while building the mr plan (POSplit scope-19 splits the physical plan into MapReduce node scope-58 and MapReduce node scope-64). So SecondaryKeyOptimizerUtil.java#applySecondaryKeySort does not take effect.

AccumulatorOptimizerUtil#addAccumulator:

public static void addAccumulator(PhysicalPlan plan) {
    // See if this is a map-reduce job
    List<PhysicalOperator> pos = plan.getRoots();
    if (pos == null || pos.size() == 0) {
        return;
    }

    // See if this is a POPackage
    PhysicalOperator po_package = pos.get(0);
    if (!po_package.getClass().equals(POPackage.class)) {
        return;
    }

    Packager pkgr = ((POPackage) po_package).getPkgr();
    // Check that this is a standard package, not a subclass
    if (!pkgr.getClass().equals(Packager.class)) {
        return;
    }

    // if POPackage is for distinct, just return
    if (pkgr.isDistinct()) {
        return;
    }

    // if any input to POPackage is inner, just return
    boolean[] isInner = pkgr.getInner();
    for (boolean b : isInner) {
        if (b) {
            return;
        }
    }

    List<PhysicalOperator> l = plan.getSuccessors(po_package);
    // there should be only one POForEach
    if (l == null || l.size() == 0 || l.size() > 1) {
        return;
    }

    PhysicalOperator po_foreach = l.get(0);
    if (!(po_foreach instanceof POForEach)) {
        return;
    }

    boolean foundUDF = false;
    List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
    for (PhysicalPlan p : list) {
        PhysicalOperator po = p.getLeaves().get(0);
        // only expression operators are allowed
        if (!(po instanceof ExpressionOperator)) {
            return;
        }
        if (((ExpressionOperator) po).containUDF()) {
            foundUDF = true;
        }
        if (!check(po)) {
            return;
        }
    }

    if (foundUDF) {
        // if all tests are passed, reducer can run in accumulative mode
        LOG.info("Reducer is to run in accumulative mode.");
        po_package.setAccumulative();
        po_foreach.setAccumulative();
    }
}
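[To make the two call paths concrete, here is a small schematic driver, an editor's illustration rather than Pig internals (the batching and the single exec() call are simplified), that exercises the AccumulativeSumBag from the script above the way each mode would:]

package myudfs; // assumed, so AccumulativeSumBag from the mail resolves directly

import java.io.IOException;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.TupleFactory;

// Schematic driver for the two execution modes of the UDF.
public class AccumulatorCallSequenceDemo {
    public static void main(String[] args) {
        TupleFactory tf = TupleFactory.getInstance();
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        bag.add(tf.newTuple("a"));
        bag.add(tf.newTuple("b"));

        AccumulativeSumBag udf = new AccumulativeSumBag();
        try {
            // Accumulative mode (po_foreach.setAccumulative() was called):
            // Pig wraps batches of the key's bag in a tuple and calls
            // accumulate() one or more times, then getValue(), then cleanup().
            udf.accumulate(tf.newTuple(bag)); // may be called many times per key
            System.out.println(udf.getValue()); // prints (a)(b)
            udf.cleanup();                      // reset state before the next key

            // Plain EvalFunc mode (what happens in the map of scope-64):
            // a single exec() call per input -- this UDF deliberately throws.
            udf.exec(tf.newTuple(bag));
        } catch (IOException e) {
            System.out.println("Caught error from UDF: " + e.getMessage());
        }
    }
}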
My question: is this a bug, or does Pig simply not handle this kind of script, where an implicit POSplit is generated while secondary key optimization is enabled?

Kelly Zhang/Zhang,Liyun
Best Regards
