Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java Mon Mar 28 16:58:33 2016
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LODistinct;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -75,18 +76,18 @@ public class TestNewPlanPushUpFilter {
      * A simple filter UDF for testing
      */
     static public class MyFilterFunc extends FilterFunc {
-        
+
         @Override
         public Boolean exec(Tuple input) {
             return false;
         }
     }
-    
+
     @Test
     // Empty plan, nothing to update
     public void testErrorEmptyInput() throws Exception {
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
-        
+
         Assert.assertTrue( !newLogicalPlan.getOperators().hasNext() );
     }
 
@@ -94,61 +95,69 @@ public class TestNewPlanPushUpFilter {
     //Test to ensure that the right exception is thrown when the input list is empty
     public void testErrorNonFilterInput() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);store A into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator op = newLogicalPlan.getSources().get(0);
         Assert.assertTrue( op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue( op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue( op instanceof LOStore );
         Assert.assertTrue( newLogicalPlan.getSuccessors(op) == null );
     }
-    
+
     @Test
     public void testFilterLoad() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = filter A by $1 < 18;" +
             "store B into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get(0);
         Assert.assertTrue( op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue( op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterStreaming() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = stream A through `" + "ps -u" + "`;" +
             "C = filter B by $1 < 18;" +
-            "D = STORE C into 'dummy';";        
-        
+            "D = STORE C into 'dummy';";
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStream );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
     }
-    
+
     @Test
     public void testFilterSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = order A by $1, $2;" +
             "C = filter B by $1 < 18;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOSort );
@@ -162,12 +171,14 @@ public class TestNewPlanPushUpFilter {
             "B = order A by $1, $2;" +
             "C = filter B by 1 == 1;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOSort );
@@ -181,31 +192,35 @@ public class TestNewPlanPushUpFilter {
             "B = order A by $1, $2;" +
             "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOSort );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterDistinct() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = distinct A;" +
             "C = filter B by $1 < 18;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LODistinct );
@@ -219,12 +234,14 @@ public class TestNewPlanPushUpFilter {
             "B = distinct A;" +
             "C = filter B by 1 == 1;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LODistinct );
@@ -238,31 +255,35 @@ public class TestNewPlanPushUpFilter {
             "B = distinct A;" +
             "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LODistinct );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterFilter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = filter A by $0 != 'name';" +
             "C = filter B by $1 < 18;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         Assert.assertTrue( ((LOFilter)op).getAlias().equals( "B" ) );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
@@ -278,12 +299,14 @@ public class TestNewPlanPushUpFilter {
             "split A into B if $1 < 18, C if $1 >= 18;" +
             "C = filter B by $1 < 10;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOSplit );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOSplitOutput );
@@ -299,12 +322,14 @@ public class TestNewPlanPushUpFilter {
             "B = limit A 10;" +
             "C = filter B by $1 < 18;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOLimit );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
@@ -319,7 +344,7 @@ public class TestNewPlanPushUpFilter {
             "C = union A, B;" +
             "D = filter C by $1 < 18;" +
             "E = STORE D into'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -336,21 +361,25 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  unionA instanceof LOUnion );
         Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  unionB instanceof LOUnion );
         Assert.assertTrue(  unionB == unionA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterConstantConditionUnion() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -358,7 +387,7 @@ public class TestNewPlanPushUpFilter {
             "C = union A, B;" +
             "D = filter C by 1 == 1;" +
             "E = STORE D into'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -375,21 +404,25 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  unionA instanceof LOUnion );
         Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  unionB instanceof LOUnion );
         Assert.assertTrue(  unionB == unionA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterUDFUnion() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -397,7 +430,7 @@ public class TestNewPlanPushUpFilter {
             "C = union A, B;" +
             "D = filter C by " + MyFilterFunc.class.getName() + "() ;" +
             "E = STORE D into'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -414,21 +447,25 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  unionA instanceof LOUnion );
         Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  unionB instanceof LOUnion );
         Assert.assertTrue(  unionB == unionA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterCross() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -437,7 +474,7 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $5 < 18;" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -445,12 +482,14 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
@@ -465,7 +504,7 @@ public class TestNewPlanPushUpFilter {
             "C = cross A, B;" +
             "D = filter C by $1 < 18;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -473,12 +512,14 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue( op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
@@ -486,7 +527,7 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue(  op instanceof LOStore );
     }
 
-    
+
     @Test
     public void testFilterCross2() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -494,7 +535,7 @@ public class TestNewPlanPushUpFilter {
             "C = cross A, B;" +
             "D = filter C by $1 < 18 and $5 < 18;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -511,14 +552,18 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator op = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator op = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
-        op = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
     }
-    
+
     @Test
     public void testFilterConstantConditionCross() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -526,7 +571,7 @@ public class TestNewPlanPushUpFilter {
             "C = cross A, B;" +
             "D = filter C by 1 == 1;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -543,13 +588,17 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
         Operator op = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
     }
-    
+
     @Test
     public void testFilterUDFCross() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -557,7 +606,7 @@ public class TestNewPlanPushUpFilter {
             "C = cross A, B;" +
             "D = filter C by " + MyFilterFunc.class.getName() + "($0) ;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -565,19 +614,21 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCross );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterCogroup() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -585,7 +636,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0;" +
             "D = filter C by $0 < 'name';" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -602,22 +653,26 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  cogrpA instanceof LOCogroup );
         Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  cogrpB instanceof LOCogroup );
         Assert.assertTrue(  cogrpB == cogrpA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterConstantConditionCogroup() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -625,7 +680,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0;" +
             "D = filter C by 1 == 1;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -642,23 +697,27 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  cogrpA instanceof LOCogroup );
         Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  cogrpB instanceof LOCogroup );
         Assert.assertTrue(  cogrpB == cogrpA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
-    
+
+
     @Test
     public void testFilterUDFCogroup() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -666,7 +725,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0;" +
             "D = filter C by " + MyFilterFunc.class.getName() + "($1) ;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -683,18 +742,22 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator cogroupA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator cogroupA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  cogroupA instanceof LOCogroup );
-        
-        Operator cogroupB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator cogroupB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  cogroupB instanceof LOCogroup );
-        
+
         Operator filter = newLogicalPlan.getSuccessors( cogroupA ).get( 0 );
         Assert.assertTrue(  filter instanceof LOFilter );
         filter = newLogicalPlan.getSuccessors( cogroupB ).get( 0 );
         Assert.assertTrue(  cogroupB instanceof LOCogroup );
         Assert.assertTrue(  cogroupB == cogroupA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(filter).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
@@ -706,7 +769,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0 outer;" +
             "D = filter C by $0 < 'name';" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -723,22 +786,26 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  cogrpA instanceof LOCogroup );
         Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  cogrpB instanceof LOCogroup );
         Assert.assertTrue(  cogrpB == cogrpA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterConstantConditionCogroupOuter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -746,7 +813,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0 outer;" +
             "D = filter C by 1 == 1;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -763,22 +830,26 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  filterA instanceof LOFilter );
-        
-        Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  filterB instanceof LOFilter );
-        
+
         Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 );
         Assert.assertTrue(  cogrpA instanceof LOCogroup );
         Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 );
         Assert.assertTrue(  cogrpB instanceof LOCogroup );
         Assert.assertTrue(  cogrpB == cogrpA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterUDFCogroupOuter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -786,7 +857,7 @@ public class TestNewPlanPushUpFilter {
             "C = cogroup A by $0, B by $0 outer;" +
             "D = filter C by " + MyFilterFunc.class.getName() + "() ;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -803,72 +874,82 @@ public class TestNewPlanPushUpFilter {
             loadB = loads.get( 0 );
         }
 
-        Operator cogroupA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
+        Assert.assertTrue(  foreachA instanceof LOForEach );
+        Operator cogroupA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
         Assert.assertTrue(  cogroupA instanceof LOCogroup );
-        
-        Operator cogroupB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+
+        Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
+        Assert.assertTrue(  foreachB instanceof LOForEach );
+        Operator cogroupB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
         Assert.assertTrue(  cogroupB instanceof LOCogroup );
-        
+
         Operator filter = newLogicalPlan.getSuccessors( cogroupA ).get( 0 );
         Assert.assertTrue(  filter instanceof LOFilter );
         filter = newLogicalPlan.getSuccessors( cogroupB ).get( 0 );
         Assert.assertTrue(  filter instanceof LOFilter );
         Assert.assertTrue(  cogroupB == cogroupA );
-        
+
         Operator store = newLogicalPlan.getSuccessors(filter).get( 0 );
         Assert.assertTrue(  store instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterGroupBy() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = group A by $0;" +
             "C = filter B by $0 < 'name';" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterConstantConditionGroupBy() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = group A by $0;" +
             "C = filter B by 1 == 1;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterUDFGroupBy() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = group A by $0;" +
             "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue( op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
@@ -882,50 +963,56 @@ public class TestNewPlanPushUpFilter {
             "B = group A by $0 outer;" +
             "C = filter B by $0 < 'name';" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterConstantConditionGroupByOuter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = group A by $0 outer;" +
             "C = filter B by 1 == 1;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     public void testFilterUDFGroupByOuter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
             "B = group A by $0 outer;" +
             "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
             "D = STORE C into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator op = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue(  op instanceof LOLoad );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOCogroup );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
@@ -941,7 +1028,7 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $0 < 'name';" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -949,19 +1036,21 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOLimit );
     }
-    
+
     @Test
     public void testFilterFRJoin1() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -970,7 +1059,7 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $4 < 'name';" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -978,19 +1067,21 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOLimit );
     }
-    
+
     @Test
     // Constant filter condition, the filter will be pushed up to the first branch of join.
     public void testFilterConstantConditionFRJoin() throws Exception {
@@ -999,7 +1090,7 @@ public class TestNewPlanPushUpFilter {
             "C = join A by $0, B by $0 using 'replicated';" +
             "D = filter C by 1 == 1;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -1007,19 +1098,21 @@ public class TestNewPlanPushUpFilter {
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     // UDF takes on argument, so it's constant. As a result, filter will pushed up to the first branch of the join.
     public void testFilterUDFFRJoin() throws Exception {
@@ -1028,27 +1121,29 @@ public class TestNewPlanPushUpFilter {
             "C = join A by $0, B by $0 using 'replicated';" +
             "D = filter C by " + MyFilterFunc.class.getName() + "();" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     @Test
     // UDF takes all input, so filter connot be pushed up.
     public void testFilterUDFFRJoin1() throws Exception {
@@ -1057,17 +1152,21 @@ public class TestNewPlanPushUpFilter {
             "C = join A by $0, B by $0 using 'replicated';" +
             "D = filter C by TupleSize(*) > 5;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = newLogicalPlan.getSuccessors( loads.get( 0 ) ).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
-        
+
         op = newLogicalPlan.getSuccessors( loads.get( 1 ) ).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
@@ -1081,20 +1180,22 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $0 < 'name';" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
@@ -1110,20 +1211,22 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $4 < 'name';" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
@@ -1134,25 +1237,27 @@ public class TestNewPlanPushUpFilter {
     @Test
     public void testFilterInnerJoin2() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-            "B = load 'anotherfile' as (name, age, preference);" +  
+            "B = load 'anotherfile' as (name, age, preference);" +
             "C = join A by $0, B by $0;" +
             "D = filter C by $0 < 'jonh' OR $1 > 50;" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
@@ -1168,17 +1273,21 @@ public class TestNewPlanPushUpFilter {
             "D = filter C by $4 < 'name' AND $0 == 'joe';" +
             "E = limit D 10;" +
             "F = STORE E into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
 
         Operator op = newLogicalPlan.getSuccessors( loads.get( 0 ) ).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors( loads.get( 1 ) ).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
@@ -1191,27 +1300,29 @@ public class TestNewPlanPushUpFilter {
             "C = join A by $0, B by $0;" +
             "D = filter C by " + MyFilterFunc.class.getName() + "() ;" +
             "E = STORE D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
 
         op = newLogicalPlan.getSuccessors(op).get( 0 );
+        Assert.assertTrue(  op instanceof LOForEach );
+        op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOJoin );
         op = newLogicalPlan.getSuccessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOStore );
     }
-    
+
     // See PIG-1289
     @Test
     public void testOutJoin() throws Exception {
@@ -1220,15 +1331,15 @@ public class TestNewPlanPushUpFilter {
             "C = join A by name LEFT OUTER, B by name;" +
             "D = filter C by B::name is null;" +
             "store D into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query);
-        
+
         Operator op = newLogicalPlan.getSinks().get(0);
         Assert.assertTrue( op instanceof LOStore );
         op = newLogicalPlan.getPredecessors(op).get( 0 );
         Assert.assertTrue(  op instanceof LOFilter );
     }
-    
+
     // See PIG-1507
     @Test
     public void testFullOutJoin() throws Exception {
@@ -1237,9 +1348,9 @@ public class TestNewPlanPushUpFilter {
             "c = join A by d1 full outer, B by d2;" +
             "d = filter c by d2 is null;" +
             "store d into 'dummy';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query);
-        
+
         Operator op = newLogicalPlan.getSinks().get(0);
         Assert.assertTrue( op instanceof LOStore );
         op = newLogicalPlan.getPredecessors(op).get( 0 );
@@ -1260,7 +1371,7 @@ public class TestNewPlanPushUpFilter {
             "F = filter E by d1 > 5;" +
             "G = store F into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query);
-        
+
         List<Operator> ops = newLogicalPlan.getSinks();
         Assert.assertTrue( ops.size() == 1 );
         Operator op = ops.get( 0 );
@@ -1279,32 +1390,34 @@ public class TestNewPlanPushUpFilter {
         optimizer.optimize();
         return newLogicalPlan;
     }
-    
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());
         }
-        
+
+        @Override
         public void addPlanTransformListener(PlanTransformListener listener) {
             super.addPlanTransformListener(listener);
         }
-        
-       protected List<Set<Rule>> buildRuleSets() {            
+
+       @Override
+    protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
             s.add(r);
             ls.add(s);
-             
+
             s = new HashSet<Rule>();
             r = new PushUpFilter( "PushUpFilter" );
-            s.add(r);    
+            s.add(r);
             ls.add(s);
-            
+
             return ls;
         }
-    }    
+    }
 }
 

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Mon Mar 28 16:58:33 2016
@@ -18,11 +18,11 @@
 
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
@@ -45,6 +44,8 @@ import org.apache.pig.backend.hadoop.dat
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -68,7 +69,7 @@ public class TestPigStorage  {
     private static final String datadir = "build/test/tmpdata/";
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws Exception {
         // some tests are in map-reduce mode and some in local - so before
         // each test, we will re-initialize FileLocalizer so that temp files
         // are created correctly depending on the ExecType in the test.
@@ -656,36 +657,95 @@ public class TestPigStorage  {
 
     @Test
     public void testIncompleteDataWithPigSchema() throws Exception {
-        File parent = new File(datadir, "incomplete_data_with_pig_schema_1");
-        parent.deleteOnExit();
-        parent.mkdirs();
-        File tmpInput = File.createTempFile("tmp", "tmp");
-        tmpInput.deleteOnExit();
-        File outFile = new File(parent, "out");
-        pig.registerQuery("a = load 
'"+Util.encodeEscape(tmpInput.getAbsolutePath())+"' as (x:int, y:chararray, 
z:chararray);");
-        pig.store("a", outFile.getAbsolutePath(), "PigStorage('\\t', 
'-schema')");
-        File schemaFile = new File(outFile, ".pig_schema");
+        Data data = resetData(pig);
+        String schema = 
"{\"fields\":[{\"name\":\"x\",\"type\":10,\"schema\":null},"
+                + "{\"name\":\"y\",\"type\":55,\"schema\":null},"
+                + "{\"name\":\"z\",\"type\":55,\"schema\":null}],"
+                + "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}";
 
-        parent = new File(datadir, "incomplete_data_with_pig_schema_2");
+        File parent = new File(datadir, "incomplete_data_with_pig_schema_2");
         parent.deleteOnExit();
         File inputDir = new File(parent, "input");
         inputDir.mkdirs();
         File inputSchemaFile = new File(inputDir, ".pig_schema");
-        FileUtils.moveFile(schemaFile, inputSchemaFile);
+        Util.writeToFile(inputSchemaFile, new String[] {schema});
         File inputFile = new File(inputDir, "data");
         Util.writeToFile(inputFile, new String[]{"1"});
         pig.registerQuery("a = load 
'"+Util.encodeEscape(inputDir.getAbsolutePath())+"';");
-        Iterator<Tuple> it = pig.openIterator("a");
-        assertTrue(it.hasNext());
-        assertEquals(tuple(1,null,null), it.next());
-        assertFalse(it.hasNext());
+        pig.registerQuery("store a into 'actual' using mock.Storage();");
+        data.set("expected", tuple(1, null, null));
+        Assert.assertEquals(data.get("expected"), data.get("actual"));
 
         // Now, test with prune
-        pig.registerQuery("a = load 
'"+Util.encodeEscape(inputDir.getAbsolutePath())+"'; b = foreach a generate y, 
z;");
-        it = pig.openIterator("b");
-        assertTrue(it.hasNext());
-        assertEquals(tuple(null,null), it.next());
-        assertFalse(it.hasNext());
+        data = resetData(pig);
+        data.set("expected", tuple(1, null));
+        pig.registerQuery("a = load 
'"+Util.encodeEscape(inputDir.getAbsolutePath())+"'; b = foreach a generate x, 
z;");
+        pig.registerQuery("store b into 'actual' using mock.Storage();");
+        Assert.assertEquals(data.get("expected"), data.get("actual"));
+
+//        TODO: TypeCaster should be adding a cast for this case but it always 
uses the file schema
+//        data = resetData(pig);
+//        data.set("expected", tuple(new DataByteArray("1"), null));
+//        pig.registerQuery("a = load 
'"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, 
y:bytearray, z:bytearray);");
+//        pig.registerQuery("b = foreach a generate x, z;");
+//        pig.registerQuery("store b into 'actual' using mock.Storage();");
+//        Assert.assertEquals(data.get("expected"), data.get("actual"));
+
+        schema = "{\"fields\":[{\"name\":\"x\",\"type\":50,\"schema\":null},"
+                + "{\"name\":\"y\",\"type\":50,\"schema\":null},"
+                + "{\"name\":\"z\",\"type\":50,\"schema\":null}],"
+                + "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}";
+        Util.writeToFile(inputSchemaFile, new String[] {schema});
+        data = resetData(pig);
+        data.set("expected", tuple(new DataByteArray("1"), null));
+        pig.registerQuery("a = load 
'"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, 
y:bytearray, z:bytearray);");
+        pig.registerQuery("b = foreach a generate x, z;");
+        pig.registerQuery("store b into 'actual' using mock.Storage();");
+        Assert.assertEquals(data.get("expected"), data.get("actual"));
+    }
+
+    @Test
+    public void testIncompleteDataNoPigSchema() throws Exception {
+
+        File inputFile = new File(datadir, "incomplete_data_no_pigschema");
+        inputFile.deleteOnExit();
+        Util.writeToFile(inputFile, new String[]{"1\t2", "2\t3"});
+        Data data = resetData(pig);
+
+        String query = "A = LOAD '"+ 
Util.encodeEscape(inputFile.getAbsolutePath()) + "' as (x, y, z);"
+                + "store A into 'actual' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pig, query);
+        data.set("expected",
+                tuple(new DataByteArray("1"), new DataByteArray("2"), null),
+                tuple(new DataByteArray("2"), new DataByteArray("3"), null));
+
+        Assert.assertEquals(data.get("expected"), data.get("actual"));
+
+        data = resetData(pig);
+        query = "A = LOAD '"+ Util.encodeEscape(inputFile.getAbsolutePath())
+                + "' using " +  PigExtendedStorage.class.getName() + " as (x, 
y, z);"
+                + "store A into 'actual' using mock.Storage();";
+
+        pig.registerQuery(query);
+        data.set("expected",
+                tuple(new DataByteArray("1"), new DataByteArray("2"), new 
DataByteArray("extracolumn")),
+                tuple(new DataByteArray("2"), new DataByteArray("3"), new 
DataByteArray("extracolumn")));
+
+        Assert.assertEquals(data.get("expected"), data.get("actual"));
+
+
+    }
+
+    public static class PigExtendedStorage extends PigStorage {
+
+        @Override
+        public Tuple getNext() throws IOException {
+            Tuple tuple = super.getNext();
+            tuple.append(new DataByteArray("extracolumn"));
+            return tuple;
+        }
+
     }
 
 

Modified: pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot (original)
+++ pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot Mon Mar 28 16:58:33 2016
@@ -69,10 +69,46 @@ s5875509_in -> 253899 [style=invis];
 21192393 -> s5875509_out [style=invis];
 32545329 [label="LOLoad", style="filled", fillcolor="gray"];
 17330894 [label="LOStore", style="filled", fillcolor="gray"];
+s26867942_in [label="", style=invis, height=0, width=0];
+s26867942_out [label="", style=invis, height=0, width=0];
+subgraph cluster_26867942 {
+label="LOForEach"labelloc=b;
+s27525999_in [label="", style=invis, height=0, width=0];
+s27525999_out [label="", style=invis, height=0, width=0];
+subgraph cluster_27525999 {
+label="LOGenerate"labelloc=b;
+11705501 [label="Project0:(*)"];
+s27525999_in -> 11705501 [style=invis];
+4729773 [label="Project1:(*)"];
+s27525999_in -> 4729773 [style=invis];
+2861196 [label="Project2:(*)"];
+s27525999_in -> 2861196 [style=invis];
+4629854 [label="Project3:(*)"];
+s27525999_in -> 4629854 [style=invis];
+};
+11705501 -> s27525999_out [style=invis];
+4729773 -> s27525999_out [style=invis];
+2861196 -> s27525999_out [style=invis];
+4629854 -> s27525999_out [style=invis];
+14518777 [label="LOInnerLoad"];
+9263789 [label="LOInnerLoad"];
+3945981 [label="LOInnerLoad"];
+16555307 [label="LOInnerLoad"];
+14518777 -> s27525999_in [lhead=cluster_27525999]
+9263789 -> s27525999_in [lhead=cluster_27525999]
+3945981 -> s27525999_in [lhead=cluster_27525999]
+16555307 -> s27525999_in [lhead=cluster_27525999]
+s26867942_in -> 14518777 [style=invis];
+s26867942_in -> 9263789 [style=invis];
+s26867942_in -> 3945981 [style=invis];
+s26867942_in -> 16555307 [style=invis];
+};
+s27525999_out -> s26867942_out [style=invis];
 s26567569_out -> 17330894
+s7897563_out -> s26567569_in [lhead=cluster_26567569]
 s18554240_out -> s7897563_in [lhead=cluster_7897563]
 s5875509_out -> s26867942_in [lhead=cluster_26867942]
-s26867942_out -> s18554240_26867942_in [lhead=cluster_18554240_26867942]
 32545329 -> s5875509_in [lhead=cluster_5875509]
+s26867942_out -> s18554240_26867942_in [lhead=cluster_18554240_26867942]
 }
 


Reply via email to