Author: rohini
Date: Wed Mar 25 16:42:40 2015
New Revision: 1669150
URL: http://svn.apache.org/r1669150
Log:
PIG-4479: Pig script with union within nested splits followed by join failed on
Tez (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 25 16:42:40 2015
@@ -56,6 +56,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4479: Pig script with union within nested splits followed by join failed on Tez (rohini)
+
PIG-4457: Error is thrown by JobStats.getOutputSize() when storing to a MySql table (rohini)
PIG-4475: Keys in AvroMapWrapper are not proper Pig types (rdsr via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Mar 25 16:42:40 2015
@@ -447,7 +447,7 @@ public class TezDagBuilder extends TezOp
setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf,
pkgTezOp);
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
- combinePlan, pkgTezOp, combPack);
+ combinePlan, null, pkgTezOp, combPack);
lrDiscoverer.visit();
combinePlan.remove(combPack);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Wed Mar 25 16:42:40 2015
@@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -67,11 +68,13 @@ public class TezPOPackageAnnotator exten
List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
TezOperator predTezOp = it.next();
+ TezOperator predTezOpVertexGrp = null;
if (predTezOp.isVertexGroup()) {
+ predTezOpVertexGrp = predTezOp;
// Just get one of the inputs to vertex group
predTezOp =
getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
}
- lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
+ lrFound += patchPackage(predTezOp, predTezOpVertexGrp, pkgTezOp,
pkg);
if(lrFound == pkg.getNumInps()) {
break;
}
@@ -79,13 +82,19 @@ public class TezPOPackageAnnotator exten
if(lrFound != pkg.getNumInps()) {
int errCode = 2086;
- String msg = "Unexpected problem during optimization. Could not
find all LocalRearrange operators.";
+ String msg = "Unexpected problem during optimization. "
+ + "Could not find all LocalRearrange operators. Expected: "
+ + pkg.getNumInps() + ", Found: " + lrFound;
throw new OptimizerException(msg, errCode, PigException.BUG);
}
}
- private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp,
POPackage pkg) throws VisitorException {
- LoRearrangeDiscoverer lrDiscoverer = new
LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg);
+ private int patchPackage(TezOperator predTezOp,
+ TezOperator predTezOpVertexGrp,
+ TezOperator pkgTezOp,
+ POPackage pkg) throws VisitorException {
+ LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+ predTezOp.plan, predTezOpVertexGrp, pkgTezOp, pkg);
lrDiscoverer.visit();
// let our caller know if we managed to patch
// the package
@@ -131,13 +140,24 @@ public class TezPOPackageAnnotator exten
private int loRearrangeFound = 0;
private TezOperator pkgTezOp;
private POPackage pkg;
+ private TezOperator predTezOpVertexGrp;
+ private boolean isPOSplit;
- public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp,
POPackage pkg) {
- super(plan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(plan));
+ public LoRearrangeDiscoverer(PhysicalPlan predPlan, TezOperator
predTezOpVertexGrp, TezOperator pkgTezOp, POPackage pkg) {
+ super(predPlan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(predPlan));
this.pkgTezOp = pkgTezOp;
this.pkg = pkg;
+ this.predTezOpVertexGrp = predTezOpVertexGrp;
+ }
+
+ @Override
+ public void visitSplit(POSplit spl) throws VisitorException {
+ isPOSplit = true;
+ super.visitSplit(spl);
}
+
+
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws
VisitorException {
POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
@@ -160,17 +180,24 @@ public class TezPOPackageAnnotator exten
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer,
Integer>>>();
- if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
- // something is wrong - we should not be getting key info
- // for the same index from two different Local Rearranges
- int errCode = 2087;
- String msg = "Unexpected problem during optimization." +
- " Found index:" + lrearrange.getIndex() +
- " in multiple LocalRearrange operators.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
+ Integer index = Integer.valueOf(lrearrange.getIndex());
+ if(keyInfo.get(index) != null) {
+ if (isPOSplit && predTezOpVertexGrp != null ) {
+ // Case of POSplit having more than one member of the vertex group
+ loRearrangeFound--;
+ } else {
+ // something is wrong - we should not be getting key info
+ // for the same index from two different Local Rearranges
+ int errCode = 2087;
+ String msg = "Unexpected problem during optimization." +
+ " Found index:" + lrearrange.getIndex() +
+ " in multiple LocalRearrange operators.";
+ throw new OptimizerException(msg, errCode,
PigException.BUG);
+ }
}
- keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+
+ keyInfo.put(index,
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(),
lrearrange.getProjectedColsMap()));
pkg.getPkgr().setKeyInfo(keyInfo);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Wed Mar 25 16:42:40 2015
@@ -65,7 +65,11 @@ public class MultiQueryOptimizerTez exte
// Detect diamond shape, we cannot merge it into split, since Tez
// does not handle double edge between vertexes
- // TODO: PIG-3876 to handle this by writing to same edge
+ // TODO:
+ // - Vertex groups handle double edges, though. For the case where the
+ // double edges are unioned (successor is a union vertex),
+ // try to merge into split if the union optimizer is turned on.
+ // - PIG-3876 to handle this by writing to same edge
Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
Set<TezOperator> toMergeSuccessors = new
HashSet<TezOperator>();
mergedSuccessors.addAll(successors);
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL:
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1669150&r1=1669149&r2=1669150&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Mar 25 16:42:40 2015
@@ -1537,6 +1537,22 @@ d = cross a, c;
e = union b, d;
store e into ':OUTPATH:';\,
},
+ { ## Multiple splits
+ 'num' => 16,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name,
age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name,
age, gpa:float);
+c = filter a by gpa >= 4;
+c1 = foreach c generate *;
+c2 = foreach c generate *;
+c3 = union c1, c2;
+d = filter a by gpa < 4;
+d1 = foreach d generate *;
+d2 = foreach d generate *;
+d3 = union d1, d2;
+a1 = union c3, d3;
+e = join a1 by name, b by name;
+store e into ':OUTPATH:';\,
+ },
]
},
{