Author: rohini
Date: Wed Mar 25 16:42:40 2015
New Revision: 1669150

URL: http://svn.apache.org/r1669150
Log:
PIG-4479: Pig script with union within nested splits followed by join failed on Tez (rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 25 16:42:40 2015
@@ -56,6 +56,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4479: Pig script with union within nested splits followed by join failed on Tez (rohini)
+
 PIG-4457: Error is thrown by JobStats.getOutputSize() when storing to a MySql table (rohini)
 
 PIG-4475: Keys in AvroMapWrapper are not proper Pig types (rdsr via daijy)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Wed Mar 25 16:42:40 2015
@@ -447,7 +447,7 @@ public class TezDagBuilder extends TezOp
         setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, 
pkgTezOp);
 
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
-                combinePlan, pkgTezOp, combPack);
+                combinePlan, null, pkgTezOp, combPack);
         lrDiscoverer.visit();
 
         combinePlan.remove(combPack);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 Wed Mar 25 16:42:40 2015
@@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -67,11 +68,13 @@ public class TezPOPackageAnnotator exten
         List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
         for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
             TezOperator predTezOp = it.next();
+            TezOperator predTezOpVertexGrp = null;
             if (predTezOp.isVertexGroup()) {
+                predTezOpVertexGrp = predTezOp;
                 // Just get one of the inputs to vertex group
                 predTezOp = 
getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
             }
-            lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
+            lrFound += patchPackage(predTezOp, predTezOpVertexGrp, pkgTezOp, 
pkg);
             if(lrFound == pkg.getNumInps()) {
                 break;
             }
@@ -79,13 +82,19 @@ public class TezPOPackageAnnotator exten
 
         if(lrFound != pkg.getNumInps()) {
             int errCode = 2086;
-            String msg = "Unexpected problem during optimization. Could not 
find all LocalRearrange operators.";
+            String msg = "Unexpected problem during optimization. "
+                    + "Could not find all LocalRearrange operators. Expected: "
+                    + pkg.getNumInps() + ", Found: " + lrFound;
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
     }
 
-    private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp, 
POPackage pkg) throws VisitorException {
-        LoRearrangeDiscoverer lrDiscoverer = new 
LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg);
+    private int patchPackage(TezOperator predTezOp,
+            TezOperator predTezOpVertexGrp,
+            TezOperator pkgTezOp,
+            POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+                predTezOp.plan, predTezOpVertexGrp, pkgTezOp, pkg);
         lrDiscoverer.visit();
         // let our caller know if we managed to patch
         // the package
@@ -131,13 +140,24 @@ public class TezPOPackageAnnotator exten
         private int loRearrangeFound = 0;
         private TezOperator pkgTezOp;
         private POPackage pkg;
+        private TezOperator predTezOpVertexGrp;
+        private boolean isPOSplit;
 
-        public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp, 
POPackage pkg) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
+        public LoRearrangeDiscoverer(PhysicalPlan predPlan, TezOperator 
predTezOpVertexGrp, TezOperator pkgTezOp, POPackage pkg) {
+            super(predPlan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(predPlan));
             this.pkgTezOp = pkgTezOp;
             this.pkg = pkg;
+            this.predTezOpVertexGrp = predTezOpVertexGrp;
+        }
+
+        @Override
+        public void visitSplit(POSplit spl) throws VisitorException {
+            isPOSplit = true;
+            super.visitSplit(spl);
         }
 
+
+
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws 
VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
@@ -160,17 +180,24 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, 
Integer>>>();
 
-            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
-                // something is wrong - we should not be getting key info
-                // for the same index from two different Local Rearranges
-                int errCode = 2087;
-                String msg = "Unexpected problem during optimization." +
-                        " Found index:" + lrearrange.getIndex() +
-                        " in multiple LocalRearrange operators.";
-                throw new OptimizerException(msg, errCode, PigException.BUG);
+            Integer index = Integer.valueOf(lrearrange.getIndex());
+            if(keyInfo.get(index) != null) {
+                if (isPOSplit && predTezOpVertexGrp != null ) {
+                    // Case of POSplit having more than one member of the 
vertex group
+                    loRearrangeFound--;
+                } else {
+                    // something is wrong - we should not be getting key info
+                    // for the same index from two different Local Rearranges
+                    int errCode = 2087;
+                    String msg = "Unexpected problem during optimization." +
+                            " Found index:" + lrearrange.getIndex() +
+                            " in multiple LocalRearrange operators.";
+                    throw new OptimizerException(msg, errCode, 
PigException.BUG);
+                }
 
             }
-            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+
+            keyInfo.put(index,
                     new Pair<Boolean, Map<Integer, Integer>>(
                             lrearrange.isProjectStar(), 
lrearrange.getProjectedColsMap()));
             pkg.getPkgr().setKeyInfo(keyInfo);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Wed Mar 25 16:42:40 2015
@@ -65,7 +65,11 @@ public class MultiQueryOptimizerTez exte
 
                 // Detect diamond shape, we cannot merge it into split, since 
Tez
                 // does not handle double edge between vertexes
-                // TODO: PIG-3876 to handle this by writing to same edge
+                // TODO:
+                //    - Vertex groups handles double edges though. For the 
case where the
+                //      double edges are unioned (successor is a union vertex),
+                //      try merge into split if union optimizer is turned on.
+                //    - PIG-3876 to handle this by writing to same edge
                 Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
                 Set<TezOperator> toMergeSuccessors = new 
HashSet<TezOperator>();
                 mergedSuccessors.addAll(successors);

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Mar 25 16:42:40 2015
@@ -1537,6 +1537,22 @@ d = cross a, c;
 e = union b, d;
 store e into ':OUTPATH:';\,
             },
+            { ## Multiple splits
+            'num' => 16,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, 
age, gpa:float);
+c = filter a by gpa >= 4;
+c1 = foreach c generate *;
+c2 = foreach c generate *;
+c3 = union c1, c2;
+d = filter a by gpa < 4;
+d1 = foreach d generate *;
+d2 = foreach d generate *;
+d3 = union d1, d2;
+a1 = union c3, d3;
+e = join a1 by name, b by name;
+store e into ':OUTPATH:';\,
+            },
                ]
                },
                {


Reply via email to