Author: xuefu
Date: Wed May 20 13:23:21 2015
New Revision: 1680561

URL: http://svn.apache.org/r1680561
Log:
PIG-4552: Fix TestForEachNestedPlanLocal for Spark engine (Mohit via Xuefu)
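
The nested-foreach tests compared actual and expected tuples pairwise in
iteration order, which assumes the engine emits rows in a deterministic
order. Spark does not guarantee that ordering, so the tests now delegate to
Util.checkQueryOutputsAfterSort, which sorts both sides before comparing (a
sketch of the idea follows the test diff below). SparkLauncher is also tidied
up: nested if/else blocks become guard clauses, the predecessor sort is
skipped for single-element lists, and a String.format call that never
substituted its arguments is corrected (see the note after the SparkLauncher
diff).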

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 20 13:23:21 2015
@@ -458,21 +458,23 @@ public class SparkLauncher extends Launc
                        SparkPigStats sparkStats, JobConf jobConf) throws IOException,
                        InterruptedException {
                Set<Integer> seenJobIDs = new HashSet<Integer>();
-               if (sparkPlan != null) {
-                       List<SparkOperator> leaves = sparkPlan.getLeaves();
-                       Collections.sort(leaves);
-                       Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
-                       if (LOG.isDebugEnabled())
-                           LOG.debug("Converting " + leaves.size() + " Spark Operators");
-                       for (SparkOperator leaf : leaves) {
-                               new PhyPlanSetter(leaf.physicalPlan).visit();
-                               Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
-                               sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
-                                               physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
-                                               jobConf);
-                       }
-               } else {
-                       throw new RuntimeException("sparkPlan is null");
+               if (sparkPlan == null) {
+                       throw new RuntimeException("SparkPlan is null.");
+               }
+
+               List<SparkOperator> leaves = sparkPlan.getLeaves();
+               Collections.sort(leaves);
+               Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
+               }
+
+               for (SparkOperator leaf : leaves) {
+                       new PhyPlanSetter(leaf.physicalPlan).visit();
+                       Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
+                       sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+                                       physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+                                       jobConf);
                }
        }
 
@@ -509,21 +511,21 @@ public class SparkLauncher extends Launc
                                                                        + "sparkOperator:{}",
                                                        sparkOperator.physicalPlan.getLeaves().size(),
                                                        sparkOperator.name()));
-               } else {
-                       PhysicalOperator leafPO = leafPOs.get(0);
-                       try {
-                               physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-                                               predecessorRDDs, convertMap);
-                               sparkOpRdds.put(sparkOperator.getOperatorKey(),
-                                               physicalOpRdds.get(leafPO.getOperatorKey()));
-                       } catch(Exception e) {
-                               if( e instanceof  SparkException) {
-                                       LOG.info("throw SparkException, error founds when running " +
-                                                       "rdds in spark");
-                               }
-                               exception = e;
-                               isFail = true;
+               }
+
+               PhysicalOperator leafPO = leafPOs.get(0);
+               try {
+                       physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+                                       predecessorRDDs, convertMap);
+                       sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                                       physicalOpRdds.get(leafPO.getOperatorKey()));
+               } catch(Exception e) {
+                       if( e instanceof  SparkException) {
+                               LOG.info("SparkException thrown; error found when running " +
+                                               "RDDs in Spark");
                        }
+                       exception = e;
+                       isFail = true;
                }
 
                List<POStore> poStores = PlanHelper.getPhysicalOperators(
@@ -541,10 +543,10 @@ public class SparkLauncher extends Launc
                         conf, exception);
             }
         } else {
-                             LOG.info(String
-                                             .format(String.format("sparkOperator:{} does not have POStore or" +
-                    " sparkOperator has more than 1 POStore. {} is the size of POStore."),
-                    sparkOperator.name(), poStores.size()));
+                       LOG.info(String.format(
+                                       "sparkOperator:%s does not have POStore or"
+                                                       + " sparkOperator has more than 1 POStore. %d is the size of POStore.",
+                                       sparkOperator.name(), poStores.size()));
                }
        }
 
@@ -557,9 +559,10 @@ public class SparkLauncher extends Launc
         RDD<Tuple> nextRDD = null;
         List<PhysicalOperator> predecessors = plan
                 .getPredecessors(physicalOperator);
-        if (predecessors != null) {
+        if (predecessors != null && predecessors.size() > 1) {
             Collections.sort(predecessors);
         }
+
         List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
                if (predecessors != null) {
                        for (PhysicalOperator predecessor : predecessors) {
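
Note on the log statement in the last SparkLauncher hunk: the original code
wrapped String.format around itself and used SLF4J-style "{}" placeholders,
which String.format does not interpret, so the operator name and POStore
count were never substituted into the message. A minimal, self-contained
illustration of the pitfall (the values below are stand-ins, not Pig APIs):

    public class FormatDemo {
        public static void main(String[] args) {
            String name = "scope-12"; // stand-in for sparkOperator.name()
            int stores = 2;           // stand-in for poStores.size()

            // Broken: String.format expects printf-style conversions such as
            // %s and %d. "{}" is SLF4J logger syntax, so the arguments are
            // silently ignored and the braces print literally.
            System.out.println(String.format(
                    "sparkOperator:{} has {} POStores.", name, stores));

            // Fixed: with %s and %d the arguments are substituted as intended.
            System.out.println(String.format(
                    "sparkOperator:%s has %d POStores.", name, stores));
        }
    }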

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed May 20 13:23:21 2015
@@ -100,14 +100,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
@@ -130,14 +124,8 @@ public class TestForEachNestedPlanLocal
                 + "crossed = cross user, distinct_session;"
                 + "filtered = filter crossed by user::region == 
distinct_session::region;"
                 + "generate filtered;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
@@ -161,14 +149,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session, profile;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     /*
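
For reference, the tests above now delegate the comparison to
Util.checkQueryOutputsAfterSort(actualItr, expectedResults) from Pig's test
utilities. A minimal sketch of the idea, assuming the helper simply sorts
both sides before asserting equality (the real implementation may differ in
detail):

    import static org.junit.Assert.assertEquals;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.pig.data.Tuple;

    public class SortedOutputCheck {
        // Drains the actual-result iterator, then compares both sides after
        // sorting, making the assertion independent of the order in which
        // the engine happens to emit rows.
        public static void checkQueryOutputsAfterSort(Iterator<Tuple> actualItr,
                List<Tuple> expectedResults) {
            List<Tuple> actualResults = new ArrayList<Tuple>();
            while (actualItr.hasNext()) {
                actualResults.add(actualItr.next());
            }
            List<Tuple> expectedCopy = new ArrayList<Tuple>(expectedResults);
            // Pig's Tuple is Comparable, so natural ordering gives a stable,
            // engine-independent order on both sides.
            Collections.sort(actualResults);
            Collections.sort(expectedCopy);
            assertEquals(expectedCopy, actualResults);
        }
    }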

