Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Apr 1 20:19:53 2016 @@ -22,6 +22,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -1097,6 +1098,10 @@ public class TestEvalPipeline { } Util.createInputFile(cluster, "table", inpString); + StringWriter writer = new StringWriter(); + if (cluster.getExecType().name().equals("TEZ")) { + Util.createLogAppender("testNoCombinerInReducer", writer, Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder")); + } pigServer.registerQuery("a = LOAD 'table' AS (i:int);"); pigServer.registerQuery("b = group a ALL;"); @@ -1116,6 +1121,10 @@ public class TestEvalPipeline { Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0); } + if (cluster.getExecType().name().equals("TEZ")) { + Assert.assertTrue(writer.toString().contains("Turning off combiner in reducer")); + Util.removeLogAppender("testNoCombinerInReducer", Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder")); + } Util.deleteFile(cluster, "table"); }
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Apr 1 20:19:53 2016 @@ -1370,6 +1370,39 @@ public class TestGrunt { validate(query, false, msgs.toArray(new String[0])); } + @Test + public void testWithInlineOp() throws Throwable { + // specifying schema inside inline-op makes PigScriptParser.jj to read + // to the end of file + String query = "a = load 'i1' as (f1:chararray);" + + "b = foreach (foreach a generate f1 as b1) generate b1; " + + "dump b; "; + + ArrayList<String> msgs = new ArrayList<String>(); // + validate(query, true, msgs.toArray(new String[0])); + } + + /* + * Following test currently fails. Instead of making further changes to + * PigScriptParser.jj, leaving it till we move out of javacc in PIG-2597 + + @Test + public void testWithInlineOpWithNestedForeach() throws Throwable { + // This one currently fails because "{}" is treated as + // IN_BLOCK in PigScriptParser.jj which jumps to PIG_END and ignores + // ") generate *; " part of the code. 
+ // In order to support this test, we need to add parenthesis matching + // everywhere in PigScriptParser.jj (or stop using it) + // + String query = "a = load 'i1' as (f1:chararray);" + + "b = group a ALL; " + + "c = foreach ( foreach b {b1 = limit a 3; generate 1, b1;} ) generate *; " + + "dump c;"; + ArrayList<String> msgs = new ArrayList<String>(); // + validate(query, true, msgs.toArray(new String[0])); + } + */ + private void validate(String query, boolean syntaxOk, Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMergeForEachOptimization.java Fri Apr 1 20:19:53 2016 @@ -55,37 +55,37 @@ public class TestMergeForEachOptimizatio LogicalPlan plan = null; PigContext pc = new PigContext( ExecType.LOCAL, new Properties() ); PigServer pigServer = null; - + @Before public void setup() throws ExecException { pigServer = new PigServer( pc ); } - + @After public void tearDown() { - + } - + /** * Basic test case. Two simple FOREACH statements can be merged to one. 
- * @throws Exception + * @throws Exception */ - @Test + @Test public void testSimple() throws Exception { String query = "A = load 'file.txt' as (a, b, c);" + "B = foreach A generate a+b, c-b;" + "C = foreach B generate $0+5, $1;" + - "store C into 'empty';"; + "store C into 'empty';"; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); - + int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); int outputExprCount1 = getOutputExprCount( newLogicalPlan ); LOForEach foreach1 = getForEachOperator( newLogicalPlan ); Assert.assertTrue( foreach1.getAlias().equals( "C" ) ); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); - + int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); Assert.assertEquals( 1, forEachCount1 - forEachCount2 ); int outputExprCount2 = getOutputExprCount( newLogicalPlan ); @@ -93,26 +93,26 @@ public class TestMergeForEachOptimizatio LOForEach foreach2 = getForEachOperator( newLogicalPlan ); Assert.assertTrue( foreach2.getAlias().equals( "C" ) ); } - + /** * Test more complex case where the first for each in the script has inner plan. 
- * @throws Exception + * @throws Exception */ @Test public void testComplex() throws Exception { String query = "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" + "B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SIZE(S); };" + - "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 'empty';" ; + "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into 'empty';" ; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); - + int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); int outputExprCount1 = getOutputExprCount( newLogicalPlan ); LOForEach foreach1 = getForEachOperator( newLogicalPlan ); Assert.assertTrue( foreach1.getAlias().equals( "C" ) ); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); - + int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); // The number of FOREACHes didn't change because one is genereated because of type cast and // one is reduced because of the merge. 
@@ -125,10 +125,10 @@ public class TestMergeForEachOptimizatio Assert.assertTrue(newSchema.getField(0).alias.equals("x")); Assert.assertTrue(newSchema.getField(1).alias.equals("y")); } - + /** * One output of first foreach was referred more than once in the second foreach - * @throws Exception + * @throws Exception */ @Test public void testDuplicateInputs() throws Exception { @@ -136,84 +136,84 @@ public class TestMergeForEachOptimizatio "A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" + "B = group A1 all;" + "C = foreach B generate A1;" + - "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 'empty';" ; + "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into 'empty';" ; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); - + Operator store = newLogicalPlan.getSinks().get(0); int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); LOForEach foreach1 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0); Assert.assertTrue( foreach1.getAlias().equals( "D" ) ); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); - + int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); // The number of FOREACHes didn't change because one is genereated because of type cast and // one is reduced because of the merge. Assert.assertEquals( 1, forEachCount1 - forEachCount2 ); - + LOForEach foreach2 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0); Assert.assertTrue( foreach2.getAlias().equals( "D" ) ); } - + /** * Not all consecutive FOREACHes can be merged. In this case, the second FOREACH statment * has inner plan, which cannot be merged with one before it. 
- * @throws Exception + * @throws Exception */ @Test public void testNegative1() throws Exception { String query = "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" + "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" + "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" + - "store C into 'empty';"; + "store C into 'empty';"; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); - + int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); - + // Actually MergeForEach optimization is happening here. A new foreach will be inserted after A because - // of typ casting. The inserted one and the one in B can be merged due to this optimization. However, + // of typ casting. The inserted one and the one in B can be merged due to this optimization. However, // the plan cannot be further optimized because C has inner plan. Assert.assertEquals( forEachCount1, forEachCount2 ); } - + /** * MergeForEach Optimization is off if the first statement has a FLATTEN operator. 
- * @throws Exception + * @throws Exception */ @Test public void testNegative2() throws Exception { String query = "A = LOAD 'file.txt' as (a, b, c);" + "B = FOREACH A GENERATE FLATTEN(a), b, c;" + - "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ; + "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); - + int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); - + int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); Assert.assertEquals( 2, forEachCount1 ); Assert.assertEquals( 2, forEachCount2 ); } - - + + /** * Ensure that join input order does not get reversed (PIG-1672) - * @throws Exception + * @throws Exception */ - @Test + @Test public void testJoinInputOrder() throws Exception { String query = "l1 = load 'y' as (a);" + "l2 = load 'z' as (a1,b1,c1,d1);" + "f1 = foreach l2 generate a1, b1, c1, d1;" + "f2 = foreach f1 generate a1, b1, c1;" + - "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 'empty';" ; + "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into 'empty';" ; LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); @@ -225,15 +225,15 @@ public class TestMergeForEachOptimizatio } LOForEach foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0); foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(foreachL2).get(0); - + int outputExprCount1 = ((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size(); - + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); optimizer.optimize(); - + int forEachCount2 = getForEachOperatorCount( newLogicalPlan ); - Assert.assertEquals( 1, forEachCount1 - forEachCount2 ); - + Assert.assertEquals( 0, forEachCount1 - forEachCount2 ); + loads = newLogicalPlan.getSources(); l2 = null; for 
(Operator load : loads) { @@ -241,21 +241,21 @@ public class TestMergeForEachOptimizatio l2 = load; } foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0); - + int outputExprCount2 = ((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size(); - + Assert.assertTrue( outputExprCount1 == outputExprCount2 ); Assert.assertTrue( foreachL2.getAlias().equals( "f2" ) ); - + LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class); LogicalRelationalOperator leftInp = (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0); - assertEquals("join child left", leftInp.getAlias(), "f2"); - + assertEquals("join child left", leftInp.getAlias(), "f2"); + LogicalRelationalOperator rightInp = (LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1); - assertEquals("join child right", rightInp.getAlias(), "l1"); - + assertEquals("join child right", rightInp.getAlias(), "l1"); + } private int getForEachOperatorCount(LogicalPlan plan) { @@ -268,7 +268,7 @@ public class TestMergeForEachOptimizatio } return count; } - + private int getOutputExprCount(LogicalPlan plan) throws IOException { LOForEach foreach = getForEachOperator( plan ); LogicalPlan inner = foreach.getInnerPlan(); @@ -276,7 +276,7 @@ public class TestMergeForEachOptimizatio LOGenerate gen = (LOGenerate)ops.get( 0 ); return gen.getOutputPlans().size(); } - + private LOForEach getForEachOperator(LogicalPlan plan) throws IOException { Iterator<Operator> ops = plan.getOperators(); while( ops.hasNext() ) { @@ -290,7 +290,7 @@ public class TestMergeForEachOptimizatio } return null; } - + /** * returns first operator that is an instance of given class c * @param plan @@ -303,41 +303,42 @@ public class TestMergeForEachOptimizatio while( ops.hasNext() ) { Operator op = ops.next(); if( op.getClass().equals(c)) { - return op; + return op; } } return null; } - + public class MyPlanOptimizer extends LogicalPlanOptimizer { protected MyPlanOptimizer(OperatorPlan p, int 
iterations) { super(p, iterations, new HashSet<String>()); } - - protected List<Set<Rule>> buildRuleSets() { + + @Override + protected List<Set<Rule>> buildRuleSets() { List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); - + Set<Rule> s = new HashSet<Rule>(); // add split filter rule Rule r = new LoadTypeCastInserter( "TypeCastInserter" ); s.add(r); ls.add(s); - + // Split Set // This set of rules does splitting of operators only. // It does not move operators s = new HashSet<Rule>(); r = new AddForEach( "AddForEach" ); - s.add(r); + s.add(r); ls.add(s); - + s = new HashSet<Rule>(); r = new MergeForEach("MergeForEach"); - s.add(r); + s.add(r); ls.add(s); return ls; } - } + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java Fri Apr 1 20:19:53 2016 @@ -575,7 +575,7 @@ public class TestMultiQueryCompiler { LogicalPlan lp = checkLogicalPlan(2, 1, 7); - PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 11); + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 13); checkMRPlan(pp, 1, 1, 2); Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Fri Apr 1 20:19:53 2016 @@ -318,6 +318,8 @@ public class 
TestNewPlanFilterAboveForea Assert.assertTrue( filter instanceof LOFilter ); Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } @@ -335,6 +337,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( load instanceof LOLoad ); Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 ); @@ -354,6 +358,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( load instanceof LOLoad ); Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 ); @@ -375,6 +381,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( filter instanceof LOFilter ); Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } @@ -395,6 +403,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( filter instanceof LOFilter ); Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); 
Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } @@ -414,6 +424,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( filter instanceof LOFilter ); Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } @@ -433,6 +445,8 @@ public class TestNewPlanFilterAboveForea Assert.assertTrue( filter instanceof LOFilter ); Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); Assert.assertTrue( fe instanceof LOForEach ); + fe = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); Assert.assertTrue( store instanceof LOStore ); } Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanFilterRule.java Fri Apr 1 20:19:53 2016 @@ -488,7 +488,9 @@ public class TestNewPlanFilterRule { Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); - Operator group = newLogicalPlan.getSuccessors( load ).get( 0 ); + Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator group = newLogicalPlan.getSuccessors( fe 
).get( 0 ); Assert.assertTrue( group instanceof LOCogroup ); Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java Fri Apr 1 20:19:53 2016 @@ -21,6 +21,7 @@ import static org.apache.pig.newplan.log import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.HashSet; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -39,6 +40,7 @@ import org.apache.pig.newplan.logical.ex import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer; import org.apache.pig.newplan.logical.optimizer.SchemaResetter; import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE; import org.apache.pig.newplan.logical.relational.LOLoad; @@ -172,6 +174,11 @@ public class TestNewPlanLogicalOptimizer expected.add(DA); expected.connect(A, DA); + // A = foreach + LOForEach foreachA = org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DA, 0, new HashSet<Integer>()); + foreachA.setAlias("A"); + foreachA.neverUseForRealSetSchema(aschema); + // B = load LogicalSchema bschema = new LogicalSchema(); bschema.addField(new LogicalSchema.LogicalFieldSchema( @@ -193,6 +200,11 @@ public class TestNewPlanLogicalOptimizer expected.add(DB); expected.connect(B, DB); + // B = foreach + LOForEach foreachB = 
org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DB, 0, new HashSet<Integer>()); + foreachB.setAlias("B"); + foreachB.neverUseForRealSetSchema(bschema); + // C = join LogicalSchema cschema = new LogicalSchema(); cschema.addField(new LogicalSchema.LogicalFieldSchema( @@ -221,8 +233,8 @@ public class TestNewPlanLogicalOptimizer mm.put(1, bprojplan); C.neverUseForRealSetSchema(cschema); expected.add(C); - expected.connect(DA, C); - expected.connect(DB, C); + expected.connect(foreachA, C); + expected.connect(foreachB, C); // D = filter LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1737436&r1=1737435&r2=1737436&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Fri Apr 1 20:19:53 2016 @@ -64,25 +64,25 @@ public class TestNewPlanPushDownForeachF } /** - * + * * A simple filter UDF for testing * */ static public class MyFilterFunc extends FilterFunc { - + @Override public Boolean exec(Tuple input) { return false; } } - + /** * Old plan is empty, so is the optimized new plan. 
*/ @Test public void testErrorEmptyInput() throws Exception { LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" ); - + Assert.assertTrue( newLogicalPlan.getOperators().hasNext() == false ); } @@ -100,7 +100,7 @@ public class TestNewPlanPushDownForeachF List<Operator> nexts = newLogicalPlan.getSuccessors( load ); Assert.assertTrue( nexts != null && nexts.size() == 1 ); } - + @Test public void testForeachNoFlatten() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -108,28 +108,30 @@ public class TestNewPlanPushDownForeachF "C = order B by $0, $1;" + "D = store C into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - + @Test public void testForeachNoSuccessors() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = foreach A generate flatten($1);" + "Store B into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); } - + @Test public void testForeachStreaming() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -137,61 +139,65 @@ public class TestNewPlanPushDownForeachF "C = stream B through `" + "pc -l" + "`;" + "Store C into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( 
load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); } - + @Test public void testForeachDistinct() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = foreach A generate flatten($1);" + "C = distinct B;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); } - + @Test public void testForeachForeach() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + - "B = foreach A generate $0, $1, flatten(1);" + + "B = foreach A generate $0, $1, flatten(1);" + "C = foreach B generate $0;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( !OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } - + @Test public void testForeachFilter() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + - "B = foreach A generate $0, $1, flatten($2);" + + "B = foreach A generate $0, $1, flatten($2);" + "C = filter B by $1 < 18;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load 
instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } @@ -200,15 +206,17 @@ public class TestNewPlanPushDownForeachF String query = "A = load 'myfile' as (name, age, gpa);" + "B = foreach A generate $0, $1, flatten($2);" + "split B into C if $1 < 18, D if $1 >= 18;" + - "store C into 'output1';" + + "store C into 'output1';" + "store D into 'output2';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } @@ -218,13 +226,15 @@ public class TestNewPlanPushDownForeachF "B = foreach A generate $0, $1, flatten($2);" + "C = limit B 10;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } @@ -235,24 +245,26 @@ public class TestNewPlanPushDownForeachF "C = load 'anotherfile' as (name, age, preference);" + "D = union B, C;" + "store D into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = 
newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator load = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) load = loads.get( 0 ); else load = loads.get( 1 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } - + @Test public void testForeachCogroup() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -260,7 +272,7 @@ public class TestNewPlanPushDownForeachF "C = load 'anotherfile' as (name, age, preference);" + "D = cogroup B by $0, C by $0;" + "store D into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -268,32 +280,36 @@ public class TestNewPlanPushDownForeachF Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator load = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) load = loads.get( 0 ); else load = loads.get( 1 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } - + @Test public void testForeachGroupBy() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = foreach A generate $0, $1, 
flatten($2);" + "C = group B by $0;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } - + @Test public void testForeachSort() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + @@ -301,16 +317,18 @@ public class TestNewPlanPushDownForeachF "C = order B by $0, $1;" + "D = store C into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); - Operator sort = newLogicalPlan.getSuccessors( load ).get( 0 ); + Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); + Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); - Operator foreach = newLogicalPlan.getSuccessors( sort ).get( 0 ); + foreach = newLogicalPlan.getSuccessors( sort ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) ); } - + /** * Non-pure-projection, not optimizable. 
*/ @@ -321,16 +339,18 @@ public class TestNewPlanPushDownForeachF "C = order B by $0, $1;" + "D = store C into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - - + + /** * If the flattened field is referenced in the sort condition, then no optimization can be done. */ @@ -341,7 +361,7 @@ public class TestNewPlanPushDownForeachF "C = order B by $0, $3;" + "D = store C into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); @@ -360,49 +380,55 @@ public class TestNewPlanPushDownForeachF "store C into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - + @Test public void testForeachUDFSort() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + "B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;" + "C = order B by $0, $1;" + "store C into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query 
); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - + @Test public void testForeachCastSort() throws Exception { String query = "A = load 'myfile' as (name, age, gpa);" + - "B = foreach A generate (chararray)$0, $1, flatten($2);" + + "B = foreach A generate (chararray)$0, $1, flatten($2);" + "C = order B by $0, $1;" + "store C into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - + @Test public void testForeachCross() throws Exception { String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" + @@ -413,13 +439,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -454,7 +480,7 @@ public class 
TestNewPlanPushDownForeachF Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -487,16 +513,16 @@ public class TestNewPlanPushDownForeachF "store F into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + // No optimization about foreach flatten. Operator store = newLogicalPlan.getSinks().get( 0 ); Operator limit = newLogicalPlan.getPredecessors(store).get(0); Operator cross = newLogicalPlan.getPredecessors(limit).get(0); Assert.assertTrue( cross instanceof LOCross ); } - + /** - * This actually is a valid case, even though the optimization may not provide any performance benefit. However, detecting + * This actually is a valid case, even though the optimization may not provide any performance benefit. However, detecting * such a case requires more coding. Thus, we allow optimization to go thru in this case. 
*/ @Test @@ -509,13 +535,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -547,13 +573,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -571,7 +597,7 @@ public class TestNewPlanPushDownForeachF op = newLogicalPlan.getSuccessors( op ).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); } - + /** * Cast should NOT matter to cross. This is a valid positive test case. 
*/ @@ -585,13 +611,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -609,7 +635,7 @@ public class TestNewPlanPushDownForeachF op = newLogicalPlan.getSuccessors( op ).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); } - + @Test public void testForeachFRJoin() throws Exception { String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" + @@ -619,7 +645,7 @@ public class TestNewPlanPushDownForeachF "E = limit D 10;" + "store E into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); List<Operator> loads = newLogicalPlan.getSources(); @@ -627,7 +653,7 @@ public class TestNewPlanPushDownForeachF Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -662,7 +688,7 @@ public class TestNewPlanPushDownForeachF Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -693,7 +719,7 @@ public class TestNewPlanPushDownForeachF "E = join B by $0, D by $0 using 'replicated';" + "F = limit E 10;" + "store F into 'output';"; - + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query 
); // No optimization about foreach flatten. @@ -702,7 +728,7 @@ public class TestNewPlanPushDownForeachF Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 ); Assert.assertTrue( join instanceof LOJoin ); } - + /** * Valid positive test case, even though the benefit from the optimization is questionable. However, putting in additinal check for * this condition requires extra coding. @@ -717,13 +743,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -756,13 +782,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -795,13 +821,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op 
= loads.get( 1 ); @@ -830,13 +856,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -854,7 +880,7 @@ public class TestNewPlanPushDownForeachF op = newLogicalPlan.getSuccessors( op ).get( 0 ); Assert.assertTrue( op instanceof LOLimit ); } - + @Test public void testForeachInnerJoin1() throws Exception { String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" + @@ -865,13 +891,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -911,9 +937,9 @@ public class TestNewPlanPushDownForeachF Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 ); Assert.assertTrue( join instanceof LOJoin ); } - + /** - * This is actually a valid positive test case, even though the benefit of such optimization is questionable. However, + * This is actually a valid positive test case, even though the benefit of such optimization is questionable. However, * checking for such condition requires additional coding effort. 
*/ @Test @@ -926,13 +952,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -964,13 +990,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -1002,13 +1028,13 @@ public class TestNewPlanPushDownForeachF "store E into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + List<Operator> loads = newLogicalPlan.getSources(); Assert.assertTrue( loads.size() == 2 ); Assert.assertTrue( loads.get( 0 ) instanceof LOLoad ); Assert.assertTrue( loads.get( 1 ) instanceof LOLoad ); Operator op = null; - if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) + if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) op = loads.get( 0 ); else op = loads.get( 1 ); @@ -1045,7 +1071,7 @@ public class TestNewPlanPushDownForeachF Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 ); Assert.assertTrue( join instanceof LOJoin ); } - + // See PIG-1374 @Test public void testForeachRequiredField() throws Exception { @@ -1055,7 +1081,7 @@ public class TestNewPlanPushDownForeachF "store C into 'output';"; LogicalPlan 
newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); @@ -1065,7 +1091,7 @@ public class TestNewPlanPushDownForeachF Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); } - + // See PIG-1706 @Test public void testForeachWithUserDefinedSchema() throws Exception { @@ -1076,13 +1102,13 @@ public class TestNewPlanPushDownForeachF "store d into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator store = newLogicalPlan.getSinks().get( 0 ); LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0); Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1")); Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2")); } - + // See PIG-1751 @Test public void testForeachWithUserDefinedSchema2() throws Exception { @@ -1093,7 +1119,7 @@ public class TestNewPlanPushDownForeachF "store d into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator store = newLogicalPlan.getSinks().get( 0 ); Operator op = newLogicalPlan.getPredecessors(store).get(0); Assert.assertTrue(op instanceof LOJoin); @@ -1112,7 +1138,7 @@ public class TestNewPlanPushDownForeachF Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); - Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" + + Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" + "even though it should be stored", ((LOLoad)load).getSchema().getField("a1") != null ); } @@ -1143,6 +1169,7 @@ public class TestNewPlanPushDownForeachF addPlanTransformListener(new ProjectionPatcher()); } + @Override protected List<Set<Rule>> buildRuleSets() { List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); @@ -1178,24 +1205,25 @@ public class TestNewPlanPushDownForeachF 
protected MyPlanOptimizer(OperatorPlan p, int iterations) { super(p, iterations, new HashSet<String>()); } - - protected List<Set<Rule>> buildRuleSets() { + + @Override + protected List<Set<Rule>> buildRuleSets() { List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); - + Set<Rule> s = new HashSet<Rule>(); // add split filter rule Rule r = new LoadTypeCastInserter( "TypeCastInserter" ); s.add(r); ls.add(s); - + s = new HashSet<Rule>(); r = new PushDownForEachFlatten( "PushDownForEachFlatten" ); - s.add(r); + s.add(r); ls.add(s); - + return ls; } - } + } private LogicalPlan migrateAndOptimizePlan(String query) throws Exception { PigServer pigServer = new PigServer( pc ); @@ -1212,16 +1240,18 @@ public class TestNewPlanPushDownForeachF "C = order B by $0, $1;" + "D = store C into 'dummy';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator load = newLogicalPlan.getSources().get( 0 ); Assert.assertTrue( load instanceof LOLoad ); Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 ); Assert.assertTrue( foreach instanceof LOForEach ); + foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 ); + Assert.assertTrue( foreach instanceof LOForEach ); Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 ); Assert.assertTrue( sort instanceof LOSort ); - + } - + @Test // See PIG-3826 public void testOuterJoin() throws Exception { @@ -1232,7 +1262,7 @@ public class TestNewPlanPushDownForeachF "t3 = join B by id LEFT OUTER, t2 by id;" + "store t3 into 'output';"; LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query ); - + Operator store = newLogicalPlan.getSinks().get( 0 ); Operator join = newLogicalPlan.getPredecessors(store).get(0); Assert.assertTrue( join instanceof LOJoin );
