Author: szita Date: Mon May 29 21:45:33 2017 New Revision: 1796703 URL: http://svn.apache.org/viewvc?rev=1796703&view=rev Log: PIG-5207: BugFix e2e tests fail on spark (szita)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796703&r1=1796702&r2=1796703&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon May 29 21:45:33 2017 @@ -109,6 +109,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5207: BugFix e2e tests fail on spark (szita) + PIG-5194: HiveUDF fails with Spark exec type (szita) PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1796703&r1=1796702&r2=1796703&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon May 29 21:45:33 2017 @@ -24,6 +24,7 @@ import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,6 +41,8 @@ import org.apache.pig.impl.plan.PlanExce import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.MultiMap; +import com.google.common.collect.HashBiMap; + /** * * The base class for all types of physical plans. 
@@ -304,6 +307,16 @@ public class PhysicalPlan extends Operat } } + //Fix order of edges in mToEdges lists + Map<PhysicalOperator, PhysicalOperator> invertedMatches = HashBiMap.create(matches).inverse(); + for (PhysicalOperator newOp : clone.mToEdges.keySet()) { + List<PhysicalOperator> newList = clone.mToEdges.get(newOp); + if (newList.size() > 1) { + List<PhysicalOperator> originalList = this.mToEdges.get(invertedMatches.get(newOp)); + Collections.sort(newList, new EdgeOrderHelper(originalList,invertedMatches)); + } + } + return clone; } @@ -315,4 +328,21 @@ public class PhysicalPlan extends Operat { opmap = null; } + + + private static class EdgeOrderHelper implements Comparator<PhysicalOperator> { + + private final Map<PhysicalOperator, PhysicalOperator> invertedMatches; + private final List<PhysicalOperator> originalList; + + public EdgeOrderHelper(List<PhysicalOperator> originalList, Map<PhysicalOperator, PhysicalOperator> invertedMatches) { + this.originalList = originalList; + this.invertedMatches = invertedMatches; + } + + @Override + public int compare(PhysicalOperator o1, PhysicalOperator o2) { + // Use Integer.compare instead of subtraction: honors the Comparator contract even for -1 (not-found) indices and cannot overflow. + return Integer.compare(originalList.indexOf(invertedMatches.get(o1)), originalList.indexOf(invertedMatches.get(o2))); + } + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1796703&r1=1796702&r2=1796703&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Mon May 29 21:45:33 2017 @@ -296,8 +296,13 @@ public class CombinerOptimizer extends S ); newProj.setResultType(DataType.BAG); - PhysicalOperator udfInput
pplan.getPredecessors(combineUdf).get(0); - pplan.disconnect(udfInput, combineUdf); + for (PhysicalOperator originalUdfInput : pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) { + if (pplan.getPredecessors(originalUdfInput) != null) { + pplan.trimAbove(originalUdfInput); + } + pplan.remove(originalUdfInput); + } + pplan.add(newProj); pplan.connect(newProj, combineUdf); i++;