Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Feb 24 03:34:37 2017 @@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; @@ -48,7 +47,6 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; -import org.apache.tez.dag.api.TezConfiguration; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -64,23 +62,12 @@ public class TestTezAutoParallelism { private static Properties properties; private static MiniGenericCluster cluster; - private static final PathFilter PART_FILE_FILTER = new PathFilter() { - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }; - @BeforeClass public static void oneTimeSetUp() throws Exception { cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ); properties = cluster.getProperties(); //Test spilling to disk as tests here have multiple splits properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10"); - properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false"); createFiles(); } @@ -97,11 +84,6 @@ public class TestTezAutoParallelism { @After public void tearDown() throws Exception { - removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION); - removeProperty(MRConfiguration.MAX_SPLIT_SIZE); - removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM); - removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); - removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL); pigServer.shutdown(); pigServer = null; } @@ -149,53 +131,32 @@ public class TestTezAutoParallelism { @Test public void testGroupBy() throws IOException{ // parallelism is 3 originally, reduce to 1 - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name;"); pigServer.store("B", "output1"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 1); - fs.delete(new Path("output1"), true); - } - - @Test - public void testBytesPerReducer() throws IOException{ - - NodeIdGenerator.reset(); - PigServer.resetScope(); - - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); - - StringWriter writer = new StringWriter(); - Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class); - try { - pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); - pigServer.registerQuery("B = group A by name;"); - pigServer.store("B", "output1"); - FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER); - assertEquals(files.length, 10); - String log = writer.toString(); - assertTrue(log.contains("For vertex - scope-13: parallelism=3")); - assertTrue(log.contains("For vertex - scope-14: parallelism=10")); - } finally { - Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class); - Util.deleteFile(cluster, "output1"); - } } @Test public void testOrderbyDecreaseParallelism() throws IOException{ // order by parallelism is 3 originally, reduce to 1 - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name parallel 3;"); @@ -203,54 +164,86 @@ public class TestTezAutoParallelism { pigServer.registerQuery("D = order C by age;"); pigServer.store("D", "output2"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 1); } @Test public void testOrderbyIncreaseParallelism() throws IOException{ // order by parallelism is 3 originally, increase to 4 - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name parallel 3;"); pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;"); pigServer.registerQuery("D = order C by age;"); pigServer.store("D", "output3"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 4); } @Test public void testSkewedJoinDecreaseParallelism() throws IOException{ // skewed join parallelism is 4 originally, reduce to 1 - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); pigServer.store("C", "output4"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 1); } @Test public void testSkewedJoinIncreaseParallelism() throws IOException{ // skewed join parallelism is 3 originally, increase to 5 - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); pigServer.store("C", "output5"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 5); } @@ -258,15 +251,23 @@ public class TestTezAutoParallelism { public void testSkewedFullJoinIncreaseParallelism() throws IOException{ // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency, // which prevent it changing parallelism - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name full, B by name using 'skewed';"); pigServer.store("C", "output6"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 5); } @@ -274,9 +275,9 @@ public class TestTezAutoParallelism { public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{ // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency, // which prevent it changing parallelism - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); @@ -286,29 +287,19 @@ public class TestTezAutoParallelism { pigServer.registerQuery("G = foreach C generate age/F.count, gender;"); pigServer.store("G", "output7"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER); + FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 4); } @Test - public void testSkewedJoinRightInputAutoParallelism() throws IOException{ - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); - setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0"); - setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG"); - pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); - pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); - pigServer.registerQuery("B = FILTER B by name == 'Noah';"); - pigServer.registerQuery("B1 = group B by name;"); - pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';"); - pigServer.store("C", "output8"); - FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER); - assertEquals(5, files.length); - } - - @Test public void testFlattenParallelism() throws IOException{ String outputDir = "/tmp/testFlattenParallelism"; String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" @@ -395,9 +386,9 @@ public class TestTezAutoParallelism { // When there is a combiner operation involved user specified parallelism is overriden Util.createLogAppender("testAutoParallelism", writer, classesToLog); try { - setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); - setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); pigServer.setBatchOn(); pigServer.registerScript(new ByteArrayInputStream(script.getBytes())); pigServer.executeBatch(); @@ -425,12 +416,4 @@ public class TestTezAutoParallelism { Util.deleteFile(cluster, outputDir); } } - - private void setProperty(String property, String value) { - pigServer.getPigContext().getProperties().setProperty(property, value); - } - - private void removeProperty(String property) { - pigServer.getPigContext().getProperties().remove(property); - } }
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Fri Feb 24 03:34:37 2017 @@ -20,21 +20,13 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.IOException; import java.io.PrintStream; import java.util.Properties; -import java.util.Random; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; -import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; @@ -43,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; import org.apache.pig.builtin.OrcStorage; import org.apache.pig.builtin.PigStorage; -import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; import org.apache.pig.test.Util; @@ -76,14 +66,11 @@ public class TestTezCompiler { @BeforeClass public static void setUpBeforeClass() throws Exception { - resetFileLocalizer(); pc = new PigContext(new TezLocalExecType(), new Properties()); - FileUtils.deleteDirectory(new File("/tmp/pigoutput")); } @AfterClass public static void tearDownAfterClass() throws Exception { - resetFileLocalizer(); } @Before @@ -92,7 +79,6 @@ public class TestTezCompiler { pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY); pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION); pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY); - pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY); pigServer = new PigServer(pc); } @@ -102,20 +88,13 @@ public class TestTezCompiler { TezPlanContainer.resetScope(); } - private static void resetFileLocalizer() { - FileLocalizer.deleteTempFiles(); - FileLocalizer.setInitialized(false); - // Set random seed to generate deterministic temporary paths - FileLocalizer.setR(new Random(1331L)); - } - @Test public void testStoreLoad() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + - "store a into 'file:///tmp/pigoutput';" + - "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" + - "store b into 'file:///tmp/pigoutput1';"; + "store a into 'file:///tmp/output';" + + "b = load 'file:///tmp/output' as (x:int, y:int);" + + "store b into 'file:///tmp/output1';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld"); } @@ -124,85 +103,25 @@ public class TestTezCompiler { public void testStoreLoadMultiple() throws Exception { String query = "a = load 'file:///tmp/input';" + - "store a into 'file:///tmp/pigoutput/Dir1';" + - "a = load 'file:///tmp/pigoutput/Dir1';" + - "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" + - "a = load 'file:///tmp/pigoutput/Dir1';" + - "store a into 'file:///tmp/pigoutput/Dir3';" + - "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + - "store a into 'file:///tmp/pigoutput/Dir4';" + - "a = load 'file:///tmp/pigoutput/Dir3';" + - "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + - "c = load 'file:///tmp/pigoutput/Dir1';" + + "store a into 'file:///tmp/output/Dir1';" + + "a = load 'file:///tmp/output/Dir1';" + + "store a into 'file:///tmp/output/Dir2' using BinStorage();" + + "a = load 'file:///tmp/output/Dir1';" + + "store a into 'file:///tmp/output/Dir3';" + + "a = load 'file:///tmp/output/Dir2' using BinStorage();" + + "store a into 'file:///tmp/output/Dir4';" + + "a = load 'file:///tmp/output/Dir3';" + + "b = load 'file:///tmp/output/Dir2' using BinStorage();" + + "c = load 'file:///tmp/output/Dir1';" + "d = cogroup a by $0, b by $0, c by $0;" + - "store d into 'file:///tmp/pigoutput/Dir5';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); - } - - @Test - public void testStoreLoadJoinMultiple() throws Exception { - // Case where different store load statements are used in a single join - String query = - "a = load 'file:///tmp/pigoutput/Dir1';" + - "b = filter a by $0 == 1;" + - "c = filter a by $0 == 2;" + - "store b into 'file:///tmp/pigoutput/Dir2';" + - "store c into 'file:///tmp/pigoutput/Dir3';" + - "d = load 'file:///tmp/pigoutput/Dir2';" + - "e = load 'file:///tmp/pigoutput/Dir3';" + - "f = join d by $0, e by $0;" + - "store f into 'file:///tmp/pigoutput/Dir5';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld"); - - resetScope(); - query = - "a = load 'file:///tmp/pigoutput/Dir1';" + - "b = distinct a;" + - "c = group a by $0;" + - "store b into 'file:///tmp/pigoutput/Dir2';" + - "store c into 'file:///tmp/pigoutput/Dir3';" + - "d = load 'file:///tmp/pigoutput/Dir2';" + - "e = load 'file:///tmp/pigoutput/Dir3';" + - "f = load 'file:///tmp/pigoutput/Dir4';" + - "g = join d by $0, f by $0 using 'repl';" + - "h = join e by $0, f by $0 using 'repl';" + - "store g into 'file:///tmp/pigoutput/Dir4';" + - "store h into 'file:///tmp/pigoutput/Dir5';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld"); - } - - @Test - public void testStoreLoadSplit() throws Exception { - // Cases where segmenting into two DAGs is not straight forward due to Split. - // The Split operator is required in both the segments. + "store d into 'file:///tmp/output/Dir5';"; - resetFileLocalizer(); - // Split operator as root vertex - String query = - "a = load 'file:///tmp/input';" + - "a1 = filter a by $0 == 5;" + - "store a1 into 'file:///tmp/pigoutput/Dir1';" + - "b = load 'file:///tmp/pigoutput/Dir1';" + - "c = join a by $0, b by $0;" + - "store c into 'file:///tmp/pigoutput/Dir2';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld"); - - // Split operator as intermediate vertex - query = - "a = load 'file:///tmp/input';" + - "a = distinct a;" + - "store a into 'file:///tmp/pigoutput/Dir1';" + - "b = load 'file:///tmp/pigoutput/Dir1';" + - "c = join a by $0, b by $0;" + - "store c into 'file:///tmp/pigoutput/Dir2';"; - - resetScope(); - resetFileLocalizer(); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld"); + // To get around difference in ordering of operators in plan due to JDK7 and JDK8 + if (System.getProperties().getProperty("java.version").startsWith("1.8")) { + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); + } else { + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld"); + } } @Test @@ -210,7 +129,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" + - "store b into 'file:///tmp/pigoutput';"; + "store b into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld"); } @@ -221,7 +140,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = filter a by x > 0;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld"); } @@ -232,7 +151,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b generate group, COUNT(a.x);" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld"); } @@ -244,131 +163,12 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld"); } @Test - public void testBloomJoin() throws Exception { - String query = - "a = load 'file:///tmp/input1' as (x, y:int);" + - "b = load 'file:///tmp/input2' as (x, z:int);" + - "c = load 'file:///tmp/input2' as (x, w:int);" + - "d = join b by x, a by x, c by x using 'bloom';" + - "e = foreach d generate a::x as x, y, z, w;" + - "store e into 'file:///tmp/pigoutput';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld"); - } - - @Test - public void testBloomJoinLeftOuter() throws Exception { - String query = - "a = load 'file:///tmp/input1' as (x:chararray, y:int);" + - "b = load 'file:///tmp/input2' as (x:chararray, z:int);" + - "d = join a by x left, b by x using 'bloom';" + - "e = foreach d generate a::x as x, y, z;" + - "store e into 'file:///tmp/pigoutput';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld"); - } - - @Test - public void testBloomJoinUnion() throws Exception { - // Left input from a union - String query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = load 'file:///tmp/input2' as (x:int, z:int);" + - "c = load 'file:///tmp/input3' as (x:int, z:int);" + - "b = union b, c;" + - "d = join a by x, b by x using 'bloom';" + - "e = foreach d generate a::x as x, y, z;" + - "store e into 'file:///tmp/pigoutput';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld"); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); - - resetScope(); - // Right input from a union - query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = load 'file:///tmp/input2' as (x:int, z:int);" + - "c = load 'file:///tmp/input3' as (x:int, z:int);" + - "b = union b, c;" + - "d = join b by x, a by x using 'bloom';" + - "e = foreach d generate a::x as x, y, z;" + - "store e into 'file:///tmp/pigoutput';"; - - // Needs shared edges and PIG-3856 to be a more optimial plan - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld"); - } - - @Test - public void testBloomJoinSplit() throws Exception { - // Left input from a split - String query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = load 'file:///tmp/input2' as (x:int, z:int);" + - "a1 = filter a by x == 3;" + - "a2 = filter a by x == 4;" + - "d = join a1 by x, a2 by x, b by x using 'bloom';" + - "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" + - "store e into 'file:///tmp/pigoutput';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld"); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); - - resetScope(); - // Right input from a split - query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = load 'file:///tmp/input2' as (x:int, z:int);" + - "a1 = filter a by x == 3;" + - "a2 = filter a by x == 4;" + - "d = join b by x, a1 by x using 'bloom';" + - "e = foreach d generate a1::x as x, y, z;" + - "store a2 into 'file:///tmp/pigoutput/a2';" + - "store e into 'file:///tmp/pigoutput/e';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld"); - } - - @Test - public void testBloomSelfJoin() throws Exception { - String query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = filter a by x < 5;" + - "c = filter a by x == 10;" + - "d = filter a by x > 10;" + - "e = join b by x, c by x, d by x using 'bloom';" + - "store e into 'file:///tmp/pigoutput';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld"); - } - - @Test public void testSelfJoin() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + @@ -376,7 +176,7 @@ public class TestTezCompiler { "c = filter a by x == 10;" + "d = filter a by x > 10;" + "e = join b by x, c by x, d by x;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld"); } @@ -388,7 +188,7 @@ public class TestTezCompiler { "b = filter a by x < 5;" + "c = filter a by x == 10;" + "d = join b by x, c by x using 'skewed';" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld"); } @@ -401,7 +201,7 @@ public class TestTezCompiler { "c = filter a by x == 10;" + "d = filter a by x > 10;" + "e = join b by x, c by x, d by x using 'replicated';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld"); } @@ -413,7 +213,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = union a, b;" + "d = join b by x, c by x using 'replicated';" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld"); } @@ -426,7 +226,7 @@ public class TestTezCompiler { "a2 = filter a by x < 2;" + "b = union a1, a2;" + "c = join b by x, a by x;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld"); } @@ -442,7 +242,7 @@ public class TestTezCompiler { "a5 = foreach a4 generate a2::x as x, a3::y as y;" + "b = union a1, a5;" + "c = join b by x, a by x;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld"); } @@ -454,7 +254,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cross a, b;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld"); } @@ -466,7 +266,7 @@ public class TestTezCompiler { "b = filter a by x < 5;" + "c = filter a by x == 10;" + "d = cross b, c;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld"); } @@ -478,7 +278,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cross b, a;" + "d = foreach c generate a.x, a.y, z;" + //Scalar - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld"); } @@ -490,7 +290,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x using 'skewed';" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld"); } @@ -503,7 +303,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x using 'skewed';" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld"); } @@ -514,7 +314,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = limit a 10;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld"); } @@ -525,7 +325,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = order a by x, y;" + "c = limit b 10;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld"); } @@ -538,31 +338,18 @@ public class TestTezCompiler { "g = group a all;" + "h = foreach g generate COUNT(a) as sum;" + "c = limit b h.sum/2;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld"); } @Test - public void testLimitReplJoin() throws Exception { - String query = - "a = load 'file:///tmp/input' as (x:int, y:int);" + - "b = load 'file:///tmp/input' as (x:int, y:int);" + - "c = limit a 1;" + - "d = join c by x, b by x using 'replicated';" + - "store a into 'file:///tmp/pigoutput/a';" + - "store d into 'file:///tmp/pigoutput/d';"; - - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld"); - } - - @Test public void testDistinct() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = distinct a;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld"); } @@ -573,7 +360,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b { d = distinct a; generate COUNT(d); };" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld"); } @@ -587,7 +374,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = load 'file:///tmp/input3' as (x:int, z:int);" + "d = join a by x, b by x, c by x using 'replicated';" + - "store d into 'file:///tmp/pigoutput/d';"; + "store d into 'file:///tmp/output/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld"); } @@ -600,7 +387,7 @@ public class TestTezCompiler { "b1 = foreach b generate group, COUNT(a.y);" + "c = load 'file:///tmp/input2' as (x:int, z:int);" + "d = join b1 by group, c by x using 'replicated';" + - "store d into 'file:///tmp/pigoutput/e';"; + "store d into 'file:///tmp/output/e';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld"); } @@ -610,7 +397,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = stream a through `stream.pl -n 5`;" + - "STORE b INTO 'file:///tmp/pigoutput';"; + "STORE b INTO 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld"); } @@ -621,7 +408,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" + "b = group a by $0;" + "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+ - "store c INTO 'file:///tmp/pigoutput';"; + "store c INTO 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld"); @@ -635,7 +422,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = order a by x;" + - "STORE b INTO 'file:///tmp/pigoutput';"; + "STORE b INTO 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld"); } @@ -646,7 +433,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = filter a by x == 1;" + "c = order b by x;" + - "STORE c INTO 'file:///tmp/pigoutput';"; + "STORE c INTO 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld"); } @@ -657,7 +444,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" + "b = order a by x;" + - "STORE b INTO 'file:///tmp/pigoutput';"; + "STORE b INTO 'file:///tmp/output';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld"); setProperty("pig.sort.readonce.loadfuncs", null); @@ -672,7 +459,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cogroup a by x, b by x;" + "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" + - "store d into 'file:///tmp/pigoutput/d';"; + "store d into 'file:///tmp/output/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld"); } @@ -682,9 +469,9 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "split a into b if x <= 5, c if x <= 10, d if x >10;" + - "store b into 'file:///tmp/pigoutput/b';" + - "store c into 'file:///tmp/pigoutput/c';" + - "store d into 'file:///tmp/pigoutput/d';"; + "store b into 'file:///tmp/output/b';" + + "store c into 'file:///tmp/output/c';" + + "store d into 'file:///tmp/output/d';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld"); @@ -713,14 +500,14 @@ public class TestTezCompiler { // Needs to be removed in Tez plan as well. "f1 = limit f 1;" + "f2 = union d1, f1;" + - "store b1 into 'file:///tmp/pigoutput/b1';" + - "store b2 into 'file:///tmp/pigoutput/b2';" + - "store c1 into 'file:///tmp/pigoutput/c1';" + - "store c3 into 'file:///tmp/pigoutput/c1';" + - "store d1 into 'file:///tmp/pigoutput/d1';" + - "store e1 into 'file:///tmp/pigoutput/e1';" + - "store f1 into 'file:///tmp/pigoutput/f1';" + - "store f2 into 'file:///tmp/pigoutput/f2';"; + "store b1 into 'file:///tmp/output/b1';" + + "store b2 into 'file:///tmp/output/b2';" + + "store c1 into 'file:///tmp/output/c1';" + + "store c3 into 'file:///tmp/output/c1';" + + "store d1 into 'file:///tmp/output/d1';" + + "store e1 into 'file:///tmp/output/e1';" + + "store f1 into 'file:///tmp/output/f1';" + + "store f2 into 'file:///tmp/output/f2';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld"); @@ -736,8 +523,8 @@ public class TestTezCompiler { "b = foreach b generate group, COUNT(a.x);" + "c = group a by (x,y);" + "c = foreach c generate group, COUNT(a.y);" + - "store b into 'file:///tmp/pigoutput/b';" + - "store c into 'file:///tmp/pigoutput/c';"; + "store b into 'file:///tmp/output/b';" + + "store c into 'file:///tmp/output/c';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld"); @@ -753,9 +540,9 @@ public class TestTezCompiler { "c = join a by x, b by x;" + "d = foreach c generate $0, $1, $3;" + "e = foreach c generate $0, $1, $2, $3;" + - "store c into 'file:///tmp/pigoutput/c';" + - "store d into 'file:///tmp/pigoutput/d';" + - "store e into 'file:///tmp/pigoutput/e';"; + "store c into 'file:///tmp/output/c';" + + "store d into 'file:///tmp/output/d';" + + "store e into 'file:///tmp/output/e';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld"); @@ -768,13 +555,13 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}} - "store b into 'file:///tmp/pigoutput/b';" + + "store b into 'file:///tmp/output/b';" + "c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}} - "store c into 'file:///tmp/pigoutput/c';" + + "store c into 'file:///tmp/output/c';" + "d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int} - "store d into 'file:///tmp/pigoutput/d';" + + "store d into 'file:///tmp/output/d';" + "e = foreach d GENERATE a::x, a::y;" + - "store e into 'file:///tmp/pigoutput/e';"; + "store e into 'file:///tmp/output/e';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld"); @@ -789,8 +576,8 @@ public class TestTezCompiler { "b = group a by x;" + "c = foreach b generate group, COUNT(a) as cnt;" + "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" + - "store d into 'file:///tmp/pigoutput1';" + - "store e into 'file:///tmp/pigoutput2';"; + "store d into 'file:///tmp/output1';" + + "store e into 'file:///tmp/output2';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld"); @@ -807,7 +594,7 @@ public class TestTezCompiler { "c = join a by $0, b by $0 using 'replicated';" + "d = join a by $1, b by $1 using 'replicated';" + "e = union c,d;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld"); @@ -826,7 +613,7 @@ public class TestTezCompiler { "c = foreach c generate $0 as c1;" + "d = group a by x;" + "e = foreach d generate group, b.b1, c.c1;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld"); @@ -836,67 +623,18 @@ public class TestTezCompiler { } @Test - public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception { - // Replicate joins are from a split - String query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = load 'file:///tmp/input2' as (x:int, y:int);" + - "c = load 'file:///tmp/input3' as (x:int, y:int);" + - "d = union a, b;" + - "e = filter c by y < 2;" + - "f = filter c by y > 5;" + - "g = join d by x, e by x using 'replicated';" + - "h = join g by d::x, f by x using 'replicated';" + - "store h into 'file:///tmp/pigoutput';"; - - setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld"); - - // Union is also from a split - query = - "a = load 'file:///tmp/input1' as (x:int, y:int);" + - "b = filter a by x == 2;" + - "c = load 'file:///tmp/input3' as (x:int, y:int);" + - "d = union a, b;" + - "e = filter c by y < 2;" + - "f = filter c by y > 5;" + - "g = join d by x, e by x using 'replicated';" + - "h = join g by d::x, f by x using 'replicated';" + - "store h into 'file:///tmp/pigoutput';"; - - resetScope(); - setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld"); - } - - @Test public void testUnionStore() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld"); resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); - query = - "a = load 'file:///tmp/input' as (x:int, y:chararray);" + - "b = load 'file:///tmp/input' as (y:chararray, x:int);" + - "c = union onschema a, b PARALLEL 15;" + - "store c into 'file:///tmp/pigoutput';"; - // Union optimization should be turned off if PARALLEL clause is specified - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); } @Test @@ -905,15 +643,14 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS); String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); - // Plan should not have union optimization applied as PigStorage is unsupported + // Plan should not have union optimization applied run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); - resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName()); @@ -921,37 +658,27 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();"; - // Plan should not have union optimization applied as only ORC is supported + "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();"; + // Plan should not have union optimization applied run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld"); resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); - query = - "a = load 'file:///tmp/input' as (x:int, y:chararray);" + - "b = load 'file:///tmp/input' as (y:chararray, x:int);" + - "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();"; - // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld"); - - resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" + "u1 = union onschema b, c;" + - "store u1 into 'file:///tmp/pigoutput/u1';" + + "store u1 into 'file:///tmp/output/u1';" + //TODO: multiple levels of split not merged "u2 = union onschema a, b, c;" + - "store u2 into 'file:///tmp/pigoutput/u2';" + + "store u2 into 'file:///tmp/output/u2';" + "u3 = union onschema d, e;" + - "store u3 into 'file:///tmp/pigoutput/u3';" + + "store u3 into 'file:///tmp/output/u3';" + "j1 = join d by x, a by x using 'replicated';" + "j1 = foreach j1 generate d::x as x, d::y as y;" + "u4 = union onschema j1, a;" + - "store u4 into 'file:///tmp/pigoutput/u4';"; + "store u4 into 'file:///tmp/output/u4';"; // Plan should have union optimization applied even for unsupported storefunc run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld"); @@ -969,7 +696,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = group c by x;" + "e = foreach d generate group, SUM(c.y);" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld"); @@ -985,7 +712,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld"); @@ -1002,7 +729,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'replicated';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; //TODO: PIG-3856 Not optimized setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -1016,7 +743,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'replicated';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; // Optimized setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -1035,7 +762,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'skewed';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld"); @@ -1052,7 +779,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + "d = order c by x;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld"); @@ -1068,7 +795,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + "d = limit c 1;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld"); @@ -1084,9 +811,9 @@ public class TestTezCompiler { "split a into a1 if x > 100, a2 otherwise;" + "c = union onschema a1, a2, b;" + "split c into d if x > 500, e otherwise;" + - "store a2 into 'file:///tmp/pigoutput/a2';" + - "store d into 'file:///tmp/pigoutput/d';" + - "store e into 'file:///tmp/pigoutput/e';"; + "store a2 into 'file:///tmp/output/a2';" + + "store d into 'file:///tmp/output/d';" + + "store e into 'file:///tmp/output/e';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld"); @@ -1104,8 +831,8 @@ public class TestTezCompiler { "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + "e = union onschema c, d;" + "f = group e by x;" + - "store e into 'file:///tmp/pigoutput1';" + - "store f into 'file:///tmp/pigoutput2';"; + "store e into 'file:///tmp/output1';" + + "store f into 'file:///tmp/output2';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld"); @@ -1123,7 +850,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + "e = union onschema c, d;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld"); @@ -1145,10 +872,10 @@ public class TestTezCompiler { "c2 = foreach c generate y, x;" + "c3 = union c1, c2;" + "a1 = union onschema b3, c3;" + - "store a1 into 'file:///tmp/pigoutput1';" + + "store a1 into 'file:///tmp/output1';" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join a1 by x, d by x using 'skewed';" + - "store e into 'file:///tmp/pigoutput2';"; + "store e into 'file:///tmp/output2';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld"); @@ -1165,7 +892,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'replicated';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld"); @@ -1179,7 +906,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'replicated';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -1197,7 +924,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'skewed';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld"); @@ -1211,7 +938,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'skewed';" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -1229,7 +956,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = filter c by x == d.x;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld"); @@ -1243,7 +970,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = filter d by x == c.x;" + - "store e into 'file:///tmp/pigoutput';"; + "store e into 'file:///tmp/output';"; resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -1254,76 +981,11 @@ public class TestTezCompiler { } @Test - public void testUnionSplitUnionStore() throws Exception { - String query = - "a = load 'file:///tmp/input' as (x:int, y:chararray);" + - "b = load 'file:///tmp/input1' as (y:chararray, x:int);" + - "c = union onschema a, b;" + - "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" + - "h = union onschema d, e;" + - "i = union onschema f, g;" + - "store h into 'file:///tmp/pigoutput/1';" + - "store i into 'file:///tmp/pigoutput/2';"; - - resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld"); - - // With a join in between - query = - "a = load 'file:///tmp/input' as (x:chararray);" + - "b = load 'file:///tmp/input' as (x:chararray);" + - "c = load 'file:///tmp/input' as (y:chararray);" + - "u1 = union onschema a, b;" + - "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + - "d = JOIN r BY x LEFT, c BY y;" + - "u2 = UNION ONSCHEMA d, s;" + - "e = FILTER u2 BY x == '';" + - "f = FILTER u2 BY x == 'm';" + - "u3 = UNION ONSCHEMA e, f;" + - "store u3 into 'file:///tmp/pigoutput';"; - - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld"); - } - - @Test - public void testUnionSplitUnionLimitStore() throws Exception { - // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group - String query = - "a = load 'file:///tmp/input' as (x:chararray);" + - "b = load 'file:///tmp/input' as (x:chararray);" + - "c = load 'file:///tmp/input' as (y:chararray);" + - "u1 = union onschema a, b;" + - "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + - "d = JOIN r BY x LEFT, c BY y;" + - "u2 = UNION ONSCHEMA d, s;" + - "e = FILTER u2 BY x == '';" + - "f = FILTER u2 BY x == 'm';" + - "u3 = UNION ONSCHEMA e, f;" + - "SPLIT u3 INTO t if x != '', u OTHERWISE;" + - "v = LIMIT t 10;" + - "store v into 'file:///tmp/pigoutput';"; - - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld"); - resetScope(); - setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld"); - } - - @Test public void testRank() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + "b = rank a;" + - "store b into 'file:///tmp/pigoutput/d';"; + "store b into 'file:///tmp/output/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld"); } @@ -1334,7 +996,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + "b = rank a by x;" + - "store b into 'file:///tmp/pigoutput/d';"; + "store b into 'file:///tmp/output/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld"); } @@ -1390,32 +1052,5 @@ public class TestTezCompiler { assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)), TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean))); } - - public static class TestDummyStoreFunc extends StoreFunc { - - @Override - public OutputFormat getOutputFormat() throws IOException { - return null; - } - - @Override - public void setStoreLocation(String location, Job job) - throws IOException { - } - - @Override - public void prepareToWrite(RecordWriter writer) throws IOException { - } - - @Override - public void putNext(Tuple t) throws IOException { - } - - @Override - public Boolean supportsParallelWriteToStoreLocation() { - return false; - } - - } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri Feb 24 03:34:37 2017 @@ -117,15 +117,15 @@ public class TestTezGraceParallelism { Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class}); try { // DAG: 47 \ - // -> 49(join) -> 52(distinct) -> 56(group) + // -> 49(join) -> 52(distinct) -> 61(group) // 48 / // Parallelism at compile time: // DAG: 47(1) \ - // -> 49(2) -> 52(20) -> 56(200) + // -> 49(2) -> 52(20) -> 61(200) // 48(1) / // However, when 49 finishes, the actual output of 49 only justify parallelism 1. - // We adjust the parallelism for 56 to 7 based on this. - // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 7 to 1. + // We adjust the parallelism for 61 to 100 based on this. + // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 100 to 1. // pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); @@ -140,10 +140,10 @@ public class TestTezGraceParallelism { "('F',1349L)", "('M',1373L)"}); Util.checkQueryOutputsAfterSort(iter, expectedResults); assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18")); - assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7")); + assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7")); assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2")); assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18")); - assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7")); + assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7")); } finally { Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class); } @@ -217,8 +217,8 @@ public class TestTezGraceParallelism { count++; } assertEquals(count, 20); - assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80")); - assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10")); + assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85")); + assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10")); } finally { Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class); } @@ -262,9 +262,9 @@ public class TestTezGraceParallelism { StringWriter writer = new StringWriter(); Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class); try { - // DAG: 29 -> 32 -> 36 \ - // -> 55 (vertex group) -> 51 - // 37 -> 40 -> 44 / + // DAG: 29 -> 32 -> 41 \ + // -> 70 (vertex group) -> 61 + // 42 -> 45 -> 54 / pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("B = distinct A;"); pigServer.registerQuery("C = group B by name;"); @@ -280,8 +280,8 @@ public class TestTezGraceParallelism { count++; } assertEquals(count, 20); - assertTrue(writer.toString().contains("time to set parallelism for scope-36")); - assertTrue(writer.toString().contains("time to set parallelism for scope-44")); + assertTrue(writer.toString().contains("time to set parallelism for scope-41")); + assertTrue(writer.toString().contains("time to set parallelism for scope-54")); } finally { Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class); } @@ -322,33 +322,4 @@ public class TestTezGraceParallelism { super.setStoreLocation(location, job); } } - - @Test - // See PIG-4786 - public void testCross() throws IOException{ - // scope-90 is the cross vertex. It should not use PigGraceShuffleVertexManager - NodeIdGenerator.reset(); - PigServer.resetScope(); - StringWriter writer = new StringWriter(); - Util.createLogAppender("testCross", writer, PigGraceShuffleVertexManager.class); - File outputDir = File.createTempFile("intemediate", "txt"); - outputDir.delete(); - pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize", "3000"); - pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); - pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); - pigServer.registerQuery("B = order A by name;"); - pigServer.registerQuery("C = distinct B;"); - pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);"); - pigServer.registerQuery("E = group D by name;"); - pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;"); - pigServer.registerQuery("G = cross C, F;"); - Iterator<Tuple> iter = pigServer.openIterator("G"); - int count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - assertEquals(count, 400); - assertFalse(writer.toString().contains("scope-90")); - } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Fri Feb 24 03:34:37 2017 @@ -21,9 +21,7 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; -import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -31,20 +29,17 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; @@ -53,7 +48,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; -import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.OperatorKey; @@ -63,11 +57,8 @@ import org.apache.pig.test.junit.Ordered import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.pig.tools.pigstats.tez.TezScriptState; -import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Vertex; -import org.apache.tez.dag.api.VertexManagerPluginDescriptor; -import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -86,8 +77,7 @@ import org.junit.runner.RunWith; "testTezParallelismEstimatorFilterFlatten", "testTezParallelismEstimatorHashJoin", "testTezParallelismEstimatorSplitBranch", - "testTezParallelismDefaultParallelism", - "testShuffleVertexManagerConfig" + "testTezParallelismDefaultParallelism" }) public class TestTezJobControlCompiler { private static PigContext pc; @@ -99,7 +89,6 @@ public class TestTezJobControlCompiler { public static void setUpBeforeClass() throws Exception { input1 = Util.createTempFileDelOnExit("input1", "txt").toURI(); input2 = Util.createTempFileDelOnExit("input2", "txt").toURI(); - FileUtils.deleteDirectory(new File("/tmp/pigoutput")); } @AfterClass @@ -118,7 +107,7 @@ public class TestTezJobControlCompiler { "a = load '" + input1 +"' as (x:int, y:int);" + "b = filter a by x > 0;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -138,7 +127,7 @@ public class TestTezJobControlCompiler { "a = load '" + input1 +"' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b generate group, a;" + - "store c into 'file:///tmp/pigoutput';"; + "store c into 'file:///tmp/output';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -170,7 +159,7 @@ public class TestTezJobControlCompiler { "b = load '" + input2 +"' as (x:int, z:int);" + "c = join a by x, b by x;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/pigoutput';"; + "store d into 'file:///tmp/output';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -300,72 +289,6 @@ public class TestTezJobControlCompiler { TezOperator leafOper = compiledPlan.first.getLeaves().get(0); Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); assertEquals(leafVertex.getParallelism(), 5); - pc.defaultParallel = -1; - } - - @Test - public void testShuffleVertexManagerConfig() throws Exception{ - pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3"); - pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500"); - - try { - - String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName() - + "() as (name:chararray, age:int, gpa:double);" - + "b = limit a 5;" - + "c = group b by name;" - + "store c into 'output';"; - - VertexManagerPluginDescriptor vmPlugin = getLeafVertexVMPlugin(query); - Configuration vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); - - // Case of grace auto parallelism (PigGraceShuffleVertexManager) - assertEquals(PigGraceShuffleVertexManager.class.getName(), vmPlugin.getClassName()); - // min and max src fraction, auto parallel, desired size, bytes.per.reducer, pig.tez.plan and pigcontext - assertEquals(7, vmPluginConf.size()); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); - assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL)); - assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE)); - assertEquals("500", vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM)); - - // Case of auto parallelism (ShuffleVertexManager) - pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false"); - vmPlugin = getLeafVertexVMPlugin(query); - vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); - assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName()); - // min and max src fraction, auto parallel, desired size - assertEquals(4, vmPluginConf.size()); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); - assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL)); - assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE)); - - // Case of default parallel or PARALLEL (ShuffleVertexManager) - pc.defaultParallel = 2; - vmPlugin = getLeafVertexVMPlugin(query); - vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); - assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName()); - // min and max src fraction - assertEquals(2, vmPluginConf.size()); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); - assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); - } finally { - pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); - pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM); - pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM); - pc.defaultParallel = -1; - } - } - - private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception { - Pair<TezOperPlan, DAG> compiledPlan = compile(query); - TezOperator leafOper = compiledPlan.first.getLeaves().get(0); - Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); - Field vmPluginField = Vertex.class.getDeclaredField("vertexManagerPlugin"); - vmPluginField.setAccessible(true); - VertexManagerPluginDescriptor vmPlugin = (VertexManagerPluginDescriptor) vmPluginField.get(leafVertex); - return vmPlugin; } private Pair<TezOperPlan, DAG> compile(String query) throws Exception {
