Author: szita
Date: Mon May 29 21:45:33 2017
New Revision: 1796703
URL: http://svn.apache.org/viewvc?rev=1796703&view=rev
Log:
PIG-5207: BugFix e2e tests fail on spark (szita)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 29 21:45:33 2017
@@ -109,6 +109,8 @@ OPTIMIZATIONS
 
BUG FIXES
+PIG-5207: BugFix e2e tests fail on spark (szita)
+
PIG-5194: HiveUDF fails with Spark exec type (szita)
PIG-5231: PigStorage with -schema may produce inconsistent outputs with more
fields (knoguchi)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
Mon May 29 21:45:33 2017
@@ -24,6 +24,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,8 @@ import org.apache.pig.impl.plan.PlanExce
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
+import com.google.common.collect.HashBiMap;
+
/**
*
* The base class for all types of physical plans.
@@ -304,6 +307,16 @@ public class PhysicalPlan extends Operat
}
}
+ //Fix order of edges in mToEdges lists
+ Map<PhysicalOperator, PhysicalOperator> invertedMatches =
HashBiMap.create(matches).inverse();
+ for (PhysicalOperator newOp : clone.mToEdges.keySet()) {
+ List<PhysicalOperator> newList = clone.mToEdges.get(newOp);
+ if (newList.size() > 1) {
+ List<PhysicalOperator> originalList =
this.mToEdges.get(invertedMatches.get(newOp));
+ Collections.sort(newList, new
EdgeOrderHelper(originalList,invertedMatches));
+ }
+ }
+
return clone;
}
@@ -315,4 +328,21 @@ public class PhysicalPlan extends Operat
{
opmap = null;
}
+
+
+ private static class EdgeOrderHelper implements
Comparator<PhysicalOperator> {
+
+ private final Map<PhysicalOperator, PhysicalOperator> invertedMatches;
+ private final List<PhysicalOperator> originalList;
+
+ public EdgeOrderHelper(List<PhysicalOperator> originalList,
Map<PhysicalOperator, PhysicalOperator> invertedMatches) {
+ this.originalList = originalList;
+ this.invertedMatches = invertedMatches;
+ }
+
+ @Override
+ public int compare(PhysicalOperator o1, PhysicalOperator o2) {
+ return originalList.indexOf(invertedMatches.get(o1)) -
originalList.indexOf(invertedMatches.get(o2));
+ }
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
Mon May 29 21:45:33 2017
@@ -296,8 +296,13 @@ public class CombinerOptimizer extends S
);
newProj.setResultType(DataType.BAG);
- PhysicalOperator udfInput =
pplan.getPredecessors(combineUdf).get(0);
- pplan.disconnect(udfInput, combineUdf);
+ for (PhysicalOperator originalUdfInput :
pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
+ if (pplan.getPredecessors(originalUdfInput) != null) {
+ pplan.trimAbove(originalUdfInput);
+ }
+ pplan.remove(originalUdfInput);
+ }
+
pplan.add(newProj);
pplan.connect(newProj, combineUdf);
i++;