Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Apr  
1 20:19:53 2016
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1097,6 +1098,10 @@ public class TestEvalPipeline {
         }
 
         Util.createInputFile(cluster, "table", inpString);
+        StringWriter writer = new StringWriter();
+        if (cluster.getExecType().name().equals("TEZ")) {
+            Util.createLogAppender("testNoCombinerInReducer", writer, 
Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+        }
 
         pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
         pigServer.registerQuery("b = group a ALL;");
@@ -1116,6 +1121,10 @@ public class TestEvalPipeline {
             Assert.assertTrue(DataType.compare(expectedBag.size(), 
resultBagSize) == 0);
         }
 
+        if (cluster.getExecType().name().equals("TEZ")) {
+            Assert.assertTrue(writer.toString().contains("Turning off combiner 
in reducer"));
+            Util.removeLogAppender("testNoCombinerInReducer", 
Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+        }
         Util.deleteFile(cluster, "table");
     }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Apr  1 
20:19:53 2016
@@ -1370,6 +1370,39 @@ public class TestGrunt {
         validate(query, false, msgs.toArray(new String[0]));
     }
 
+    @Test
+    public void testWithInlineOp() throws Throwable {
+        // specifying schema inside inline-op makes PigScriptParser.jj to read
+        // to the end of file
+        String query = "a = load 'i1' as (f1:chararray);" +
+               "b = foreach (foreach a generate f1 as b1) generate b1; " +
+               "dump b; ";
+
+        ArrayList<String> msgs = new ArrayList<String>();                //
+        validate(query, true, msgs.toArray(new String[0]));
+    }
+
+    /*
+     * Following test currently fails.  Insead of making further changes to
+     * PigScriptParser.jj, leaving it till we move out of javacc in PIG-2597
+
+    @Test
+    public void testWithInlineOpWithNestedForeach() throws Throwable {
+        // This one currently fails because "{}" is treated as
+        // IN_BLOCK in PigScriptParser.jj which jumps to PIG_END and ignore
+        // ") generate *; " part of the code.
+        // In order to support this test, we need to add parenthesis matching
+        // everywhere in PigScriptParser.jj (or stop using it)
+        //
+        String query = "a = load 'i1' as (f1:chararray);" +
+               "b = group a ALL; " +
+               "c = foreach ( foreach b {b1 = limit a 3; generate 1, b1;} ) 
generate *; " +
+               "dump c;";
+        ArrayList<String> msgs = new ArrayList<String>();                //
+        validate(query, true, msgs.toArray(new String[0]));
+    }
+    */
+
 
 
     private void validate(String query, boolean syntaxOk,

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java 
(original)
+++ 
pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java 
Fri Apr  1 20:19:53 2016
@@ -55,37 +55,37 @@ public class TestMergeForEachOptimizatio
     LogicalPlan plan = null;
     PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
     PigServer pigServer = null;
-  
+
     @Before
     public void setup() throws ExecException {
         pigServer = new PigServer( pc );
     }
-    
+
     @After
     public void tearDown() {
-        
+
     }
-    
+
     /**
      * Basic test case. Two simple FOREACH statements can be merged to one.
-     * @throws Exception 
+     * @throws Exception
      */
-    @Test   
+    @Test
     public void testSimple() throws Exception  {
         String query = "A = load 'file.txt' as (a, b, c);" +
          "B = foreach A generate a+b, c-b;" +
          "C = foreach B generate $0+5, $1;" +
-         "store C into 'empty';";  
+         "store C into 'empty';";
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
         LOForEach foreach1 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
         int outputExprCount2 = getOutputExprCount( newLogicalPlan );
@@ -93,26 +93,26 @@ public class TestMergeForEachOptimizatio
         LOForEach foreach2 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
     }
-    
+
     /**
      * Test more complex case where the first for each in the script has inner 
plan.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testComplex() throws Exception {
         String query = "A = load 'file.txt' as (a:int, b, 
c:bag{t:tuple(c0:int,c1:int)});" +
          "B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SIZE(S); 
};" +
-         "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 
'empty';" ;  
+         "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 
'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         int outputExprCount1 = getOutputExprCount( newLogicalPlan );
         LOForEach foreach1 = getForEachOperator( newLogicalPlan );
         Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         // The number of FOREACHes didn't change because one is genereated 
because of type cast and
         // one is reduced because of the merge.
@@ -125,10 +125,10 @@ public class TestMergeForEachOptimizatio
         Assert.assertTrue(newSchema.getField(0).alias.equals("x"));
         Assert.assertTrue(newSchema.getField(1).alias.equals("y"));
     }
-    
+
     /**
      * One output of first foreach was referred more than once in the second 
foreach
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testDuplicateInputs() throws Exception {
@@ -136,84 +136,84 @@ public class TestMergeForEachOptimizatio
          "A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" +
          "B = group A1 all;" +
          "C = foreach B generate A1;" +
-         "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 
'empty';" ;  
+         "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 
'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         Operator store = newLogicalPlan.getSinks().get(0);
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
         LOForEach foreach1 = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( foreach1.getAlias().equals( "D" ) );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         // The number of FOREACHes didn't change because one is genereated 
because of type cast and
         // one is reduced because of the merge.
         Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-        
+
         LOForEach foreach2 = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( foreach2.getAlias().equals( "D" ) );
     }
-    
+
     /**
      * Not all consecutive FOREACHes can be merged. In this case, the second 
FOREACH statment
      * has inner plan, which cannot be merged with one before it.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testNegative1() throws Exception {
         String query = "A = LOAD 'file.txt' as (a, b, c, 
d:bag{t:tuple(c0:int,c1:int)});" +
          "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" +
          "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; 
};" +
-         "store C into 'empty';";  
+         "store C into 'empty';";
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
-        
+
         // Actually MergeForEach optimization is happening here. A new foreach 
will be inserted after A because
-        // of typ casting. The inserted one and the one in B can be merged due 
to this optimization. However, 
+        // of typ casting. The inserted one and the one in B can be merged due 
to this optimization. However,
         // the plan cannot be further optimized because C has inner plan.
         Assert.assertEquals( forEachCount1, forEachCount2 );
     }
-    
+
     /**
      * MergeForEach Optimization is off if the first statement has a FLATTEN 
operator.
-     * @throws Exception 
+     * @throws Exception
      */
     @Test
     public void testNegative2() throws Exception {
         String query = "A = LOAD 'file.txt' as (a, b, c);" +
          "B = FOREACH A GENERATE FLATTEN(a), b, c;" +
-         "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;  
+         "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-        
+
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
         Assert.assertEquals( 2, forEachCount1 );
         Assert.assertEquals( 2, forEachCount2 );
     }
-    
-    
+
+
     /**
      * Ensure that join input order does not get reversed (PIG-1672)
-     * @throws Exception 
+     * @throws Exception
      */
-    @Test   
+    @Test
     public void testJoinInputOrder() throws Exception  {
         String query = "l1 = load 'y' as (a);" +
          "l2 = load 'z' as (a1,b1,c1,d1);" +
          "f1 = foreach l2 generate a1, b1, c1, d1;" +
          "f2 = foreach f1 generate a1, b1, c1;" +
-         "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 
'empty';" ;  
+         "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 
'empty';" ;
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
 
         int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
@@ -225,15 +225,15 @@ public class TestMergeForEachOptimizatio
         }
         LOForEach foreachL2 = 
(LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
         foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(foreachL2).get(0);
-        
+
         int outputExprCount1 = 
((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-               
+
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
-        
+
         int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
-        Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-        
+        Assert.assertEquals( 0, forEachCount1 - forEachCount2 );
+
         loads = newLogicalPlan.getSources();
         l2 = null;
         for (Operator load : loads) {
@@ -241,21 +241,21 @@ public class TestMergeForEachOptimizatio
                 l2 = load;
         }
         foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
-        
+
         int outputExprCount2 = 
((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-        
+
         Assert.assertTrue( outputExprCount1 == outputExprCount2 );
         Assert.assertTrue( foreachL2.getAlias().equals( "f2" ) );
-        
+
         LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class);
         LogicalRelationalOperator leftInp =
             
(LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0);
-        assertEquals("join child left", leftInp.getAlias(), "f2"); 
-        
+        assertEquals("join child left", leftInp.getAlias(), "f2");
+
         LogicalRelationalOperator rightInp =
             
(LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1);
-        assertEquals("join child right", rightInp.getAlias(), "l1"); 
-        
+        assertEquals("join child right", rightInp.getAlias(), "l1");
+
     }
 
     private int getForEachOperatorCount(LogicalPlan plan) {
@@ -268,7 +268,7 @@ public class TestMergeForEachOptimizatio
         }
         return count;
     }
-       
+
     private int getOutputExprCount(LogicalPlan plan) throws IOException {
         LOForEach foreach = getForEachOperator( plan );
         LogicalPlan inner = foreach.getInnerPlan();
@@ -276,7 +276,7 @@ public class TestMergeForEachOptimizatio
         LOGenerate gen = (LOGenerate)ops.get( 0 );
         return gen.getOutputPlans().size();
     }
-    
+
     private LOForEach getForEachOperator(LogicalPlan plan) throws IOException {
         Iterator<Operator> ops = plan.getOperators();
         while( ops.hasNext() ) {
@@ -290,7 +290,7 @@ public class TestMergeForEachOptimizatio
         }
         return null;
     }
-    
+
     /**
      * returns first operator that is an instance of given class c
      * @param plan
@@ -303,41 +303,42 @@ public class TestMergeForEachOptimizatio
         while( ops.hasNext() ) {
             Operator op = ops.next();
             if( op.getClass().equals(c)) {
-                return op;          
+                return op;
             }
         }
         return null;
     }
-    
+
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());
         }
-        
-        protected List<Set<Rule>> buildRuleSets() {            
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
             s.add(r);
             ls.add(s);
-             
+
             // Split Set
             // This set of rules does splitting of operators only.
             // It does not move operators
             s = new HashSet<Rule>();
             r = new AddForEach( "AddForEach" );
-            s.add(r);            
+            s.add(r);
             ls.add(s);
-            
+
             s = new HashSet<Rule>();
             r = new MergeForEach("MergeForEach");
-            s.add(r);            
+            s.add(r);
             ls.add(s);
 
             return ls;
         }
-    }    
+    }
 }

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java Fri 
Apr  1 20:19:53 2016
@@ -575,7 +575,7 @@ public class TestMultiQueryCompiler {
 
             LogicalPlan lp = checkLogicalPlan(2, 1, 7);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 11);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 13);
 
             checkMRPlan(pp, 1, 1, 2);
 

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java 
(original)
+++ 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java 
Fri Apr  1 20:19:53 2016
@@ -318,6 +318,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -335,6 +337,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( load instanceof LOLoad );
         Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );
         Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -354,6 +358,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( load instanceof LOLoad );
         Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );
         Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -375,6 +381,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -395,6 +403,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -414,6 +424,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
@@ -433,6 +445,8 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( filter instanceof LOFilter );
         Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
         Assert.assertTrue( fe instanceof LOForEach );
+        fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java Fri 
Apr  1 20:19:53 2016
@@ -488,7 +488,9 @@ public class TestNewPlanFilterRule {
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Operator group = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator group = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( group instanceof LOCogroup );
         Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java 
(original)
+++ 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java 
Fri Apr  1 20:19:53 2016
@@ -21,6 +21,7 @@ import static org.apache.pig.newplan.log
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -172,6 +174,11 @@ public class TestNewPlanLogicalOptimizer
             expected.add(DA);
             expected.connect(A, DA);
 
+            // A = foreach
+            LOForEach foreachA = 
org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DA, 0, new 
HashSet<Integer>());
+            foreachA.setAlias("A");
+            foreachA.neverUseForRealSetSchema(aschema);
+
             // B = load
             LogicalSchema bschema = new LogicalSchema();
             bschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -193,6 +200,11 @@ public class TestNewPlanLogicalOptimizer
             expected.add(DB);
             expected.connect(B, DB);
 
+            // B = foreach
+            LOForEach foreachB = 
org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DB, 0, new 
HashSet<Integer>());
+            foreachB.setAlias("B");
+            foreachB.neverUseForRealSetSchema(bschema);
+
             // C = join
             LogicalSchema cschema = new LogicalSchema();
             cschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -221,8 +233,8 @@ public class TestNewPlanLogicalOptimizer
             mm.put(1, bprojplan);
             C.neverUseForRealSetSchema(cschema);
             expected.add(C);
-            expected.connect(DA, C);
-            expected.connect(DB, C);
+            expected.connect(foreachA, C);
+            expected.connect(foreachB, C);
 
             // D = filter
             LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1737436&r1=1737435&r2=1737436&view=diff
==============================================================================
--- 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
 (original)
+++ 
pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
 Fri Apr  1 20:19:53 2016
@@ -64,25 +64,25 @@ public class TestNewPlanPushDownForeachF
     }
 
     /**
-     * 
+     *
      * A simple filter UDF for testing
      *
      */
     static public class MyFilterFunc extends FilterFunc {
-        
+
         @Override
         public Boolean exec(Tuple input) {
             return false;
         }
     }
-    
+
     /**
      * Old plan is empty, so is the optimized new plan.
      */
     @Test
     public void testErrorEmptyInput() throws Exception {
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
-        
+
         Assert.assertTrue( newLogicalPlan.getOperators().hasNext() ==  false );
     }
 
@@ -100,7 +100,7 @@ public class TestNewPlanPushDownForeachF
         List<Operator> nexts = newLogicalPlan.getSuccessors( load );
         Assert.assertTrue( nexts != null && nexts.size() == 1 );
 }
-    
+
     @Test
     public void testForeachNoFlatten() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -108,28 +108,30 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
          "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachNoSuccessors() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
                        "B = foreach A generate flatten($1);" +
                        "Store B into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachStreaming() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -137,61 +139,65 @@ public class TestNewPlanPushDownForeachF
         "C = stream B through `" + "pc -l" + "`;" +
         "Store C into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachDistinct() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate flatten($1);" +
         "C = distinct B;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
     }
-    
+
     @Test
     public void testForeachForeach() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate $0, $1, flatten(1);" +        
+        "B = foreach A generate $0, $1, flatten(1);" +
         "C = foreach B generate $0;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
         foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( !OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
 
     @Test
     public void testForeachFilter() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate $0, $1, flatten($2);" +        
+        "B = foreach A generate $0, $1, flatten($2);" +
         "C = filter B by $1 < 18;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -200,15 +206,17 @@ public class TestNewPlanPushDownForeachF
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, flatten($2);" +
         "split B into C if $1 < 18, D if $1 >= 18;" +
-        "store C into 'output1';" + 
+        "store C into 'output1';" +
         "store D into 'output2';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -218,13 +226,15 @@ public class TestNewPlanPushDownForeachF
         "B = foreach A generate $0, $1, flatten($2);" +
         "C = limit B 10;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
 
@@ -235,24 +245,26 @@ public class TestNewPlanPushDownForeachF
         "C = load 'anotherfile' as (name, age, preference);" +
         "D = union B, C;" +
         "store D into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator load = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             load = loads.get( 0 );
         else
             load = loads.get( 1 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachCogroup() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -260,7 +272,7 @@ public class TestNewPlanPushDownForeachF
         "C = load 'anotherfile' as (name, age, preference);" +
         "D = cogroup B by $0, C by $0;" +
         "store D into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -268,32 +280,36 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator load = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             load = loads.get( 0 );
         else
             load = loads.get( 1 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachGroupBy() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, flatten($2);" +
         "C = group B by $0;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     @Test
     public void testForeachSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -301,16 +317,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Operator sort = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
-        Operator foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
+        foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
         Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
     }
-    
+
     /**
      * Non-pure-projection, not optimizable.
      */
@@ -321,16 +339,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
          "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
-    
+
+
     /**
      * If the flattened field is referenced in the sort condition, then no 
optimization can be done.
      */
@@ -341,7 +361,7 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $3;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -360,49 +380,55 @@ public class TestNewPlanPushDownForeachF
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachUDFSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
         "B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) 
;" +
         "C = order B by $0, $1;" +
         "store C into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachCastSort() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa);" +
-        "B = foreach A generate (chararray)$0, $1, flatten($2);" +        
+        "B = foreach A generate (chararray)$0, $1, flatten($2);" +
         "C = order B by $0, $1;" +
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     @Test
     public void testForeachCross() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, 
point_score));" +
@@ -413,13 +439,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -454,7 +480,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -487,16 +513,16 @@ public class TestNewPlanPushDownForeachF
         "store F into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         // No optimization about foreach flatten.
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator limit = newLogicalPlan.getPredecessors(store).get(0);
         Operator cross = newLogicalPlan.getPredecessors(limit).get(0);
         Assert.assertTrue( cross instanceof LOCross );
     }
-    
+
     /**
-     * This actually is a valid case, even though the optimization may not 
provide any performance benefit. However, detecting 
+     * This actually is a valid case, even though the optimization may not 
provide any performance benefit. However, detecting
      * such a case requires more coding. Thus, we allow optimization to go 
thru in this case.
      */
     @Test
@@ -509,13 +535,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -547,13 +573,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -571,7 +597,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     /**
      * Cast should NOT matter to cross. This is a valid positive test case.
      */
@@ -585,13 +611,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -609,7 +635,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     @Test
     public void testForeachFRJoin() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, 
point_score));" +
@@ -619,7 +645,7 @@ public class TestNewPlanPushDownForeachF
         "E = limit D 10;" +
         "store E into 'output';";
 
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
@@ -627,7 +653,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -662,7 +688,7 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -693,7 +719,7 @@ public class TestNewPlanPushDownForeachF
         "E = join B by $0, D by $0 using 'replicated';" +
         "F = limit E 10;" +
         "store F into 'output';";
-        
+
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         // No optimization about foreach flatten.
@@ -702,7 +728,7 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     /**
      * Valid positive test case, even though the benefit from the optimization 
is questionable. However, putting in additinal check for
      * this condition requires extra coding.
@@ -717,13 +743,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -756,13 +782,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -795,13 +821,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -830,13 +856,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -854,7 +880,7 @@ public class TestNewPlanPushDownForeachF
         op = newLogicalPlan.getSuccessors( op ).get( 0 );
         Assert.assertTrue( op instanceof LOLimit );
     }
-    
+
     @Test
     public void testForeachInnerJoin1() throws Exception {
         String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, 
point_score));" +
@@ -865,13 +891,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -911,9 +937,9 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     /**
-     * This is actually a valid positive test case, even though the benefit of 
such optimization is questionable. However, 
+     * This is actually a valid positive test case, even though the benefit of 
such optimization is questionable. However,
      * checking for such condition requires additional coding effort.
      */
     @Test
@@ -926,13 +952,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -964,13 +990,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -1002,13 +1028,13 @@ public class TestNewPlanPushDownForeachF
         "store E into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
         Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
         Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
         Operator op = null;
-        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) 
+        if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
             op = loads.get( 0 );
         else
             op = loads.get( 1 );
