Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Mar 4 18:17:39 2016 @@ -20,6 +20,7 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Properties; import java.util.Random; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,6 +40,8 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder; +import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -52,7 +56,7 @@ import org.junit.Test; public class TestTezAutoParallelism { private static final String INPUT_FILE1 = TestTezAutoParallelism.class.getName() + "_1"; private static final String INPUT_FILE2 = TestTezAutoParallelism.class.getName() + "_2"; - private static final String INPUT_DIR = "build/test/data"; + private static final String INPUT_DIR = Util.getTestDirectory(TestTezAutoParallelism.class); private static PigServer pigServer; private static Properties properties; @@ -62,6 +66,8 @@ public class TestTezAutoParallelism { public static void oneTimeSetUp() throws Exception { cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ); properties = cluster.getProperties(); + //Test spilling to disk as tests here have multiple splits + properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10"); createFiles(); } @@ -83,7 +89,7 @@ public class TestTezAutoParallelism { } private static void createFiles() throws IOException { - new File(INPUT_DIR).mkdir(); + new File(INPUT_DIR).mkdirs(); PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1)); @@ -223,7 +229,7 @@ public class TestTezAutoParallelism { // skewed join parallelism is 3 originally, increase to 5 pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); @@ -238,26 +244,158 @@ public class TestTezAutoParallelism { return false; } }); + assertEquals(files.length, 5); + } + + @Test + public void testSkewedFullJoinIncreaseParallelism() throws IOException{ + // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency, + // which prevent it changing parallelism + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); + pigServer.registerQuery("C = join A by name full, B by name using 'skewed';"); + pigServer.store("C", "output6"); + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); + assertEquals(files.length, 5); + } + + @Test + public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{ + // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency, + // which prevent it changing parallelism + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); + pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); + pigServer.registerQuery("D = load 'org.apache.pig.tez.TestTezAutoParallelism_1' as (name:chararray, age:int);"); + pigServer.registerQuery("E = group D all;"); + pigServer.registerQuery("F = foreach E generate COUNT(D) as count;"); + pigServer.registerQuery("G = foreach C generate age/F.count, gender;"); + pigServer.store("G", "output7"); + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){ + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); assertEquals(files.length, 4); } @Test - public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{ + public void testFlattenParallelism() throws IOException{ + String outputDir = "/tmp/testFlattenParallelism"; + String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" + + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);" + + "C = join A by name, B by name using 'skewed' parallel 1;" + + "C1 = group C by A::name;" + + "C2 = FOREACH C1 generate group, FLATTEN(C);" + + "D = group C2 by group;" + + "E = foreach D generate group, COUNT(C2.A::name);" + + "STORE E into '" + outputDir + "/finalout';"; + String log = testAutoParallelism(script, outputDir, true, TezJobCompiler.class, TezDagBuilder.class); + assertTrue(log.contains("For vertex - scope-74: parallelism=10")); + assertTrue(log.contains("For vertex - scope-75: parallelism=70")); + assertTrue(log.contains("Total estimated parallelism is 89")); + } + + @Test + public void testIncreaseIntermediateParallelism1() throws IOException{ + // User specified parallelism is overriden for intermediate step + String outputDir = "/tmp/testIncreaseIntermediateParallelism"; + String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" + + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);" + + "C = join A by name, B by name using 'skewed' parallel 1;" + + "D = group C by A::name;" + + "E = foreach D generate group, COUNT(C.A::name);" + + "STORE E into '" + outputDir + "/finalout';"; + String log = testIncreaseIntermediateParallelism(script, outputDir, true); + // Parallelism of C should be increased + assertTrue(log.contains("Increased requested parallelism of scope-59 to 4")); + assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism")); + assertTrue(log.contains("Total estimated parallelism is 40")); + } + + @Test + public void testIncreaseIntermediateParallelism2() throws IOException{ + // User specified parallelism should not be overriden for intermediate step if there is a STORE + String outputDir = "/tmp/testIncreaseIntermediateParallelism"; + String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" + + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);" + + "C = join A by name, B by name using 'skewed' parallel 2;" + + "STORE C into '/tmp/testIncreaseIntermediateParallelism';" + + "D = group C by A::name parallel 2;" + + "E = foreach D generate group, COUNT(C.A::name);" + + "STORE E into '" + outputDir + "/finalout';"; + String log = testIncreaseIntermediateParallelism(script, outputDir, true); + // Parallelism of C will not be increased as the Split has a STORE + assertEquals(0, StringUtils.countMatches(log, "Increased requested parallelism")); + } + + @Test + public void testIncreaseIntermediateParallelism3() throws IOException{ + // Multiple levels with default parallelism. Group by followed by Group by + try { + String outputDir = "/tmp/testIncreaseIntermediateParallelism"; + String script = "set default_parallel 1\n" + + "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" + + "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);" + + "C = join A by name, B by name;" + + "STORE C into '/tmp/testIncreaseIntermediateParallelism';" + + "C1 = group C by A::name;" + + "C2 = FOREACH C1 generate group, FLATTEN(C);" + + "D = group C2 by group;" + + "E = foreach D generate group, COUNT(C2.A::name);" + + "F = order E by $0;" + + "STORE F into '" + outputDir + "/finalout';"; + String log = testIncreaseIntermediateParallelism(script, outputDir, false); + // Parallelism of C1 should be increased. C2 will not be increased due to order by + assertEquals(1, StringUtils.countMatches(log, "Increased requested parallelism")); + assertTrue(log.contains("Increased requested parallelism of scope-65 to 10")); + assertTrue(log.contains("Total estimated parallelism is 19")); + } finally { + pigServer.setDefaultParallel(-1); + } + } + + private String testIncreaseIntermediateParallelism(String script, String outputDir, boolean sortAndCheck) throws IOException { + return testAutoParallelism(script, outputDir, sortAndCheck, ParallelismSetter.class, TezJobCompiler.class); + } + + private String testAutoParallelism(String script, String outputDir, boolean sortAndCheck, Class... classesToLog) throws IOException { NodeIdGenerator.reset(); PigServer.resetScope(); StringWriter writer = new StringWriter(); // When there is a combiner operation involved user specified parallelism is overriden - Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer); + Util.createLogAppender("testAutoParallelism", writer, classesToLog); try { pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); - pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); - pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); - pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;"); - pigServer.registerQuery("D = group C by A::name;"); - pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);"); - Iterator<Tuple> iter = pigServer.openIterator("E"); + pigServer.setBatchOn(); + pigServer.registerScript(new ByteArrayInputStream(script.getBytes())); + pigServer.executeBatch(); + + pigServer.registerQuery("A = load '" + outputDir + "/finalout' as (name:chararray, count:long);"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> expectedResults = Util .getTuplesFromConstantTupleStrings(new String[] { "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)", @@ -267,11 +405,15 @@ public class TestTezAutoParallelism { "('Liam',46L)", "('Madison',46L)", "('Mason',54L)", "('Mia',51L)", "('Michael',47L)", "('Noah',38L)", "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" }); - - Util.checkQueryOutputsAfterSort(iter, expectedResults); - assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4")); + if (sortAndCheck) { + Util.checkQueryOutputsAfterSort(iter, expectedResults); + } else { + Util.checkQueryOutputs(iter, expectedResults); + } + return writer.toString(); } finally { - Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism"); + Util.removeLogAppender("testAutoParallelism", classesToLog); + Util.deleteFile(cluster, outputDir); } } }
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Fri Mar 4 18:17:39 2016 @@ -33,8 +33,11 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; +import org.apache.pig.builtin.OrcStorage; +import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; import org.apache.pig.test.Util; import org.apache.pig.test.utils.TestHelper; import org.junit.AfterClass; @@ -86,6 +89,52 @@ public class TestTezCompiler { } @Test + public void testStoreLoad() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "store a into 'file:///tmp/output';" + + "b = load 'file:///tmp/output' as (x:int, y:int);" + + "store b into 'file:///tmp/output1';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld"); + } + + @Test + public void testStoreLoadMultiple() throws Exception { + String query = + "a = load 'file:///tmp/input';" + + "store a into 'file:///tmp/output/Dir1';" + + "a = load 'file:///tmp/output/Dir1';" + + "store a into 'file:///tmp/output/Dir2' using BinStorage();" + + "a = load 'file:///tmp/output/Dir1';" + + "store a into 'file:///tmp/output/Dir3';" + + "a = load 'file:///tmp/output/Dir2' using BinStorage();" + + "store a into 'file:///tmp/output/Dir4';" + + "a = load 'file:///tmp/output/Dir3';" + + "b = load 'file:///tmp/output/Dir2' using BinStorage();" + + "c = load 'file:///tmp/output/Dir1';" + + "d = cogroup a by $0, b by $0, c by $0;" + + "store d into 'file:///tmp/output/Dir5';"; + + // To get around difference in ordering of operators in plan due to JDK7 and JDK8 + if (System.getProperties().getProperty("java.version").startsWith("1.8")) { + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); + } else { + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld"); + } + } + + @Test + public void testNative() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" + + "store b into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld"); + } + + @Test public void testFilter() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + @@ -120,6 +169,121 @@ public class TestTezCompiler { } @Test + public void testSelfJoin() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x < 5;" + + "c = filter a by x == 10;" + + "d = filter a by x > 10;" + + "e = join b by x, c by x, d by x;" + + "store e into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld"); + } + + @Test + public void testSelfJoinSkewed() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x < 5;" + + "c = filter a by x == 10;" + + "d = join b by x, c by x using 'skewed';" + + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld"); + } + + @Test + public void testSelfJoinReplicated() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x < 5;" + + "c = filter a by x == 10;" + + "d = filter a by x > 10;" + + "e = join b by x, c by x, d by x using 'replicated';" + + "store e into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld"); + } + + @Test + public void testSelfJoinUnionReplicated() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = union a, b;" + + "d = join b by x, c by x using 'replicated';" + + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld"); + } + + @Test + public void testSelfJoinUnion() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "a1 = filter a by x > 5;" + + "a2 = filter a by x < 2;" + + "b = union a1, a2;" + + "c = join b by x, a by x;" + + "store c into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld"); + } + + @Test + public void testSelfJoinUnionDifferentMembers() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "a1 = filter a by x > 5;" + + "a2 = filter a by x < 2;" + + "a3 = filter a by y == 10;" + + "a4 = join a2 by x, a3 by x;" + + "a5 = foreach a4 generate a2::x as x, a3::y as y;" + + "b = union a1, a5;" + + "c = join b by x, a by x;" + + "store c into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld"); + } + + @Test + public void testCross() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = cross a, b;" + + "d = foreach c generate a::x as x, y, z;" + + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld"); + } + + @Test + public void testSelfCross() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x < 5;" + + "c = filter a by x == 10;" + + "d = cross b, c;" + + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld"); + } + + @Test + public void testCrossScalarSplit() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = cross b, a;" + + "d = foreach c generate a.x, a.y, z;" + //Scalar + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld"); + } + + @Test public void testSkewedJoin() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + @@ -132,6 +296,19 @@ public class TestTezCompiler { } @Test + public void testSkewedJoinFilter() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "a = filter a by x == 1;" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = join a by x, b by x using 'skewed';" + + "d = foreach c generate a::x as x, y, z;" + + "store d into 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld"); + } + + @Test public void testLimit() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + @@ -250,6 +427,29 @@ public class TestTezCompiler { run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld"); } + @Test + public void testOrderByWithFilter() throws Exception { + String query = + "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + + "b = filter a by x == 1;" + + "c = order b by x;" + + "STORE c INTO 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld"); + } + + @Test + public void testOrderByReadOnceLoadFunc() throws Exception { + setProperty("pig.sort.readonce.loadfuncs","org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage"); + String query = + "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" + + "b = order a by x;" + + "STORE b INTO 'file:///tmp/output';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld"); + setProperty("pig.sort.readonce.loadfuncs", null); + } + // PIG-3759, PIG-3781 // Combiner should not be added in case of co-group @Test @@ -370,6 +570,59 @@ public class TestTezCompiler { } @Test + public void testMultiQueryScalar() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = group a by x;" + + "c = foreach b generate group, COUNT(a) as cnt;" + + "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" + + "store d into 'file:///tmp/output1';" + + "store e into 'file:///tmp/output2';"; + + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6-OPTOFF.gld"); + } + + @Test + public void testMultiQueryMultipleReplicateJoin() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = load 'file:///tmp/input' as (x:int, y:int);" + + "c = join a by $0, b by $0 using 'replicated';" + + "d = join a by $1, b by $1 using 'replicated';" + + "e = union c,d;" + + "store e into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7-OPTOFF.gld"); + } + + @Test + public void testMultiQueryMultipleScalar() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x == 5;" + + "b = foreach b generate $0 as b1;" + + "c = filter a by x == 10;" + + "c = foreach c generate $0 as c1;" + + "d = group a by x;" + + "e = foreach d generate group, b.b1, c.c1;" + + "store e into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld"); + } + + @Test public void testUnionStore() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + @@ -379,11 +632,41 @@ public class TestTezCompiler { setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld"); + resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); } @Test + public void testUnionUnSupportedStore() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "store c into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS); + String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); + // Plan should not have union optimization applied + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName()); + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();"; + // Plan should not have union optimization applied + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld"); + // Restore the value + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, oldSupported); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldUnSupported); + } + + @Test public void testUnionGroupBy() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + @@ -449,6 +732,8 @@ public class TestTezCompiler { @Test public void testUnionSkewedJoin() throws Exception { + // TODO: PIG-4574 optimization needs to be done for this as well. + // Requires changes in UnionOptimizer String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + @@ -465,6 +750,8 @@ public class TestTezCompiler { @Test public void testUnionOrderby() throws Exception { + // TODO: PIG-4574 optimization needs to be done for this as well. + // Requires changes in UnionOptimizer String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + @@ -500,7 +787,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "split a into a1 if x > 100, a2 otherwise;" + - "c = union onschema a1, b;" + + "c = union onschema a1, a2, b;" + "split c into d if x > 500, e otherwise;" + "store a2 into 'file:///tmp/output/a2';" + "store d into 'file:///tmp/output/d';" + @@ -508,6 +795,7 @@ public class TestTezCompiler { setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld"); + resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld"); } @@ -521,17 +809,18 @@ public class TestTezCompiler { "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + "e = union onschema c, d;" + "f = group e by x;" + - "store f into 'file:///tmp/output';"; + "store e into 'file:///tmp/output1';" + + "store f into 'file:///tmp/output2';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld"); resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld"); + } - //TODO: union followed by union followed by store does not work. - //@Test + @Test public void testUnionUnionStore() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + @@ -549,6 +838,127 @@ public class TestTezCompiler { } @Test + public void testMultipleUnionSplitJoin() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = filter a by x == 2;" + + "b1 = foreach b generate *;" + + "b2 = foreach b generate *;" + + "b3 = union onschema b1, b2;" + + "c = filter a by x == 3;" + + "c1 = foreach c generate y, x;" + + "c2 = foreach c generate y, x;" + + "c3 = union c1, c2;" + + "a1 = union onschema b3, c3;" + + "store a1 into 'file:///tmp/output1';" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = join a1 by x, d by x using 'skewed';" + + "store e into 'file:///tmp/output2';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12-OPTOFF.gld"); + } + + @Test + public void testUnionSplitReplicateJoin() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = filter a by x == 2;" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = join c by x, d by x using 'replicated';" + + "store e into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13-OPTOFF.gld"); + + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = filter a by x == 2;" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = join d by x, c by x using 'replicated';" + + "store e into 'file:///tmp/output';"; + + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14-OPTOFF.gld"); + } + + @Test + public void testUnionSplitSkewedJoin() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = filter a by x == 2;" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = join c by x, d by x using 'skewed';" + + "store e into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld"); + + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = filter a by x == 2;" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = join d by x, c by x using 'skewed';" + + "store e into 'file:///tmp/output';"; + + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld"); + } + + @Test + public void testUnionScalar() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = filter c by x == d.x;" + + "store e into 'file:///tmp/output';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17-OPTOFF.gld"); + + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + + "e = filter d by x == c.x;" + + "store e into 'file:///tmp/output';"; + + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18-OPTOFF.gld"); + } + + @Test public void testRank() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + @@ -569,8 +979,16 @@ public class TestTezCompiler { run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld"); } + private String getProperty(String property) { + return pigServer.getPigContext().getProperties().getProperty(property); + } + private void setProperty(String property, String value) { - pigServer.getPigContext().getProperties().setProperty(property, value); + if (value == null) { + pigServer.getPigContext().getProperties().remove(property); + } else { + pigServer.getPigContext().getProperties().setProperty(property, value); + } } private void run(String query, String expectedFile) throws Exception { Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Fri Mar 4 18:17:39 2016 @@ -238,8 +238,9 @@ public class TestTezJobControlCompiler { + "store e into 'output';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); TezOperator leafOper = compiledPlan.first.getLeaves().get(0); + assertTrue(leafOper.isUseGraceParallelism()); Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); - assertEquals(leafVertex.getParallelism(), 70); + assertEquals(leafVertex.getParallelism(), -1); } @Test @@ -271,8 +272,9 @@ public class TestTezJobControlCompiler { List<TezOperator> leaves = compiledPlan.first.getLeaves(); Collections.sort(leaves); TezOperator leafOper = leaves.get(1); + assertTrue(leafOper.isUseGraceParallelism()); Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); - assertEquals(leafVertex.getParallelism(), 7); + assertEquals(leafVertex.getParallelism(), -1); } @Test Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java Fri Mar 4 18:17:39 2016 @@ -20,10 +20,17 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; +import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; @@ -54,8 +61,8 @@ public class TestTezLauncher { }; private static final String OUTPUT_FILE = "TestTezLauncherOutput"; - private static final String[] OUTPUT_RECORDS = { - "all\t{(apple),(pear),(pear),(strawberry),(orange)}" + private static final String[] OUTPUT_RECORDS = new String[] { + "(apple)", "(pear)", "(pear)", "(strawberry)", "(orange)" }; @BeforeClass @@ -94,16 +101,34 @@ public class TestTezLauncher { PigStats pigStats = launcher.launchPig(pp, "testRun1", pc); assertTrue(pigStats.isSuccessful()); - String[] output = Util.readOutput(cluster.getFileSystem(), OUTPUT_FILE); - for (int i = 0; i < output.length; i++) { - assertEquals(OUTPUT_RECORDS[i], output[i]); - } - assertEquals(1, pigStats.getInputStats().size()); assertEquals(INPUT_FILE, pigStats.getInputStats().get(0).getName()); assertEquals(1, pigStats.getOutputStats().size()); assertEquals(OUTPUT_FILE, pigStats.getOutputStats().get(0).getName()); + + query = "m = load '" + OUTPUT_FILE + "' as (a:chararray, b:{(y:chararray)});"; + pigServer = new PigServer(pc); + pigServer.registerQuery(query); + Iterator<Tuple> iter = pigServer.openIterator("m"); + Tuple result = iter.next(); + assertEquals(result.get(0).toString(), "all"); + Iterator<Tuple> innerIter = ((DataBag)result.get(1)).iterator(); + int count = 0; + while (innerIter.hasNext()) { + assertTrue(Arrays.asList(OUTPUT_RECORDS).contains(innerIter.next().toString())); + count++; + } + assertEquals(count, OUTPUT_RECORDS.length); + } + + @Test + public void testQueueName() throws Exception { + Configuration conf = new Configuration(); + conf.set("tez.queue.name", "special"); + conf = MRToTezHelper.getDAGAMConfFromMRConf(conf); + assertEquals(conf.get("tez.queue.name"), "special"); + } } Modified: pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl (original) +++ pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl Fri Mar 4 18:17:39 2016 @@ -1,8 +1,8 @@ #!/usr/local/bin/perl -w -if(scalar(@ARGV) < 6 ) +if(scalar(@ARGV) < 6) { - print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> <hdfs_root> <pigmix_output> [parallel] [numruns] [runmapreduce] \n"; + print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> [hdfs_root] [pigmix_output] [parallel] [numruns] [runmapreduce] [cleanup_after_test]\n"; exit(-1); } my $pighome = shift; @@ -16,7 +16,14 @@ my $pigmixoutput = shift; my $parallel = shift; my $runs = shift; my $runmapreduce = shift; +my $cleanup_after_test = shift; my $pigjar = "$pighome/pig-withouthadoop.jar"; +if(!defined($hdfsroot)) { + $hdfsroot = '/user/pig/tests/data/pigmix'; +} +if(!defined($pigmixoutput)) { + $pigmixoutput = 'output'; +} if(!defined($parallel)) { $parallel = 40; } @@ -26,6 +33,9 @@ if(!defined($runs)) { if(!defined($runmapreduce)) { $runmapreduce = 1; } +if(!defined($cleanup_after_test)) { + $cleanup_after_test = 0; +} $ENV{'HADOOP_HOME'} = $hadoophome; $ENV{'HADOOP_CLIENT_OPTS'}="-Xmx1024m"; @@ -110,5 +120,9 @@ sub cleanup { print STDERR `$cmd 2>&1`; $cmd = "$pigbin -e rmf tmp"; print STDERR `$cmd 2>&1`; + if ($cleanup_after_test) { + $cmd = "$hadoopbin fs -rmr $pigmixoutput"; + print STDERR `$cmd 2>&1`; + } }
