Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java?rev=1736904&r1=1736903&r2=1736904&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java (original) +++ pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java Mon Mar 28 16:58:33 2016 @@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re import org.apache.pig.newplan.logical.relational.LOCross; import org.apache.pig.newplan.logical.relational.LODistinct; import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.pig.newplan.logical.relational.LOLimit; import org.apache.pig.newplan.logical.relational.LOLoad; @@ -75,18 +76,18 @@ public class TestNewPlanPushUpFilter { * A simple filter UDF for testing */ static public class MyFilterFunc extends FilterFunc { - + @Override public Boolean exec(Tuple input) { return false; } } - + @Test // Empty plan, nothing to update public void testErrorEmptyInput() throws Exception { LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" ); - + Assert.assertTrue( !newLogicalPlan.getOperators().hasNext() ); } @@ -94,61 +95,69 @@ public class TestNewPlanPushUpFilter { //Test to ensure that the right exception is thrown when the input list is empty public void testErrorNonFilterInput() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);store A into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator op = newLogicalPlan.getSources().get(0); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); Assert.assertTrue( newLogicalPlan.getSuccessors(op) == null ); } - + @Test public void testFilterLoad() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = filter A by $1 < 18;" + "store B into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get(0); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterStreaming() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = stream A through `" + "ps -u" + "`;" + "C = filter B by $1 < 18;" + - "D = STORE C into 'dummy';"; - + "D = STORE C into 'dummy';"; + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStream ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); } - + @Test public void testFilterSort() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = order A by $1, $2;" + "C = filter B by $1 < 18;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOSort ); @@ -162,12 +171,14 @@ public class TestNewPlanPushUpFilter { "B = order A by $1, $2;" + "C = filter B by 1 == 1;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOSort ); @@ -181,31 +192,35 @@ public class TestNewPlanPushUpFilter { "B = order A by $1, $2;" + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOSort ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterDistinct() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = distinct A;" + "C = filter B by $1 < 18;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LODistinct ); @@ -219,12 +234,14 @@ public class TestNewPlanPushUpFilter { "B = distinct A;" + "C = filter B by 1 == 1;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LODistinct ); @@ -238,31 +255,35 @@ public class TestNewPlanPushUpFilter { "B = distinct A;" + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LODistinct ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterFilter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = filter A by $0 != 'name';" + "C = filter B by $1 < 18;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); Assert.assertTrue( ((LOFilter)op).getAlias().equals( "B" ) ); op = newLogicalPlan.getSuccessors(op).get( 0 ); @@ -278,12 +299,14 @@ public class TestNewPlanPushUpFilter { "split A into B if $1 < 18, C if $1 >= 18;" + "C = filter B by $1 < 10;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOSplit ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOSplitOutput ); @@ -299,12 +322,14 @@ public class TestNewPlanPushUpFilter { "B = limit A 10;" + "C = filter B by $1 < 18;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); @@ -319,7 +344,7 @@ public class TestNewPlanPushUpFilter { "C = union A, B;" + "D = filter C by $1 < 18;" + "E = STORE D into'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -336,21 +361,25 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( unionA instanceof LOUnion ); Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( unionB instanceof LOUnion ); Assert.assertTrue( unionB == unionA ); - + Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterConstantConditionUnion() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -358,7 +387,7 @@ public class TestNewPlanPushUpFilter { "C = union A, B;" + "D = filter C by 1 == 1;" + "E = STORE D into'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -375,21 +404,25 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( unionA instanceof LOUnion ); Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( unionB instanceof LOUnion ); Assert.assertTrue( unionB == unionA ); - + Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterUDFUnion() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -397,7 +430,7 @@ public class TestNewPlanPushUpFilter { "C = union A, B;" + "D = filter C by " + MyFilterFunc.class.getName() + "() ;" + "E = STORE D into'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -414,21 +447,25 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator unionA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( unionA instanceof LOUnion ); Operator unionB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( unionB instanceof LOUnion ); Assert.assertTrue( unionB == unionA ); - + Operator store = newLogicalPlan.getSuccessors(unionA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterCross() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -437,7 +474,7 @@ public class TestNewPlanPushUpFilter { "D = filter C by $5 < 18;" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -445,12 +482,14 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCross ); @@ -465,7 +504,7 @@ public class TestNewPlanPushUpFilter { "C = cross A, B;" + "D = filter C by $1 < 18;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -473,12 +512,14 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCross ); @@ -486,7 +527,7 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterCross2() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -494,7 +535,7 @@ public class TestNewPlanPushUpFilter { "C = cross A, B;" + "D = filter C by $1 < 18 and $5 < 18;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -511,14 +552,18 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator op = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator op = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( op instanceof LOCross ); - op = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( op instanceof LOCross ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); } - + @Test public void testFilterConstantConditionCross() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -526,7 +571,7 @@ public class TestNewPlanPushUpFilter { "C = cross A, B;" + "D = filter C by 1 == 1;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -543,13 +588,17 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } Operator op = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCross ); } - + @Test public void testFilterUDFCross() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -557,7 +606,7 @@ public class TestNewPlanPushUpFilter { "C = cross A, B;" + "D = filter C by " + MyFilterFunc.class.getName() + "($0) ;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -565,19 +614,21 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCross ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterCogroup() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -585,7 +636,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0;" + "D = filter C by $0 < 'name';" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -602,22 +653,26 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( cogrpA instanceof LOCogroup ); Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( cogrpB instanceof LOCogroup ); Assert.assertTrue( cogrpB == cogrpA ); - + Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterConstantConditionCogroup() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -625,7 +680,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0;" + "D = filter C by 1 == 1;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -642,23 +697,27 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( cogrpA instanceof LOCogroup ); Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( cogrpB instanceof LOCogroup ); Assert.assertTrue( cogrpB == cogrpA ); - + Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - - + + @Test public void testFilterUDFCogroup() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -666,7 +725,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0;" + "D = filter C by " + MyFilterFunc.class.getName() + "($1) ;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -683,18 +742,22 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator cogroupA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator cogroupA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( cogroupA instanceof LOCogroup ); - - Operator cogroupB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator cogroupB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( cogroupB instanceof LOCogroup ); - + Operator filter = newLogicalPlan.getSuccessors( cogroupA ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); filter = newLogicalPlan.getSuccessors( cogroupB ).get( 0 ); Assert.assertTrue( cogroupB instanceof LOCogroup ); Assert.assertTrue( cogroupB == cogroupA ); - + Operator store = newLogicalPlan.getSuccessors(filter).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } @@ -706,7 +769,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0 outer;" + "D = filter C by $0 < 'name';" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -723,22 +786,26 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( cogrpA instanceof LOCogroup ); Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( cogrpB instanceof LOCogroup ); Assert.assertTrue( cogrpB == cogrpA ); - + Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterConstantConditionCogroupOuter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -746,7 +813,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0 outer;" + "D = filter C by 1 == 1;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -763,22 +830,26 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator filterA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator filterA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( filterA instanceof LOFilter ); - - Operator filterB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator filterB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( filterB instanceof LOFilter ); - + Operator cogrpA = newLogicalPlan.getSuccessors( filterA ).get( 0 ); Assert.assertTrue( cogrpA instanceof LOCogroup ); Operator cogrpB = newLogicalPlan.getSuccessors( filterB ).get( 0 ); Assert.assertTrue( cogrpB instanceof LOCogroup ); Assert.assertTrue( cogrpB == cogrpA ); - + Operator store = newLogicalPlan.getSuccessors(cogrpA).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterUDFCogroupOuter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -786,7 +857,7 @@ public class TestNewPlanPushUpFilter { "C = cogroup A by $0, B by $0 outer;" + "D = filter C by " + MyFilterFunc.class.getName() + "() ;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -803,72 +874,82 @@ public class TestNewPlanPushUpFilter { loadB = loads.get( 0 ); } - Operator cogroupA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 ); + Assert.assertTrue( foreachA instanceof LOForEach ); + Operator cogroupA = newLogicalPlan.getSuccessors(foreachA).get( 0 ); Assert.assertTrue( cogroupA instanceof LOCogroup ); - - Operator cogroupB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + + Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 ); + Assert.assertTrue( foreachB instanceof LOForEach ); + Operator cogroupB = newLogicalPlan.getSuccessors(foreachB).get( 0 ); Assert.assertTrue( cogroupB instanceof LOCogroup ); - + Operator filter = newLogicalPlan.getSuccessors( cogroupA ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); filter = newLogicalPlan.getSuccessors( cogroupB ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); Assert.assertTrue( cogroupB == cogroupA ); - + Operator store = newLogicalPlan.getSuccessors(filter).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } - + @Test public void testFilterGroupBy() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = group A by $0;" + "C = filter B by $0 < 'name';" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterConstantConditionGroupBy() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = group A by $0;" + "C = filter B by 1 == 1;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterUDFGroupBy() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = group A by $0;" + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); @@ -882,50 +963,56 @@ public class TestNewPlanPushUpFilter { "B = group A by $0 outer;" + "C = filter B by $0 < 'name';" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterConstantConditionGroupByOuter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = group A by $0 outer;" + "C = filter B by 1 == 1;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test public void testFilterUDFGroupByOuter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = group A by $0 outer;" + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" + "D = STORE C into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator op = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( op instanceof LOLoad ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOCogroup ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); @@ -941,7 +1028,7 @@ public class TestNewPlanPushUpFilter { "D = filter C by $0 < 'name';" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -949,19 +1036,21 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); } - + @Test public void testFilterFRJoin1() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -970,7 +1059,7 @@ public class TestNewPlanPushUpFilter { "D = filter C by $4 < 'name';" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -978,19 +1067,21 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); } - + @Test // Constant filter condition, the filter will be pushed up to the first branch of join. public void testFilterConstantConditionFRJoin() throws Exception { @@ -999,7 +1090,7 @@ public class TestNewPlanPushUpFilter { "C = join A by $0, B by $0 using 'replicated';" + "D = filter C by 1 == 1;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -1007,19 +1098,21 @@ public class TestNewPlanPushUpFilter { Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test // UDF takes on argument, so it's constant. As a result, filter will pushed up to the first branch of the join. public void testFilterUDFFRJoin() throws Exception { @@ -1028,27 +1121,29 @@ public class TestNewPlanPushUpFilter { "C = join A by $0, B by $0 using 'replicated';" + "D = filter C by " + MyFilterFunc.class.getName() + "();" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + @Test // UDF takes all input, so filter connot be pushed up. public void testFilterUDFFRJoin1() throws Exception { @@ -1057,17 +1152,21 @@ public class TestNewPlanPushUpFilter { "C = join A by $0, B by $0 using 'replicated';" + "D = filter C by TupleSize(*) > 5;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = newLogicalPlan.getSuccessors( loads.get( 0 ) ).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); - + op = newLogicalPlan.getSuccessors( loads.get( 1 ) ).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); @@ -1081,20 +1180,22 @@ public class TestNewPlanPushUpFilter { "D = filter C by $0 < 'name';" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); @@ -1110,20 +1211,22 @@ public class TestNewPlanPushUpFilter { "D = filter C by $4 < 'name';" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); @@ -1134,25 +1237,27 @@ public class TestNewPlanPushUpFilter { @Test public void testFilterInnerJoin2() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + - "B = load 'anotherfile' as (name, age, preference);" + + "B = load 'anotherfile' as (name, age, preference);" + "C = join A by $0, B by $0;" + "D = filter C by $0 < 'jonh' OR $1 > 50;" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); @@ -1168,17 +1273,21 @@ public class TestNewPlanPushUpFilter { "D = filter C by $4 < 'name' AND $0 == 'joe';" + "E = limit D 10;" + "F = STORE E into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = newLogicalPlan.getSuccessors( loads.get( 0 ) ).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors( loads.get( 1 ) ).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); @@ -1191,27 +1300,29 @@ public class TestNewPlanPushUpFilter { "C = join A by $0, B by $0;" + "D = filter C by " + MyFilterFunc.class.getName() + "() ;" + "E = STORE D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); op = newLogicalPlan.getSuccessors(op).get( 0 ); + Assert.assertTrue( op instanceof LOForEach ); + op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOJoin ); op = newLogicalPlan.getSuccessors(op).get( 0 ); Assert.assertTrue( op instanceof LOStore ); } - + // See PIG-1289 @Test public void testOutJoin() throws Exception { @@ -1220,15 +1331,15 @@ public class TestNewPlanPushUpFilter { "C = join A by name LEFT OUTER, B by name;" + "D = filter C by B::name is null;" + "store D into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query); - + Operator op = newLogicalPlan.getSinks().get(0); Assert.assertTrue( op instanceof LOStore ); op = newLogicalPlan.getPredecessors(op).get( 0 ); Assert.assertTrue( op instanceof LOFilter ); } - + // See PIG-1507 @Test public void testFullOutJoin() throws Exception { @@ -1237,9 +1348,9 @@ public class TestNewPlanPushUpFilter { "c = join A by d1 full outer, B by d2;" + "d = filter c by d2 is null;" + "store d into 'dummy';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query); - + Operator op = newLogicalPlan.getSinks().get(0); Assert.assertTrue( op instanceof LOStore ); op = newLogicalPlan.getPredecessors(op).get( 0 ); @@ -1260,7 +1371,7 @@ public class TestNewPlanPushUpFilter { "F = filter E by d1 > 5;" + "G = store F into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan(query); - + List<Operator> ops = newLogicalPlan.getSinks(); Assert.assertTrue( ops.size() == 1 ); Operator op = ops.get( 0 ); @@ -1279,32 +1390,34 @@ public class TestNewPlanPushUpFilter { optimizer.optimize(); return newLogicalPlan; } - + public class MyPlanOptimizer extends LogicalPlanOptimizer { protected MyPlanOptimizer(OperatorPlan p, int iterations) { super(p, iterations, new HashSet<String>()); } - + + @Override public void addPlanTransformListener(PlanTransformListener listener) { super.addPlanTransformListener(listener); } - - protected List<Set<Rule>> buildRuleSets() { + + @Override + protected List<Set<Rule>> buildRuleSets() { List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); - + Set<Rule> s = new HashSet<Rule>(); // add split filter rule Rule r = new LoadTypeCastInserter( "TypeCastInserter" ); s.add(r); ls.add(s); - + s = new HashSet<Rule>(); r = new PushUpFilter( "PushUpFilter" ); - s.add(r); + s.add(r); ls.add(s); - + return ls; } - } + } }
Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1736904&r1=1736903&r2=1736904&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Mon Mar 28 16:58:33 2016 @@ -18,11 +18,11 @@ package org.apache.pig.test; +import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.Properties; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.ExecType; @@ -45,6 +44,8 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.builtin.PigStorage; +import org.apache.pig.builtin.mock.Storage.Data; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -68,7 +69,7 @@ public class TestPigStorage { private static final String datadir = "build/test/tmpdata/"; @Before - public void setup() throws IOException { + public void setup() throws Exception { // some tests are in map-reduce mode and some in local - so before // each test, we will re-initialize FileLocalizer so that temp files // are created correctly depending on the ExecType in the test. @@ -656,36 +657,95 @@ public class TestPigStorage { @Test public void testIncompleteDataWithPigSchema() throws Exception { - File parent = new File(datadir, "incomplete_data_with_pig_schema_1"); - parent.deleteOnExit(); - parent.mkdirs(); - File tmpInput = File.createTempFile("tmp", "tmp"); - tmpInput.deleteOnExit(); - File outFile = new File(parent, "out"); - pig.registerQuery("a = load '"+Util.encodeEscape(tmpInput.getAbsolutePath())+"' as (x:int, y:chararray, z:chararray);"); - pig.store("a", outFile.getAbsolutePath(), "PigStorage('\\t', '-schema')"); - File schemaFile = new File(outFile, ".pig_schema"); + Data data = resetData(pig); + String schema = "{\"fields\":[{\"name\":\"x\",\"type\":10,\"schema\":null}," + + "{\"name\":\"y\",\"type\":55,\"schema\":null}," + + "{\"name\":\"z\",\"type\":55,\"schema\":null}]," + + "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}"; - parent = new File(datadir, "incomplete_data_with_pig_schema_2"); + File parent = new File(datadir, "incomplete_data_with_pig_schema_2"); parent.deleteOnExit(); File inputDir = new File(parent, "input"); inputDir.mkdirs(); File inputSchemaFile = new File(inputDir, ".pig_schema"); - FileUtils.moveFile(schemaFile, inputSchemaFile); + Util.writeToFile(inputSchemaFile, new String[] {schema}); File inputFile = new File(inputDir, "data"); Util.writeToFile(inputFile, new String[]{"1"}); pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"';"); - Iterator<Tuple> it = pig.openIterator("a"); - assertTrue(it.hasNext()); - assertEquals(tuple(1,null,null), it.next()); - assertFalse(it.hasNext()); + pig.registerQuery("store a into 'actual' using mock.Storage();"); + data.set("expected", tuple(1, null, null)); + Assert.assertEquals(data.get("expected"), data.get("actual")); // Now, test with prune - pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"'; b = foreach a generate y, z;"); - it = pig.openIterator("b"); - assertTrue(it.hasNext()); - assertEquals(tuple(null,null), it.next()); - assertFalse(it.hasNext()); + data = resetData(pig); + data.set("expected", tuple(1, null)); + pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"'; b = foreach a generate x, z;"); + pig.registerQuery("store b into 'actual' using mock.Storage();"); + Assert.assertEquals(data.get("expected"), data.get("actual")); + +// TODO: TypeCaster should be adding a cast for this case but it always uses the file schema +// data = resetData(pig); +// data.set("expected", tuple(new DataByteArray("1"), null)); +// pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, y:bytearray, z:bytearray);"); +// pig.registerQuery("b = foreach a generate x, z;"); +// pig.registerQuery("store b into 'actual' using mock.Storage();"); +// Assert.assertEquals(data.get("expected"), data.get("actual")); + + schema = "{\"fields\":[{\"name\":\"x\",\"type\":50,\"schema\":null}," + + "{\"name\":\"y\",\"type\":50,\"schema\":null}," + + "{\"name\":\"z\",\"type\":50,\"schema\":null}]," + + "\"version\":0,\"sortKeys\":[],\"sortKeyOrders\":[]}"; + Util.writeToFile(inputSchemaFile, new String[] {schema}); + data = resetData(pig); + data.set("expected", tuple(new DataByteArray("1"), null)); + pig.registerQuery("a = load '"+Util.encodeEscape(inputDir.getAbsolutePath())+"' as (x: bytearray, y:bytearray, z:bytearray);"); + pig.registerQuery("b = foreach a generate x, z;"); + pig.registerQuery("store b into 'actual' using mock.Storage();"); + Assert.assertEquals(data.get("expected"), data.get("actual")); + } + + @Test + public void testIncompleteDataNoPigSchema() throws Exception { + + File inputFile = new File(datadir, "incomplete_data_no_pigschema"); + inputFile.deleteOnExit(); + Util.writeToFile(inputFile, new String[]{"1\t2", "2\t3"}); + Data data = resetData(pig); + + String query = "A = LOAD '"+ Util.encodeEscape(inputFile.getAbsolutePath()) + "' as (x, y, z);" + + "store A into 'actual' using mock.Storage();"; + + Util.registerMultiLineQuery(pig, query); + data.set("expected", + tuple(new DataByteArray("1"), new DataByteArray("2"), null), + tuple(new DataByteArray("2"), new DataByteArray("3"), null)); + + Assert.assertEquals(data.get("expected"), data.get("actual")); + + data = resetData(pig); + query = "A = LOAD '"+ Util.encodeEscape(inputFile.getAbsolutePath()) + + "' using " + PigExtendedStorage.class.getName() + " as (x, y, z);" + + "store A into 'actual' using mock.Storage();"; + + pig.registerQuery(query); + data.set("expected", + tuple(new DataByteArray("1"), new DataByteArray("2"), new DataByteArray("extracolumn")), + tuple(new DataByteArray("2"), new DataByteArray("3"), new DataByteArray("extracolumn"))); + + Assert.assertEquals(data.get("expected"), data.get("actual")); + + + } + + public static class PigExtendedStorage extends PigStorage { + + @Override + public Tuple getNext() throws IOException { + Tuple tuple = super.getNext(); + tuple.append(new DataByteArray("extracolumn")); + return tuple; + } + } Modified: pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot?rev=1736904&r1=1736903&r2=1736904&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot (original) +++ pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot Mon Mar 28 16:58:33 2016 @@ -69,10 +69,46 @@ s5875509_in -> 253899 [style=invis]; 21192393 -> s5875509_out [style=invis]; 32545329 [label="LOLoad", style="filled", fillcolor="gray"]; 17330894 [label="LOStore", style="filled", fillcolor="gray"]; +s26867942_in [label="", style=invis, height=0, width=0]; +s26867942_out [label="", style=invis, height=0, width=0]; +subgraph cluster_26867942 { +label="LOForEach"labelloc=b; +s27525999_in [label="", style=invis, height=0, width=0]; +s27525999_out [label="", style=invis, height=0, width=0]; +subgraph cluster_27525999 { +label="LOGenerate"labelloc=b; +11705501 [label="Project0:(*)"]; +s27525999_in -> 11705501 [style=invis]; +4729773 [label="Project1:(*)"]; +s27525999_in -> 4729773 [style=invis]; +2861196 [label="Project2:(*)"]; +s27525999_in -> 2861196 [style=invis]; +4629854 [label="Project3:(*)"]; +s27525999_in -> 4629854 [style=invis]; +}; +11705501 -> s27525999_out [style=invis]; +4729773 -> s27525999_out [style=invis]; +2861196 -> s27525999_out [style=invis]; +4629854 -> s27525999_out [style=invis]; +14518777 [label="LOInnerLoad"]; +9263789 [label="LOInnerLoad"]; +3945981 [label="LOInnerLoad"]; +16555307 [label="LOInnerLoad"]; +14518777 -> s27525999_in [lhead=cluster_27525999] +9263789 -> s27525999_in [lhead=cluster_27525999] +3945981 -> s27525999_in [lhead=cluster_27525999] +16555307 -> s27525999_in [lhead=cluster_27525999] +s26867942_in -> 14518777 [style=invis]; +s26867942_in -> 9263789 [style=invis]; +s26867942_in -> 3945981 [style=invis]; +s26867942_in -> 16555307 [style=invis]; +}; +s27525999_out -> s26867942_out [style=invis]; s26567569_out -> 17330894 +s7897563_out -> s26567569_in [lhead=cluster_26567569] s18554240_out -> s7897563_in [lhead=cluster_7897563] s5875509_out -> s26867942_in [lhead=cluster_26867942] -s26867942_out -> s18554240_26867942_in [lhead=cluster_18554240_26867942] 32545329 -> s5875509_in [lhead=cluster_5875509] +s26867942_out -> s18554240_26867942_in [lhead=cluster_18554240_26867942] }
