Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Thu Nov 27 12:49:54 2014 @@ -2,88 +2,88 @@ # There are 1 DAGs in the session #-------------------------------------------------- #-------------------------------------------------- -# TEZ DAG plan: scope-119 +# TEZ DAG plan: pig-0_scope-1 #-------------------------------------------------- -Tez vertex scope-109 -> Tez vertex scope-110, -Tez vertex scope-103 -> Tez vertex scope-110, -Tez vertex scope-110 - +Tez vertex scope-108 -> Tez vertex scope-109, +Tez vertex scope-102 -> Tez vertex scope-109, Tez vertex scope-109 + +Tez vertex scope-108 # Plan on vertex -POValueOutputTez - scope-113 -> [scope-110] +POValueOutputTez - scope-112 -> [scope-109] | -|---c: New For Each(false,false)[bag] - scope-89 +|---c: New For Each(false,false)[bag] - scope-88 | | - | Cast[int] - scope-84 + | Cast[int] - scope-83 | | - | |---Project[bytearray][1] - scope-83 + | |---Project[bytearray][1] - scope-82 | | - | Cast[chararray] - scope-87 + | Cast[chararray] - scope-86 | | - | |---Project[bytearray][0] - scope-86 + | |---Project[bytearray][0] - scope-85 | - |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-82 -Tez vertex scope-103 + |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-81 +Tez vertex scope-102 # Plan on vertex -1-12: Split - scope-121 +1-12: Split - scope-119 | | -| a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-77 +| a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-76 | | -| |---a2: Filter[bag] - scope-72 +| |---a2: Filter[bag] - scope-71 | | | -| | Not[boolean] - scope-76 +| | Not[boolean] - scope-75 | | | -| | |---Greater Than[boolean] - scope-75 +| | |---Greater Than[boolean] - scope-74 | | | -| | |---Project[int][0] - scope-73 +| | |---Project[int][0] - scope-72 | | | -| | |---Constant(100) - scope-74 +| | |---Constant(100) - scope-73 | | -| POValueOutputTez - scope-112 -> [scope-110] +| POValueOutputTez - scope-111 -> [scope-109] | | -| |---a1: Filter[bag] - scope-78 +| |---a1: Filter[bag] - scope-77 | | | -| | Greater Than[boolean] - scope-81 +| | Greater Than[boolean] - scope-80 | | | -| | |---Project[int][0] - scope-79 +| | |---Project[int][0] - scope-78 | | | -| | |---Constant(100) - scope-80 +| | |---Constant(100) - scope-79 | -|---a: New For Each(false,false)[bag] - scope-70 +|---a: New For Each(false,false)[bag] - scope-69 | | - | Cast[int] - scope-65 + | Cast[int] - scope-64 | | - | |---Project[bytearray][0] - scope-64 + | |---Project[bytearray][0] - scope-63 | | - | Cast[chararray] - scope-68 + | Cast[chararray] - scope-67 | | - | |---Project[bytearray][1] - scope-67 + | |---Project[bytearray][1] - scope-66 | - |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-63 -Tez vertex scope-110 + |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-62 +Tez vertex scope-109 # Plan on vertex -1-13: Split - scope-120 +1-13: Split - scope-118 | | -| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-96 +| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-95 | | -| |---d: Filter[bag] - scope-92 +| |---d: Filter[bag] - scope-91 | | | -| | Greater Than[boolean] - scope-95 +| | Greater Than[boolean] - scope-94 | | | -| | |---Project[int][0] - scope-93 +| | |---Project[int][0] - scope-92 | | | -| | |---Constant(500) - scope-94 +| | |---Constant(500) - scope-93 | | -| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-102 +| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-101 | | -| |---e: Filter[bag] - scope-97 +| |---e: Filter[bag] - scope-96 | | | -| | Not[boolean] - scope-101 +| | Not[boolean] - scope-100 | | | -| | |---Greater Than[boolean] - scope-100 +| | |---Greater Than[boolean] - scope-99 | | | -| | |---Project[int][0] - scope-98 +| | |---Project[int][0] - scope-97 | | | -| | |---Constant(500) - scope-99 +| | |---Constant(500) - scope-98 | -|---POShuffledValueInputTez - scope-111 <- [scope-109, scope-103] +|---POShuffledValueInputTez - scope-110 <- [scope-108, scope-102]
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Thu Nov 27 12:49:54 2014 @@ -2,16 +2,16 @@ # There are 1 DAGs in the session #-------------------------------------------------- #-------------------------------------------------- -# TEZ DAG plan: scope-56 +# TEZ DAG plan: pig-0_scope-0 #-------------------------------------------------- -Tez vertex scope-40 -> Tez vertex group scope-59,Tez vertex group scope-60, -Tez vertex scope-46 -> Tez vertex group scope-59,Tez vertex group scope-60, +Tez vertex scope-40 -> Tez vertex group scope-58,Tez vertex group scope-59, +Tez vertex scope-46 -> Tez vertex group scope-58,Tez vertex group scope-59, Tez vertex group scope-59 -Tez vertex group scope-60 +Tez vertex group scope-58 Tez vertex scope-40 # Plan on vertex -1-2: Split - scope-58 +1-2: Split - scope-57 | | | a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-14 | | @@ -25,7 +25,7 @@ Tez vertex scope-40 | | | | | |---Constant(100) - scope-11 | | -| 1-3: Split - scope-61 +| 1-3: Split - scope-60 | | | | | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33 | | | @@ -70,7 +70,7 @@ Tez vertex scope-40 |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0 Tez vertex scope-46 # Plan on vertex -1-3: Split - scope-62 +1-3: Split - scope-61 | | | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33 | | @@ -107,5 +107,5 @@ Tez vertex scope-46 |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-19 Tez vertex group scope-59 <- [scope-40, scope-46] -> null # No plan on vertex group -Tez vertex group scope-60 <- [scope-40, scope-46] -> null +Tez vertex group scope-58 <- [scope-40, scope-46] -> null # No plan on vertex group Modified: pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java Thu Nov 27 12:49:54 2014 @@ -21,15 +21,20 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.test.TestGroupConstParallel; import org.apache.pig.tools.pigstats.PigStats.JobGraph; -import org.apache.pig.tools.pigstats.tez.TezTaskStats; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.tez.TezDAGStats; +import org.apache.pig.tools.pigstats.tez.TezScriptState; +import org.apache.pig.tools.pigstats.tez.TezVertexStats; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Vertex; import org.junit.Assume; @@ -46,7 +51,9 @@ public class TestGroupConstParallelTez e @Override public void checkGroupAllWithParallelGraphResult(JobGraph jGraph) { - TezTaskStats ts = (TezTaskStats)jGraph.getSinks().get(0); + TezDAGStats ds = (TezDAGStats) jGraph.getJobList().get(0); + jGraph = (JobGraph)ds.getPlan(); + TezVertexStats ts = (TezVertexStats)jGraph.getSinks().get(0); assertEquals(ts.getParallelism(), 1); } @@ -60,7 +67,7 @@ public class TestGroupConstParallelTez e ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); parallelismSetter.visit(); - DAG tezDag = DAG.create("test"); + DAG tezDag = getTezDAG(tezPlan, pc); TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null); dagBuilder.visit(); for (Vertex v : tezDag.getVertices()) { @@ -80,7 +87,7 @@ public class TestGroupConstParallelTez e ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); parallelismSetter.visit(); - DAG tezDag = DAG.create("test"); + DAG tezDag = getTezDAG(tezPlan, pc); TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null); dagBuilder.visit(); for (Vertex v : tezDag.getVertices()) { @@ -95,4 +102,13 @@ public class TestGroupConstParallelTez e comp.compile(); return comp.getTezPlan(); } + + private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) { + TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan); + TezScriptState scriptState = new TezScriptState("test"); + ScriptState.start(scriptState); + scriptState.setDAGScriptInfo(tezPlanNode); + DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString()); + return tezDag; + } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java Thu Nov 27 12:49:54 2014 @@ -23,15 +23,19 @@ import static org.junit.Assert.assertTru import org.apache.hadoop.conf.Configuration; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.test.TestJobSubmission; import org.apache.pig.test.Util; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Vertex; @@ -57,7 +61,7 @@ public class TestJobSubmissionTez extend ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); parallelismSetter.visit(); - DAG tezDag = DAG.create("test"); + DAG tezDag = getTezDAG(tezPlan, pc); TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null); try { dagBuilder.visit(); @@ -69,14 +73,14 @@ public class TestJobSubmissionTez extend @Override public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception { TezOperPlan tezPlan = buildTezPlan(pp, pc); - + LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc); loaderStorer.visit(); ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); parallelismSetter.visit(); - DAG tezDag = DAG.create("test"); + DAG tezDag = getTezDAG(tezPlan, pc); TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null); dagBuilder.visit(); for (Vertex v : tezDag.getVertices()) { @@ -96,4 +100,13 @@ public class TestJobSubmissionTez extend comp.compile(); return comp.getTezPlan(); } + + private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) { + TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan); + TezScriptState scriptState = new TezScriptState("test"); + ScriptState.start(scriptState); + scriptState.setDAGScriptInfo(tezPlanNode); + DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString()); + return tezDag; + } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java Thu Nov 27 12:49:54 2014 @@ -20,10 +20,10 @@ package org.apache.pig.tez; import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.CombinerOptimizer; -import org.apache.pig.backend.hadoop.executionengine.tez.SecondaryKeyOptimizerTez; -import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.TestSecondarySort; @@ -47,12 +47,12 @@ public class TestSecondarySortTez extend TezCompiler comp = new TezCompiler(pp, pc); TezOperPlan tezPlan = comp.compile(); boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty( - PigConfiguration.PROP_NO_COMBINER, "false")); + PigConfiguration.PIG_EXEC_NO_COMBINER, "false")); // Run CombinerOptimizer on Tez plan if (!nocombiner) { boolean doMapAgg = Boolean.parseBoolean(pc.getProperties() - .getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, + .getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false")); CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg); co.visit(); Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Nov 27 12:49:54 2014 @@ -18,10 +18,15 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Random; @@ -33,6 +38,9 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; import org.junit.After; @@ -98,7 +106,7 @@ public class TestTezAutoParallelism { } w.close(); Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1); - + w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2)); for (String name : boyNames) { w.println(name + "\t" + "M"); @@ -119,13 +127,14 @@ public class TestTezAutoParallelism { // parallelism is 3 originally, reduce to 1 pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name;"); pigServer.store("B", "output1"); FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){ + @Override public boolean accept(Path path) { if (path.getName().startsWith("part")) { return true; @@ -141,7 +150,7 @@ public class TestTezAutoParallelism { // order by parallelism is 3 originally, reduce to 1 pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name parallel 3;"); @@ -150,6 +159,7 @@ public class TestTezAutoParallelism { pigServer.store("D", "output2"); FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){ + @Override public boolean accept(Path path) { if (path.getName().startsWith("part")) { return true; @@ -173,6 +183,7 @@ public class TestTezAutoParallelism { pigServer.store("D", "output3"); FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){ + @Override public boolean accept(Path path) { if (path.getName().startsWith("part")) { return true; @@ -188,7 +199,7 @@ public class TestTezAutoParallelism { // skewed join parallelism is 4 originally, reduce to 1 pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); @@ -196,6 +207,7 @@ public class TestTezAutoParallelism { pigServer.store("C", "output4"); FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){ + @Override public boolean accept(Path path) { if (path.getName().startsWith("part")) { return true; @@ -218,6 +230,7 @@ public class TestTezAutoParallelism { pigServer.store("C", "output5"); FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ + @Override public boolean accept(Path path) { if (path.getName().startsWith("part")) { return true; @@ -225,6 +238,40 @@ public class TestTezAutoParallelism { return false; } }); - assertEquals(files.length, 5); + assertEquals(files.length, 4); + } + + @Test + public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{ + NodeIdGenerator.reset(); + PigServer.resetScope(); + StringWriter writer = new StringWriter(); + // When there is a combiner operation involved user specified parallelism is overriden + Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer); + try { + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); + pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); + pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;"); + pigServer.registerQuery("D = group C by A::name;"); + pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + List<Tuple> expectedResults = Util + .getTuplesFromConstantTupleStrings(new String[] { + "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)", + "('Daniel',68L)", "('Elizabeth',42L)", + "('Emily',57L)", "('Emma',50L)", "('Ethan',50L)", + "('Isabella',43L)", "('Jacob',43L)", "('Jayden',59L)", + "('Liam',46L)", "('Madison',46L)", "('Mason',54L)", + "('Mia',51L)", "('Michael',47L)", "('Noah',38L)", + "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" }); + + Util.checkQueryOutputsAfterSort(iter, expectedResults); + assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4")); + } finally { + Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism"); + } } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Thu Nov 27 12:49:54 2014 @@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.Map; import java.util.Properties; import org.apache.pig.PigConfiguration; @@ -32,13 +31,10 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer; -import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerNode; -import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerPrinter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.NodeIdGenerator; -import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.test.Util; import org.apache.pig.test.utils.TestHelper; import org.junit.AfterClass; @@ -77,8 +73,8 @@ public class TestTezCompiler { @Before public void setUp() throws ExecException { resetScope(); - pc.getProperties().remove(PigConfiguration.OPT_MULTIQUERY); - pc.getProperties().remove(PigConfiguration.TEZ_OPT_UNION); + pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY); + pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION); pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY); pigServer = new PigServer(pc); } @@ -86,6 +82,7 @@ public class TestTezCompiler { private void resetScope() { NodeIdGenerator.reset(); PigServer.resetScope(); + TezPlanContainer.resetScope(); } @Test @@ -96,7 +93,7 @@ public class TestTezCompiler { "c = foreach b generate y;" + "store c into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld"); } @Test @@ -107,7 +104,7 @@ public class TestTezCompiler { "c = foreach b generate group, COUNT(a.x);" + "store c into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld"); } @Test @@ -119,7 +116,7 @@ public class TestTezCompiler { "d = foreach c generate a::x as x, y, z;" + "store d into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld"); } @Test @@ -131,7 +128,7 @@ public class TestTezCompiler { "d = foreach c generate a::x as x, y, z;" + "store d into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld"); } @Test @@ -142,7 +139,31 @@ public class TestTezCompiler { "c = foreach b generate y;" + "store c into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld"); + } + + @Test + public void testLimitOrderby() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = order a by x, y;" + + "c = limit b 10;" + + "store c into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld"); + } + + @Test + public void testLimitScalarOrderby() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = order a by x, y;" + + "g = group a all;" + + "h = foreach g generate COUNT(a) as sum;" + + "c = limit b h.sum/2;" + + "store c into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld"); } @Test @@ -153,7 +174,7 @@ public class TestTezCompiler { "c = foreach b generate y;" + "store c into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld"); } @Test @@ -164,7 +185,7 @@ public class TestTezCompiler { "c = foreach b { d = distinct a; generate COUNT(d); };" + "store c into 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld"); } @@ -178,7 +199,7 @@ public class TestTezCompiler { "d = join a by x, b by x, c by x using 'replicated';" + "store d into 'file:///tmp/output/d';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC10.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld"); } @Test @@ -191,7 +212,7 @@ public class TestTezCompiler { "d = join b1 by group, c by x using 'replicated';" + "store d into 'file:///tmp/output/e';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC11.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld"); } @Test @@ -201,7 +222,7 @@ public class TestTezCompiler { "b = stream a through `stream.pl -n 5`;" + "STORE b INTO 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld"); } @Test @@ -212,11 +233,11 @@ public class TestTezCompiler { "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+ "store c INTO 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC14.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld"); // With optimization turned off setProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "true"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC15.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld"); } @Test @@ -226,7 +247,7 @@ public class TestTezCompiler { "b = order a by x;" + "STORE b INTO 'file:///tmp/output';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld"); } // PIG-3759, PIG-3781 @@ -240,7 +261,7 @@ public class TestTezCompiler { "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" + "store d into 'file:///tmp/output/d';"; - run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC18.gld"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld"); } @Test @@ -252,9 +273,9 @@ public class TestTezCompiler { "store c into 'file:///tmp/output/c';" + "store d into 'file:///tmp/output/d';"; - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld"); - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld"); } @@ -288,9 +309,9 @@ public class TestTezCompiler { "store f1 into 'file:///tmp/output/f1';" + "store f2 into 'file:///tmp/output/f2';"; - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld"); - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld"); } @@ -305,9 +326,9 @@ public class TestTezCompiler { "store b into 'file:///tmp/output/b';" + "store c into 'file:///tmp/output/c';"; - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld"); - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld"); } @@ -323,9 +344,9 @@ public class TestTezCompiler { "store d into 'file:///tmp/output/d';" + "store e into 'file:///tmp/output/e';"; - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld"); - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld"); } @@ -342,9 +363,9 @@ public class TestTezCompiler { "e = foreach d GENERATE a::x, a::y;" + "store e into 'file:///tmp/output/e';"; - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld"); - setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld"); } @@ -356,9 +377,9 @@ public class TestTezCompiler { "c = union onschema a, b;" + "store c into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); } @@ -372,9 +393,9 @@ public class TestTezCompiler { "e = foreach d generate group, SUM(c.y);" + "store e into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld"); } @@ -388,9 +409,9 @@ public class TestTezCompiler { "e = join c by x, d by x;" + "store e into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld"); } @@ -406,9 +427,9 @@ public class TestTezCompiler { "store e into 'file:///tmp/output';"; //TODO: PIG-3856 Not optimized - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld"); query = @@ -420,9 +441,9 @@ public class TestTezCompiler { "store e into 'file:///tmp/output';"; // Optimized - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld"); } @@ -436,9 +457,9 @@ public class TestTezCompiler { "e = join c by x, d by x using 'skewed';" + "store e into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld"); } @@ -451,9 +472,9 @@ public class TestTezCompiler { "d = order c by x;" + "store d into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld"); } @@ -467,9 +488,9 @@ public class TestTezCompiler { "d = limit c 1;" + "store d into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld"); } @@ -485,9 +506,9 @@ public class TestTezCompiler { "store d into 'file:///tmp/output/d';" + "store e into 'file:///tmp/output/e';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld"); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld"); } @@ -502,10 +523,10 @@ public class TestTezCompiler { "f = group e by x;" + "store f into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld"); resetScope(); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld"); } @@ -520,10 +541,10 @@ public class TestTezCompiler { "e = union onschema c, d;" + "store e into 'file:///tmp/output';"; - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld"); resetScope(); - setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld"); } @@ -557,10 +578,6 @@ public class TestTezCompiler { TezLauncher launcher = new TezLauncher(); pc.inExplain = true; TezPlanContainer tezPlanContainer = launcher.compile(pp, pc); - for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) { - TezOperPlan tezPlan = entry.getValue().getNode(); - TezLauncher.optimize(tezPlan, pc); - } ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Thu Nov 27 12:49:54 2014 @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -41,20 +40,22 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.MultiQueryOptimizerTez; -import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.Pair; import org.apache.pig.test.Util; import org.apache.pig.test.junit.OrderedJUnit4Runner; import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Vertex; import org.junit.AfterClass; @@ -190,7 +191,7 @@ public class TestTezJobControlCompiler { @Override public List<InputSplit> getSplits(JobContext job) throws IOException { String inputDir = job.getConfiguration().get(INPUT_DIR, ""); - String numSplitString = inputDir.substring(inputDir.lastIndexOf(File.separator)+1); + String numSplitString = inputDir.substring(inputDir.lastIndexOf("/")+1); int numSplit = Integer.parseInt(numSplitString); List<InputSplit> splits = new ArrayList<InputSplit>(); for (int i=0;i<numSplit;i++) { @@ -254,7 +255,7 @@ public class TestTezJobControlCompiler { Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); assertEquals(leafVertex.getParallelism(), 15); } - + @Test public void testTezParallelismEstimatorSplitBranch() throws Exception{ pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); @@ -270,7 +271,7 @@ public class TestTezJobControlCompiler { Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); assertEquals(leafVertex.getParallelism(), 7); } - + @Test public void testTezParallelismDefaultParallelism() throws Exception{ pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); @@ -289,14 +290,15 @@ public class TestTezJobControlCompiler { PhysicalPlan pp = Util.buildPp(pigServer, query); TezCompiler comp = new TezCompiler(pp, pc); TezOperPlan tezPlan = comp.compile(); + TezLauncher.processLoadAndParallelism(tezPlan, pc); + + TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan); + TezScriptState scriptState = new TezScriptState("test"); + ScriptState.start(scriptState); + scriptState.setDAGScriptInfo(tezPlanNode); + TezJobCompiler jobComp = new TezJobCompiler(pc, new Configuration()); - MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan); - mqOptimizer.visit(); - LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc); - loaderStorer.visit(); - ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); - parallelismSetter.visit(); - DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>()); + DAG dag = jobComp.buildDAG(tezPlanNode, new HashMap<String, LocalResource>()); return new Pair<TezOperPlan, DAG>(tezPlan, dag); } } Modified: pig/branches/spark/test/tez-tests URL: http://svn.apache.org/viewvc/pig/branches/spark/test/tez-tests?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/tez-tests (original) +++ pig/branches/spark/test/tez-tests Thu Nov 27 12:49:54 2014 @@ -1,3 +1,4 @@ +**/TestStreamingUDF.java **/TestAccumulator.java **/TestAlgebraicEval.java **/TestBZip.java @@ -11,11 +12,11 @@ **/TestCustomPartitioner.java **/TestEvalPipeline.java **/TestEvalPipeline2.java +**/TestFRJoin.java +**/TestFRJoinNullValue.java **/TestFilterUDF.java **/TestFinish.java **/TestForEachNestedPlan.java -**/TestFRJoin.java -**/TestFRJoinNullValue.java **/TestGrunt.java **/TestImplicitSplit.java **/TestInputOutputMiniClusterFileValidator.java @@ -29,11 +30,14 @@ **/TestMapReduce.java **/TestMapSideCogroup.java **/TestMapReduce2.java +**/TestMergeJoin.java **/TestMergeJoinOuter.java +**/TestNativeMapReduce.java **/TestNestedForeach.java **/TestNewPlanImplicitSplit.java **/TestParser.java **/TestPigContext.java +**/TestPigProgressReporting.java **/TestPigServer.java **/TestPigServerWithMacros.java **/TestPigSplit.java @@ -52,21 +56,14 @@ **/TestStoreInstances.java **/TestStoreOld.java **/TestStreaming.java -**/TestStreamingUDF.java **/TestToolsPigServer.java **/TestUDF.java **/TestUDFContext.java +**/TestGroupConstParallelTez.java +**/TestJobSubmissionTez.java +**/TestLoaderStorerShipCacheFilesTez.java **/TestSecondarySortTez.java **/TestTezAutoParallelism.java **/TestTezCompiler.java **/TestTezJobControlCompiler.java **/TestTezLauncher.java -**/TestAccumuloPigCluster.java -**/TestBigTypeSort.java -**/TestCurrentTime.java -**/TestInvokerGenerator.java -**/TestGroupConstParallelTez.java -**/TestJobSubmissionTez.java -**/TestMergeJoin.java -**/TestNativeMapReduce.java -**/TestPigProgressReporting.java