@@ -1045,7 +1071,7 @@ public class TestNewPlanPushDownForeachF
         Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
         Assert.assertTrue( join instanceof LOJoin );
     }
-    
+
     // See PIG-1374
     @Test
     public void testForeachRequiredField() throws Exception {
@@ -1055,7 +1081,7 @@ public class TestNewPlanPushDownForeachF
         "store C into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -1065,7 +1091,7 @@ public class TestNewPlanPushDownForeachF
         Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
-    
+
     // See PIG-1706
     @Test
     public void testForeachWithUserDefinedSchema() throws Exception {
@@ -1076,13 +1102,13 @@ public class TestNewPlanPushDownForeachF
         "store d into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         LOForEach foreach = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
         Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
     }
-    
+
     // See PIG-1751
     @Test
     public void testForeachWithUserDefinedSchema2() throws Exception {
@@ -1093,7 +1119,7 @@ public class TestNewPlanPushDownForeachF
         "store d into 'output';";
 
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator op = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(op instanceof LOJoin);
@@ -1112,7 +1138,7 @@ public class TestNewPlanPushDownForeachF
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
-        Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" + 
+        Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" +
                   "even though it should be stored",
                   ((LOLoad)load).getSchema().getField("a1") != null );
     }
@@ -1143,6 +1169,7 @@ public class TestNewPlanPushDownForeachF
             addPlanTransformListener(new ProjectionPatcher());
         }
 
+        @Override
         protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
 
@@ -1178,24 +1205,25 @@ public class TestNewPlanPushDownForeachF
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());
         }
-        
-        protected List<Set<Rule>> buildRuleSets() {            
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
             s.add(r);
             ls.add(s);
-             
+
             s = new HashSet<Rule>();
             r = new PushDownForEachFlatten( "PushDownForEachFlatten" );
-            s.add(r);            
+            s.add(r);
             ls.add(s);
-            
+
             return ls;
         }
-    }    
+    }
 
     private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
        PigServer pigServer = new PigServer( pc );
@@ -1212,16 +1240,18 @@ public class TestNewPlanPushDownForeachF
         "C = order B by $0, $1;" +
         "D = store C into 'dummy';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
         Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
         Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
-        
+
     }
-    
+
     @Test
     // See PIG-3826
     public void testOuterJoin() throws Exception {
@@ -1232,7 +1262,7 @@ public class TestNewPlanPushDownForeachF
         "t3 = join B by id LEFT OUTER, t2 by id;" +
         "store t3 into 'output';";
         LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-        
+
         Operator store = newLogicalPlan.getSinks().get( 0 );
         Operator join = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue( join instanceof LOJoin );


Reply via email to