Author: xuefu
Date: Wed May 20 13:23:21 2015
New Revision: 1680561
URL: http://svn.apache.org/r1680561
Log:
PIG-4552: Fix TestForEachNestedPlanLocal for Spark engine (Mohit via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 20 13:23:21 2015
@@ -458,21 +458,23 @@ public class SparkLauncher extends Launc
SparkPigStats sparkStats, JobConf jobConf) throws
IOException,
InterruptedException {
Set<Integer> seenJobIDs = new HashSet<Integer>();
- if (sparkPlan != null) {
- List<SparkOperator> leaves = sparkPlan.getLeaves();
- Collections.sort(leaves);
- Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new
HashMap();
- if (LOG.isDebugEnabled())
- LOG.debug("Converting " + leaves.size() + " Spark
Operators");
- for (SparkOperator leaf : leaves) {
- new PhyPlanSetter(leaf.physicalPlan).visit();
- Map<OperatorKey, RDD<Tuple>> physicalOpToRdds =
new HashMap();
- sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
- physicalOpToRdds, convertMap,
seenJobIDs, sparkStats,
- jobConf);
- }
- } else {
- throw new RuntimeException("sparkPlan is null");
+ if (sparkPlan == null) {
+ throw new RuntimeException("SparkPlan is null.");
+ }
+
+ List<SparkOperator> leaves = sparkPlan.getLeaves();
+ Collections.sort(leaves);
+ Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting " + leaves.size() + " Spark
Operators to RDDs");
+ }
+
+ for (SparkOperator leaf : leaves) {
+ new PhyPlanSetter(leaf.physicalPlan).visit();
+ Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new
HashMap();
+ sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+ physicalOpToRdds, convertMap,
seenJobIDs, sparkStats,
+ jobConf);
}
}
@@ -509,21 +511,21 @@ public class SparkLauncher extends Launc
+
"sparkOperator:{}",
sparkOperator.physicalPlan.getLeaves().size(),
sparkOperator.name()));
- } else {
- PhysicalOperator leafPO = leafPOs.get(0);
- try {
- physicalToRDD(sparkOperator.physicalPlan,
leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
-
physicalOpRdds.get(leafPO.getOperatorKey()));
- } catch(Exception e) {
- if( e instanceof SparkException) {
- LOG.info("throw SparkException, error
founds when running " +
- "rdds in spark");
- }
- exception = e;
- isFail = true;
+ }
+
+ PhysicalOperator leafPO = leafPOs.get(0);
+ try {
+ physicalToRDD(sparkOperator.physicalPlan, leafPO,
physicalOpRdds,
+ predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+
physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch(Exception e) {
+ if( e instanceof SparkException) {
+ LOG.info("throw SparkException, error founds
when running " +
+ "rdds in spark");
}
+ exception = e;
+ isFail = true;
}
List<POStore> poStores = PlanHelper.getPhysicalOperators(
@@ -541,10 +543,10 @@ public class SparkLauncher extends Launc
conf, exception);
}
} else {
- LOG.info(String
-
.format(String.format("sparkOperator:{} does not have POStore or" +
- " sparkOperator has more than 1 POStore. {} is the size of
POStore."),
- sparkOperator.name(), poStores.size()));
+ LOG.info(String
+ .format(String.format("sparkOperator:{}
does not have POStore or" +
+ "
sparkOperator has more than 1 POStore. {} is the size of POStore."),
+ sparkOperator.name(),
poStores.size()));
}
}
@@ -557,9 +559,10 @@ public class SparkLauncher extends Launc
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
- if (predecessors != null) {
+ if (predecessors != null && predecessors.size() > 1) {
Collections.sort(predecessors);
}
+
List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
if (predecessors != null) {
for (PhysicalOperator predecessor : predecessors) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed May 20 13:23:21 2015
@@ -100,14 +100,8 @@ public class TestForEachNestedPlanLocal
pig.registerQuery("D = foreach C {"
+ "crossed = cross user, session;"
+ "generate crossed;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
@Test
@@ -130,14 +124,8 @@ public class TestForEachNestedPlanLocal
+ "crossed = cross user, distinct_session;"
+ "filtered = filter crossed by user::region ==
distinct_session::region;"
+ "generate filtered;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
@Test
@@ -161,14 +149,8 @@ public class TestForEachNestedPlanLocal
pig.registerQuery("D = foreach C {"
+ "crossed = cross user, session, profile;"
+ "generate crossed;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
/*